Skip to main content

zig_core/
run.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde::Serialize;
7use tokio::task::JoinSet;
8use zag_agent::builder::AgentBuilder;
9use zag_agent::{plan as agent_plan, review as agent_review};
10use zag_orch::collect as orch_collect;
11use zag_orch::summary as orch_summary;
12
13use crate::config::ZigConfig;
14use crate::dry_run::{DryRunContext, DryRunFormat};
15use crate::error::ZigError;
16use crate::memory::{MemoryCollector, render_memory_block};
17use crate::paths::expand_path;
18use crate::resources::{ResourceCollector, render_system_block};
19use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
20use crate::storage::{FilesystemBackend, StorageManager};
21use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
22use crate::workflow::{parser, validate};
23
/// Maximum number of loop iterations to prevent infinite loops from `next` fields.
// NOTE(review): the code that enforces this cap is not in the visible part of
// the file — confirm how exceeding it is reported before changing the value.
const MAX_LOOP_ITERATIONS: usize = 100;
26
27/// Execute a workflow file (`.zwf` or `.zwfz`).
28///
29/// Parses the workflow, validates it, resolves the step DAG, and executes
30/// each step via the embedded [`zag_agent::builder::AgentBuilder`] (and
31/// [`zag_orch`] for pipe/collect/summary commands). The optional
32/// `user_prompt` is injected as additional context into every step's prompt.
33///
34/// Resource advertisement (the `<resources>` block prepended to each step's
35/// system prompt) is enabled by default; pass `disable_resources = true` to
36/// opt out, e.g. via `zig run --no-resources`.
37///
38/// Memory injection (the `<memory>` block) is similarly enabled by default;
39/// pass `disable_memory = true` to opt out via `zig run --no-memory`.
40///
41/// Storage injection (the `<storage>` block) is similarly enabled by default;
42/// pass `disable_storage = true` to opt out via `zig run --no-storage`.
43///
44/// When `dry_run = true`, the workflow is parsed, validated, and its plan is
45/// printed in the requested `format` — no agent invocation, session log,
46/// storage creation, or memory write occurs. The three `disable_*` flags are
47/// respected and surface in the plan output.
48#[allow(clippy::too_many_arguments)]
49pub async fn run_workflow(
50    workflow_path: &str,
51    user_prompt: Option<&str>,
52    disable_resources: bool,
53    disable_memory: bool,
54    disable_storage: bool,
55    dry_run: bool,
56    dry_run_format: DryRunFormat,
57) -> Result<(), ZigError> {
58    let path = resolve_workflow_path(workflow_path)?;
59    let (workflow, source) = parser::parse_workflow(&path)?;
60
61    if let Err(errors) = validate::validate(&workflow) {
62        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
63        return Err(ZigError::Validation(msgs.join("; ")));
64    }
65
66    execute(
67        &workflow,
68        &path,
69        user_prompt,
70        source.dir(),
71        disable_resources,
72        disable_memory,
73        disable_storage,
74        dry_run,
75        dry_run_format,
76    )
77    .await
78}
79
80/// Resolve a workflow argument to an actual file path.
81///
82/// Tries in order:
83/// 1. Literal path as given
84/// 2. With `.zwf` extension appended
85/// 3. With `.zwfz` extension appended
86/// 4. Under local project `.zig/workflows/` directory
87/// 5. Under local project `.zig/workflows/` with `.zwf` appended
88/// 6. Under local project `.zig/workflows/` with `.zwfz` appended
89/// 7. Under global `~/.zig/workflows/` directory
90/// 8. Under global `~/.zig/workflows/` with `.zwf` appended
91/// 9. Under global `~/.zig/workflows/` with `.zwfz` appended
92pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
93    let mut candidates = vec![
94        PathBuf::from(workflow),
95        PathBuf::from(format!("{workflow}.zwf")),
96        PathBuf::from(format!("{workflow}.zwfz")),
97    ];
98
99    if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
100        candidates.push(local_dir.join(workflow));
101        candidates.push(local_dir.join(format!("{workflow}.zwf")));
102        candidates.push(local_dir.join(format!("{workflow}.zwfz")));
103    }
104
105    if let Some(global_dir) = crate::paths::global_workflows_dir() {
106        candidates.push(global_dir.join(workflow));
107        candidates.push(global_dir.join(format!("{workflow}.zwf")));
108        candidates.push(global_dir.join(format!("{workflow}.zwfz")));
109    }
110
111    for candidate in &candidates {
112        if candidate.exists() {
113            return Ok(candidate.clone());
114        }
115    }
116
117    Err(ZigError::Io(format!(
118        "workflow not found: '{workflow}' (tried: {})",
119        candidates
120            .iter()
121            .map(|p| p.display().to_string())
122            .collect::<Vec<_>>()
123            .join(", ")
124    )))
125}
126
127/// Compute a topological ordering of steps grouped into tiers.
128///
129/// Each tier contains steps whose dependencies are all in earlier tiers,
130/// meaning steps within a tier can (in principle) run in parallel.
131/// Uses Kahn's algorithm.
132pub(crate) fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
133    let step_index: HashMap<&str, usize> = steps
134        .iter()
135        .enumerate()
136        .map(|(i, s)| (s.name.as_str(), i))
137        .collect();
138
139    let mut in_degree = vec![0usize; steps.len()];
140    for (i, step) in steps.iter().enumerate() {
141        for dep in &step.depends_on {
142            if step_index.contains_key(dep.as_str()) {
143                in_degree[i] += 1;
144            }
145        }
146    }
147
148    let mut tiers = Vec::new();
149    let mut remaining = in_degree.clone();
150    let mut completed: Vec<bool> = vec![false; steps.len()];
151
152    loop {
153        let tier: Vec<usize> = (0..steps.len())
154            .filter(|&i| !completed[i] && remaining[i] == 0)
155            .collect();
156
157        if tier.is_empty() {
158            break;
159        }
160
161        for &i in &tier {
162            completed[i] = true;
163        }
164
165        // Decrement in-degrees for dependents of this tier
166        for &i in &tier {
167            for (j, step) in steps.iter().enumerate() {
168                if !completed[j] && step.depends_on.contains(&steps[i].name) {
169                    remaining[j] -= 1;
170                }
171            }
172        }
173
174        tiers.push(tier.iter().map(|&i| &steps[i]).collect());
175    }
176
177    let completed_count: usize = completed.iter().filter(|&&c| c).count();
178    if completed_count != steps.len() {
179        return Err(ZigError::Execution(
180            "could not resolve all steps — possible undetected cycle".into(),
181        ));
182    }
183
184    Ok(tiers)
185}
186
187/// Replace `${var_name}` references in a template with values from the variable map.
188///
189/// Supports dotted paths like `${result.score}` — the root variable name is
190/// looked up, and if its value is valid JSON, the nested path is traversed.
191/// Unknown variables are left as-is.
192pub(crate) fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
193    let mut result = String::with_capacity(template.len());
194    let mut rest = template;
195
196    while let Some(start) = rest.find("${") {
197        result.push_str(&rest[..start]);
198        let after_start = &rest[start + 2..];
199
200        if let Some(end) = after_start.find('}') {
201            let var_expr = &after_start[..end];
202            let mut parts = var_expr.splitn(2, '.');
203            let root = parts.next().unwrap_or(var_expr);
204
205            if let Some(value) = vars.get(root) {
206                if let Some(path) = parts.next() {
207                    // Try to navigate a JSON path
208                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
209                        let resolved = json_path_lookup(&json, path);
210                        result.push_str(&resolved);
211                    } else {
212                        result.push_str(value);
213                    }
214                } else {
215                    result.push_str(value);
216                }
217            } else {
218                // Unknown variable — leave as-is
219                result.push_str(&rest[start..start + 2 + end + 1]);
220            }
221
222            rest = &after_start[end + 1..];
223        } else {
224            result.push_str(&rest[start..]);
225            rest = "";
226        }
227    }
228
229    result.push_str(rest);
230    result
231}
232
233/// Look up a dotted path in a JSON value (e.g., "nested.field").
234fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
235    let mut current = value;
236    for key in path.split('.') {
237        match current.get(key) {
238            Some(v) => current = v,
239            None => return format!("${{?.{path}}}"),
240        }
241    }
242    match current {
243        serde_json::Value::String(s) => s.clone(),
244        other => other.to_string(),
245    }
246}
247
248/// Resolve the effective system prompt for a step, with any advertised
249/// resources prepended as a `<resources>` block.
250///
251/// Resolution order:
252/// 1. If `step.system_prompt` is set, use it (with variable substitution).
253/// 2. If `step.role` is set, resolve the role name (may contain `${var}`),
254///    look it up in the roles table, and use the role's system prompt
255///    (loaded from file if `system_prompt_file` is set).
256/// 3. Otherwise the base prompt is empty.
257///
258/// Resources from all configured tiers (global shared, global per-workflow,
259/// project cwd, inline workflow, inline step) are then collected by the
260/// supplied [`ResourceCollector`] and rendered into a `<resources>` block
261/// that is prepended to the base prompt. If both the resources set and the
262/// base prompt are empty, returns `None` — keeping the current behavior when
263/// nothing is configured.
264#[allow(clippy::too_many_arguments)]
265pub(crate) fn resolve_role_system_prompt(
266    step: &Step,
267    roles: &HashMap<String, Role>,
268    resources: &ResourceCollector<'_>,
269    memory: &MemoryCollector,
270    storage: &StorageManager,
271    vars: &HashMap<String, String>,
272    workflow_dir: &Path,
273    workflow_name: &str,
274) -> Result<Option<String>, ZigError> {
275    // Resolve the base system prompt (may be empty if neither is set).
276    let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
277        Some(substitute_vars(sp, vars))
278    } else if let Some(ref role_ref) = step.role {
279        let resolved_name = substitute_vars(role_ref, vars);
280        let role = roles.get(&resolved_name).ok_or_else(|| {
281            ZigError::Execution(format!(
282                "step '{}' references role '{}' which does not exist",
283                step.name, resolved_name
284            ))
285        })?;
286
287        let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
288            let full_path = workflow_dir.join(expand_path(file_path));
289            Some(std::fs::read_to_string(&full_path).map_err(|e| {
290                ZigError::Execution(format!(
291                    "failed to read system_prompt_file '{}' for role '{}': {e}",
292                    full_path.display(),
293                    resolved_name
294                ))
295            })?)
296        } else {
297            role.system_prompt.clone()
298        };
299
300        raw_prompt.map(|p| substitute_vars(&p, vars))
301    } else {
302        None
303    };
304
305    // Collect and render resources.
306    let set = resources.collect_for_step(&step.resources)?;
307    let resource_block = render_system_block(&set);
308
309    // Collect and render memory.
310    let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
311    let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
312
313    // Render storage block for this step's scope.
314    let storage_block = match storage.render_block(step.storage.as_deref())? {
315        Some(mut s) => {
316            s.push('\n');
317            s
318        }
319        None => String::new(),
320    };
321
322    let prefix = format!("{resource_block}{memory_block}{storage_block}");
323
324    match (prefix.is_empty(), base_prompt) {
325        (true, None) => Ok(None),
326        (true, Some(p)) => Ok(Some(p)),
327        (false, None) => Ok(Some(prefix.trim_end().to_string())),
328        (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
329    }
330}
331
332/// Load file-backed default values for variables.
333///
334/// For each variable with `default_file` set (and no `default`), reads the
335/// file contents relative to `workflow_dir` and inserts them into the vars map.
336fn load_file_defaults(
337    vars: &mut HashMap<String, String>,
338    declarations: &HashMap<String, crate::workflow::model::Variable>,
339    workflow_dir: &Path,
340) -> Result<(), ZigError> {
341    for (name, decl) in declarations {
342        if decl.default.is_none() {
343            if let Some(ref file_path) = decl.default_file {
344                let full_path = workflow_dir.join(expand_path(file_path));
345                let content = std::fs::read_to_string(&full_path).map_err(|e| {
346                    ZigError::Execution(format!(
347                        "failed to read default_file '{}' for variable '{name}': {e}",
348                        full_path.display()
349                    ))
350                })?;
351                vars.insert(name.clone(), content);
352            }
353        }
354    }
355    Ok(())
356}
357
358/// Evaluate a simple condition expression against the current variable state.
359///
360/// Supports:
361/// - Numeric comparisons: `score < 8`, `retries <= max_retries`
362/// - String equality: `status == "done"`, `status != "pending"`
363/// - Truthy checks: `approved` (true if value is "true" or non-empty and non-zero)
364pub(crate) fn evaluate_condition(
365    condition: &str,
366    vars: &HashMap<String, String>,
367) -> Result<bool, ZigError> {
368    let condition = condition.trim();
369
370    // Try comparison operators (ordered by length to match `<=` before `<`)
371    let operators = ["<=", ">=", "!=", "==", "<", ">"];
372    for op in &operators {
373        if let Some(pos) = condition.find(op) {
374            let lhs = resolve_operand(condition[..pos].trim(), vars);
375            let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
376            return Ok(compare(&lhs, &rhs, op));
377        }
378    }
379
380    // Truthy check: single variable name
381    let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
382    Ok(is_truthy(value))
383}
384
/// Resolve a condition operand to its string value.
///
/// - A literal wrapped in matching double or single quotes (`"done"`,
///   `'done'`) yields the text between the quotes.
/// - Otherwise, a token that names a variable yields that variable's value.
/// - Anything else (numeric literal, unknown name) is returned unchanged.
///
/// A token consisting of a lone quote character is not a literal: it has
/// no closing quote, so it goes through the variable / as-is path instead
/// of being sliced (which used to panic).
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    // Strip surrounding quotes; `strip_prefix` + `strip_suffix` requires two
    // distinct quote characters, so `"` alone never matches.
    for quote in ['"', '\''] {
        let unquoted = token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote));
        if let Some(inner) = unquoted {
            return inner.to_string();
        }
    }

    vars.get(token)
        .cloned()
        .unwrap_or_else(|| token.to_string())
}
403
/// Compare two string operands with the given operator.
///
/// When both operands parse as *finite* numbers they are compared
/// numerically (`==` / `!=` use an `f64::EPSILON` tolerance); otherwise
/// the comparison is lexicographic on the raw strings. Words that `f64`
/// parsing accepts but that are not real numbers — `nan`, `inf`,
/// `infinity` — are deliberately treated as plain strings, so
/// `"nan" == "nan"` is true instead of both `==` and `!=` being false.
///
/// Unknown operators evaluate to `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    let as_number = |text: &str| text.parse::<f64>().ok().filter(|n| n.is_finite());

    if let (Some(l), Some(r)) = (as_number(lhs), as_number(rhs)) {
        return match op {
            "==" => (l - r).abs() < f64::EPSILON,
            "!=" => (l - r).abs() >= f64::EPSILON,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
428
/// Report whether a variable value counts as "true" in a condition.
///
/// Exactly three values are falsy: the empty string, `"false"` and `"0"`.
/// Everything else — including `"False"` or `"0.0"` — is truthy.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
433
434/// Build the final prompt for a step, incorporating variable substitution,
435/// dependency outputs, and the user's context prompt.
436pub(crate) fn render_step_prompt(
437    step: &Step,
438    vars: &HashMap<String, String>,
439    user_prompt: Option<&str>,
440    dependency_outputs: &HashMap<String, String>,
441) -> String {
442    let mut prompt = String::new();
443
444    // Prepend user context if provided
445    if let Some(ctx) = user_prompt {
446        prompt.push_str(&format!("User context: {ctx}\n\n"));
447    }
448
449    // Inject dependency outputs if requested
450    if step.inject_context {
451        for dep in &step.depends_on {
452            if let Some(output) = dependency_outputs.get(dep) {
453                prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
454            }
455        }
456    }
457
458    // Append the step's prompt with variable substitution
459    prompt.push_str(&substitute_vars(&step.prompt, vars));
460
461    prompt
462}
463
/// Serializable snapshot of everything a step contributes to an agent
/// invocation. Produced by [`build_agent_config`] and used by both the
/// executor (to configure a [`AgentBuilder`]) and `--dry-run` (to show
/// the author what each step will ask of the agent).
///
/// The shape mirrors [`AgentBuilder`] field-for-field; additional
/// command-specific parameters (review/plan/pipe/collect/summary) are
/// stored in the optional `command_params` bag.
///
/// For commands that do not accept agent arguments (`collect`,
/// `summary`), [`build_agent_config`] leaves every agent-level field at
/// its `Default` value.
#[derive(Debug, Clone, Serialize, Default)]
pub struct AgentConfig {
    /// Subcommand label — `"run"`, `"review"`, `"plan"`, `"pipe"`,
    /// `"collect"`, or `"summary"`.
    pub command: String,

    // --- Agent-level knobs ---
    /// Provider name: the step's own provider, else the workflow-level one.
    pub provider: Option<String>,
    /// Model name: explicit override, else the step's model, else the
    /// workflow-level model.
    pub model: Option<String>,
    /// Fully rendered system prompt, if any.
    pub system_prompt: Option<String>,
    /// Root directory for the agent (step value after `expand_path`).
    pub root: Option<String>,
    /// Additional directories: the step's `add_dirs` (expanded) followed by
    /// the extra directories supplied by the caller.
    pub add_dirs: Vec<String>,
    /// Environment variables; serialized as a list of `KEY=VALUE` strings.
    #[serde(serialize_with = "serialize_env_pairs")]
    pub env: Vec<(String, String)>,
    /// Files attached to the invocation (step values after `expand_path`).
    pub files: Vec<String>,
    /// Whether the agent runs with auto-approval enabled.
    pub auto_approve: bool,
    /// `None` → no worktree, `Some(None)` → generated name, `Some(Some(n))` → explicit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub worktree: Option<Option<String>>,
    /// Sandbox setting copied from the step, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sandbox: Option<String>,

    // --- Output shaping ---
    /// JSON output mode; only set when the step asks for JSON and has no
    /// explicit `output` format.
    pub json_mode: bool,
    /// JSON schema as text. [`apply_agent_config`] parses it and ignores
    /// it when it is not valid JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_schema: Option<String>,
    /// Explicit output format; takes precedence over `json_mode`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_format: Option<String>,

    // --- Turn / timeout / MCP ---
    /// Maximum number of agent turns.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_turns: Option<u32>,
    /// Timeout as written in the workflow (e.g. `"5m"`); parsed by
    /// `parse_timeout_string` when applied, and skipped if unparsable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout: Option<String>,
    /// MCP configuration (step value after `expand_path`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mcp_config: Option<String>,

    /// Session metadata — always set (session name derives from workflow/step).
    pub session_name: String,
    /// Step description; `None` when the step's description is empty.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Session tags: `"zig-workflow"` followed by the step's own tags.
    pub tags: Vec<String>,

    /// The effective prompt after dependency/context/plan prepending.
    pub prompt: String,

    /// Whether `accepts_agent_args` was true for this command — pipe/run/
    /// review/plan/exec respect agent-level flags; collect/summary don't.
    pub accepts_agent_args: bool,

    /// Extra params for the non-plain commands. `None` for `run`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command_params: Option<CommandParams>,

    /// Interactive flag — steps with `interactive = true` run through
    /// `AgentBuilder::run` instead of `exec`.
    pub interactive: bool,
}
530
531fn serialize_env_pairs<S>(pairs: &[(String, String)], s: S) -> Result<S::Ok, S::Error>
532where
533    S: serde::Serializer,
534{
535    use serde::ser::SerializeSeq;
536    let mut seq = s.serialize_seq(Some(pairs.len()))?;
537    for (k, v) in pairs {
538        seq.serialize_element(&format!("{k}={v}"))?;
539    }
540    seq.end()
541}
542
/// Command-specific parameter bag. Only populated for non-`run` commands.
///
/// Serialized with an internal `kind` tag whose value is the snake_case
/// variant name (`"review"`, `"plan"`, `"pipe"`, `"collect"`, `"summary"`).
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CommandParams {
    /// Parameters of a `review` step, copied from the step definition.
    Review {
        uncommitted: bool,
        base: Option<String>,
        commit: Option<String>,
        title: Option<String>,
    },
    /// Parameters of a `plan` step; `output` is the step's `plan_output`
    /// after `expand_path`.
    Plan {
        output: Option<String>,
        instructions: Option<String>,
    },
    /// Session names (`zig-<workflow>-<dep>`) of the step's dependencies.
    Pipe {
        session_ids: Vec<String>,
    },
    /// Session names (`zig-<workflow>-<dep>`) of the step's dependencies.
    Collect {
        session_ids: Vec<String>,
    },
    /// Session names (`zig-<workflow>-<dep>`) of the step's dependencies.
    Summary {
        session_ids: Vec<String>,
    },
}
567
568/// Build an [`AgentConfig`] snapshot for a step. Replaces the old argv
569/// builder (`build_zag_args`) — the returned config is applied to an
570/// [`AgentBuilder`] at execution time by [`apply_agent_config`].
571#[allow(clippy::too_many_arguments)]
572pub(crate) fn build_agent_config(
573    step: &Step,
574    prompt: &str,
575    workflow_name: &str,
576    model_override: Option<&str>,
577    rendered_system_prompt: Option<&str>,
578    workflow_provider: Option<&str>,
579    workflow_model: Option<&str>,
580    extra_add_dirs: &[std::path::PathBuf],
581) -> AgentConfig {
582    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
583
584    let (command_label, accepts_agent_args, command_params) = match &step.command {
585        None => ("run".to_string(), true, None),
586        Some(StepCommand::Review) => (
587            "review".to_string(),
588            true,
589            Some(CommandParams::Review {
590                uncommitted: step.uncommitted,
591                base: step.base.clone(),
592                commit: step.commit.clone(),
593                title: step.title.clone(),
594            }),
595        ),
596        Some(StepCommand::Plan) => (
597            "plan".to_string(),
598            true,
599            Some(CommandParams::Plan {
600                output: step.plan_output.as_deref().map(expand_path),
601                instructions: step.instructions.clone(),
602            }),
603        ),
604        Some(StepCommand::Pipe) => {
605            let session_ids: Vec<String> =
606                step.depends_on.iter().map(|d| session_name(d)).collect();
607            (
608                "pipe".to_string(),
609                true,
610                Some(CommandParams::Pipe { session_ids }),
611            )
612        }
613        Some(StepCommand::Collect) => {
614            let session_ids: Vec<String> =
615                step.depends_on.iter().map(|d| session_name(d)).collect();
616            (
617                "collect".to_string(),
618                false,
619                Some(CommandParams::Collect { session_ids }),
620            )
621        }
622        Some(StepCommand::Summary) => {
623            let session_ids: Vec<String> =
624                step.depends_on.iter().map(|d| session_name(d)).collect();
625            (
626                "summary".to_string(),
627                false,
628                Some(CommandParams::Summary { session_ids }),
629            )
630        }
631    };
632
633    // Build the agent config. Agent-level knobs only apply when the
634    // command accepts them (matching the old `accepts_agent_args` gate).
635    let mut cfg = AgentConfig {
636        command: command_label,
637        session_name: session_name(&step.name),
638        description: if step.description.is_empty() {
639            None
640        } else {
641            Some(step.description.clone())
642        },
643        tags: {
644            let mut t = vec!["zig-workflow".to_string()];
645            t.extend(step.tags.iter().cloned());
646            t
647        },
648        timeout: step.timeout.clone(),
649        prompt: prompt.to_string(),
650        accepts_agent_args,
651        command_params,
652        interactive: step.interactive,
653        ..Default::default()
654    };
655
656    if !accepts_agent_args {
657        return cfg;
658    }
659
660    cfg.provider = step
661        .provider
662        .clone()
663        .or_else(|| workflow_provider.map(String::from));
664    cfg.model = model_override
665        .map(String::from)
666        .or_else(|| step.model.clone())
667        .or_else(|| workflow_model.map(String::from));
668    cfg.system_prompt = rendered_system_prompt.map(String::from);
669    cfg.max_turns = step.max_turns;
670
671    // Output format: explicit `output` overrides the `json` bool; the
672    // two map onto AgentBuilder::output_format and AgentBuilder::json.
673    if let Some(output) = &step.output {
674        cfg.output_format = Some(output.clone());
675    } else if step.json {
676        cfg.json_mode = true;
677    }
678    cfg.json_schema = step.json_schema.clone();
679    cfg.mcp_config = step.mcp_config.as_deref().map(expand_path);
680
681    cfg.auto_approve = step.auto_approve;
682    cfg.root = step.root.as_deref().map(expand_path);
683    cfg.add_dirs = step
684        .add_dirs
685        .iter()
686        .map(|d| expand_path(d))
687        .chain(extra_add_dirs.iter().map(|p| p.display().to_string()))
688        .collect();
689    cfg.env = step
690        .env
691        .iter()
692        .map(|(k, v)| (k.clone(), v.clone()))
693        .collect();
694    cfg.files = step.files.iter().map(|f| expand_path(f)).collect();
695
696    // Isolation
697    if step.worktree {
698        cfg.worktree = Some(None);
699    }
700    cfg.sandbox = step.sandbox.clone();
701
702    cfg
703}
704
705/// Apply an [`AgentConfig`] to an [`AgentBuilder`]. The prompt itself
706/// is NOT set here — callers pass it to `exec`/`run` directly.
707pub(crate) fn apply_agent_config(mut builder: AgentBuilder, cfg: &AgentConfig) -> AgentBuilder {
708    if let Some(ref p) = cfg.provider {
709        builder = builder.provider(p);
710    }
711    if let Some(ref m) = cfg.model {
712        builder = builder.model(m);
713    }
714    if let Some(ref sp) = cfg.system_prompt {
715        builder = builder.system_prompt(sp);
716    }
717    if let Some(ref r) = cfg.root {
718        builder = builder.root(r);
719    }
720    if cfg.auto_approve {
721        builder = builder.auto_approve(true);
722    }
723    for dir in &cfg.add_dirs {
724        builder = builder.add_dir(dir);
725    }
726    for (k, v) in &cfg.env {
727        builder = builder.env(k, v);
728    }
729    for f in &cfg.files {
730        builder = builder.file(f);
731    }
732    if let Some(ref wt) = cfg.worktree {
733        builder = builder.worktree(wt.as_deref());
734    }
735    if let Some(ref sb) = cfg.sandbox {
736        builder = builder.sandbox(Some(sb));
737    }
738    if let Some(ref fmt) = cfg.output_format {
739        builder = builder.output_format(fmt);
740    }
741    if cfg.json_mode {
742        if let Some(ref schema) = cfg.json_schema {
743            if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
744                builder = builder.json_schema(v);
745            } else {
746                builder = builder.json();
747            }
748        } else {
749            builder = builder.json();
750        }
751    } else if let Some(ref schema) = cfg.json_schema {
752        if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
753            builder = builder.json_schema(v);
754        }
755    }
756    if let Some(turns) = cfg.max_turns {
757        builder = builder.max_turns(turns);
758    }
759    if let Some(ref t) = cfg.timeout {
760        if let Some(dur) = parse_timeout_string(t) {
761            builder = builder.timeout(dur);
762        }
763    }
764    if let Some(ref mcp) = cfg.mcp_config {
765        builder = builder.mcp_config(mcp);
766    }
767    builder = builder.name(&cfg.session_name);
768    if let Some(ref d) = cfg.description {
769        builder = builder.description(d);
770    }
771    for tag in &cfg.tags {
772        builder = builder.tag(tag);
773    }
774    builder
775}
776
/// Parse a duration string like `"5m"`, `"30s"`, `"1h30m"` into a
/// [`std::time::Duration`].
///
/// Accepted forms (surrounding whitespace is ignored):
/// - a bare non-negative integer, interpreted as seconds (`"60"`);
/// - one or more `<number><unit>` pieces whose values are summed, where
///   the unit is `ms`, `s`, `m` or `h` and the number may be fractional
///   (`"1.5h"`, `"1h30m"`, `"500ms"`). Millisecond values are truncated
///   to whole milliseconds.
///
/// Returns `None` when the string is empty, contains an unknown unit, has
/// a malformed or missing number, ends with a number that has no unit, or
/// describes a duration too large to represent (such inputs used to
/// panic).
fn parse_timeout_string(s: &str) -> Option<Duration> {
    // Mirrors zag_orch::duration::parse_duration — allows `1h30m`, `5m`,
    // `30s`, `500ms`, bare seconds (`60`). Keep the dependency inline so
    // we don't have to thread its error type through ZigError.
    let s = s.trim();
    if s.is_empty() {
        return None;
    }
    if let Ok(secs) = s.parse::<u64>() {
        return Some(Duration::from_secs(secs));
    }

    let mut total = Duration::ZERO;
    let mut number = String::new();
    let mut chars = s.chars().peekable();
    while let Some(c) = chars.next() {
        if c.is_ascii_digit() || c == '.' {
            number.push(c);
            continue;
        }

        // `m` directly followed by `s` is the two-character unit `ms`.
        let is_millis = c == 'm' && chars.next_if_eq(&'s').is_some();

        let value: f64 = number.parse().ok()?;
        number.clear();

        // The fallible constructors turn out-of-range values into `None`
        // instead of panicking.
        let piece = match (c, is_millis) {
            ('m', true) => Duration::from_millis(value as u64),
            ('s', false) => Duration::try_from_secs_f64(value).ok()?,
            ('m', false) => Duration::try_from_secs_f64(value * 60.0).ok()?,
            ('h', false) => Duration::try_from_secs_f64(value * 3600.0).ok()?,
            _ => return None,
        };
        total = total.checked_add(piece)?;
    }

    // Trailing digits without a unit (e.g. "5m3") are rejected.
    if !number.is_empty() {
        return None;
    }
    Some(total)
}
818
819/// Dispatch a built [`AgentConfig`] through the appropriate `zag-agent`
820/// / `zag-orch` entry point and return the captured result text used for
821/// `saves` and dependency injection.
822///
823/// Every path that drives an agent attaches a live `.on_log_event` hook
824/// via [`install_live_streaming`] so per-turn activity (assistant
825/// messages, tool calls, reasoning) streams to stderr and the zig session
826/// log as it happens — restoring the old `zag run` streaming UX after the
827/// subprocess-drop refactor.
828///
829/// - `run` → [`AgentBuilder::exec`] (or [`AgentBuilder::run`] for `interactive`).
830/// - `review` (non-codex) → prompt built inline, then `AgentBuilder::exec`.
831/// - `review` (codex) → [`zag_agent::review::run_review`] (no live stream; codex has a native review flow).
832/// - `plan` → prompt built via [`zag_agent::plan::build_plan_prompt`], then `AgentBuilder::exec`, result optionally written to a file.
833/// - `pipe` → context built inline via [`zag_orch::collect::extract_last_assistant_message`], then `AgentBuilder::exec`.
834/// - `collect` → [`zag_orch::collect::collect_results`] (serialized as JSON; no agent).
835/// - `summary` → [`zag_orch::summary::summarize_sessions`] (serialized as JSON; no agent).
836async fn dispatch_agent(
837    cfg: &AgentConfig,
838    step_name: &str,
839    session: Option<&Arc<SessionWriter>>,
840    prefix: Option<&str>,
841) -> Result<String, ZigError> {
842    match cfg.command.as_str() {
843        "run" => {
844            if cfg.interactive {
845                // Interactive sessions inherit stdio — the provider TUI
846                // takes over the terminal and renders events directly, so
847                // no log-event hook is wired here.
848                let builder = apply_agent_config(AgentBuilder::new(), cfg);
849                builder.run(Some(&cfg.prompt)).await.map_err(|e| {
850                    ZigError::Zag(format!("agent run failed for step '{step_name}': {e}"))
851                })?;
852                Ok(String::new())
853            } else {
854                let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
855                builder = install_live_streaming(builder, step_name, session, prefix);
856                let output = builder.exec(&cfg.prompt).await.map_err(|e| {
857                    ZigError::Zag(format!("agent exec failed for step '{step_name}': {e}"))
858                })?;
859                Ok(output.result.unwrap_or_default())
860            }
861        }
862        "review" => {
863            let provider = cfg.provider.clone().unwrap_or_else(|| "claude".to_string());
864            let (uncommitted, base, commit, title) = match &cfg.command_params {
865                Some(CommandParams::Review {
866                    uncommitted,
867                    base,
868                    commit,
869                    title,
870                }) => (*uncommitted, base.clone(), commit.clone(), title.clone()),
871                _ => (false, None, None, None),
872            };
873
874            // Codex has a native review flow inside zag-agent that we
875            // don't want to reimplement — fall back to the library call.
876            // No live stream in that branch; same as before this fix.
877            if provider == "codex" {
878                let params = agent_review::ReviewParams {
879                    provider,
880                    uncommitted,
881                    base,
882                    commit,
883                    title,
884                    prompt: if cfg.prompt.is_empty() {
885                        None
886                    } else {
887                        Some(cfg.prompt.clone())
888                    },
889                    system_prompt: cfg.system_prompt.clone(),
890                    model: cfg.model.clone(),
891                    root: cfg.root.clone(),
892                    auto_approve: cfg.auto_approve,
893                    add_dirs: cfg.add_dirs.clone(),
894                    progress: Box::new(zag_agent::progress::SilentProgress),
895                };
896                let output = agent_review::run_review(params).await.map_err(|e| {
897                    ZigError::Zag(format!("review failed for step '{step_name}': {e}"))
898                })?;
899                return Ok(output.and_then(|o| o.result).unwrap_or_default());
900            }
901
902            // Non-codex review: build the diff and prompt in-process, then
903            // drive AgentBuilder directly so we can attach the live hook.
904            let diff = agent_review::gather_diff(
905                uncommitted,
906                base.as_deref(),
907                commit.as_deref(),
908                cfg.root.as_deref(),
909            )
910            .map_err(|e| {
911                ZigError::Zag(format!(
912                    "review gather_diff failed for step '{step_name}': {e}"
913                ))
914            })?;
915            let user_prompt = if cfg.prompt.is_empty() {
916                None
917            } else {
918                Some(cfg.prompt.as_str())
919            };
920            let review_prompt =
921                agent_review::build_review_prompt(&diff, title.as_deref(), user_prompt);
922
923            let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
924            builder = install_live_streaming(builder, step_name, session, prefix);
925            let output = builder.exec(&review_prompt).await.map_err(|e| {
926                ZigError::Zag(format!("review exec failed for step '{step_name}': {e}"))
927            })?;
928            Ok(output.result.unwrap_or_default())
929        }
930        "plan" => {
931            let (plan_output_path, instructions) = match &cfg.command_params {
932                Some(CommandParams::Plan {
933                    output,
934                    instructions,
935                }) => (output.clone(), instructions.clone()),
936                _ => (None, None),
937            };
938
939            let plan_prompt = agent_plan::build_plan_prompt(&cfg.prompt, instructions.as_deref());
940
941            let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
942            builder = install_live_streaming(builder, step_name, session, prefix);
943            let output = builder.exec(&plan_prompt).await.map_err(|e| {
944                ZigError::Zag(format!("plan exec failed for step '{step_name}': {e}"))
945            })?;
946            let text = output.result.unwrap_or_default();
947
948            if let Some(path_str) = plan_output_path {
949                let target = resolve_plan_output_path(&path_str);
950                if let Some(parent) = target.parent()
951                    && !parent.as_os_str().is_empty()
952                {
953                    std::fs::create_dir_all(parent).map_err(|e| {
954                        ZigError::Io(format!(
955                            "failed to create plan output directory {}: {e}",
956                            parent.display()
957                        ))
958                    })?;
959                }
960                std::fs::write(&target, &text).map_err(|e| {
961                    ZigError::Io(format!(
962                        "failed to write plan output to {}: {e}",
963                        target.display()
964                    ))
965                })?;
966                eprintln!("plan written to {}", target.display());
967            }
968
969            Ok(text)
970        }
971        "pipe" => {
972            let session_ids = match &cfg.command_params {
973                Some(CommandParams::Pipe { session_ids }) => session_ids.as_slice(),
974                _ => &[] as &[String],
975            };
976            let context = build_pipe_context(session_ids, cfg.root.as_deref())?;
977            let combined = format!(
978                "Here are results from previous agent sessions:\n\n{context}\n\n{}",
979                cfg.prompt
980            );
981
982            let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
983            builder = install_live_streaming(builder, step_name, session, prefix);
984            let output = builder.exec(&combined).await.map_err(|e| {
985                ZigError::Zag(format!("pipe exec failed for step '{step_name}': {e}"))
986            })?;
987            Ok(output.result.unwrap_or_default())
988        }
989        "collect" => {
990            let session_ids = match &cfg.command_params {
991                Some(CommandParams::Collect { session_ids }) => session_ids.clone(),
992                _ => Vec::new(),
993            };
994            let params = orch_collect::CollectParams {
995                session_ids,
996                tag: None,
997                json: true,
998                root: cfg.root.clone(),
999            };
1000            let results = orch_collect::collect_results(&params).map_err(|e| {
1001                ZigError::Zag(format!("collect failed for step '{step_name}': {e}"))
1002            })?;
1003            let json = serde_json::to_string(&results)
1004                .map_err(|e| ZigError::Execution(format!("collect serialization failed: {e}")))?;
1005            emit_captured(&json, step_name, session, prefix);
1006            Ok(json)
1007        }
1008        "summary" => {
1009            let session_ids = match &cfg.command_params {
1010                Some(CommandParams::Summary { session_ids }) => session_ids.clone(),
1011                _ => Vec::new(),
1012            };
1013            let params = orch_summary::SummaryParams {
1014                session_ids,
1015                tag: None,
1016                stats: false,
1017                json: true,
1018                root: cfg.root.clone(),
1019            };
1020            let results = orch_summary::summarize_sessions(&params).map_err(|e| {
1021                ZigError::Zag(format!("summary failed for step '{step_name}': {e}"))
1022            })?;
1023            let json = serde_json::to_string(&results)
1024                .map_err(|e| ZigError::Execution(format!("summary serialization failed: {e}")))?;
1025            emit_captured(&json, step_name, session, prefix);
1026            Ok(json)
1027        }
1028        other => Err(ZigError::Execution(format!(
1029            "unknown command '{other}' for step '{step_name}'"
1030        ))),
1031    }
1032}
1033
1034/// Attach a live event stream to `builder`. Every [`AgentLogEvent`] that
1035/// reaches the session log fires the closure: rendered via
1036/// [`zag_agent::listen::format_event_text`] and routed to both stderr
1037/// (with optional `[prefix]` tagging for parallel tiers) and the zig
1038/// [`SessionWriter`] as `StepOutput` events — the same surface the old
1039/// `run_zag_streaming` helper produced before we dropped the subprocess.
1040fn install_live_streaming(
1041    builder: AgentBuilder,
1042    step_name: &str,
1043    session: Option<&Arc<SessionWriter>>,
1044    prefix: Option<&str>,
1045) -> AgentBuilder {
1046    let step_name_owned = step_name.to_string();
1047    let prefix_owned = prefix.map(String::from);
1048    let session_owned = session.cloned();
1049    builder.on_log_event(move |evt| {
1050        let Some(text) = zag_agent::listen::format_event_text(evt, false) else {
1051            return;
1052        };
1053        emit_live_line(
1054            &text,
1055            &step_name_owned,
1056            session_owned.as_ref(),
1057            prefix_owned.as_deref(),
1058        );
1059    })
1060}
1061
1062/// Write one or more rendered lines to stderr (with optional prefix) and
1063/// mirror each line to the zig session writer.
1064fn emit_live_line(
1065    text: &str,
1066    step_name: &str,
1067    session: Option<&Arc<SessionWriter>>,
1068    prefix: Option<&str>,
1069) {
1070    use std::io::Write;
1071    if text.is_empty() {
1072        return;
1073    }
1074    let stderr = std::io::stderr();
1075    for line in text.lines() {
1076        if let Some(w) = session {
1077            let _ = w.step_output(step_name, OutputStream::Stdout, line);
1078        }
1079        let mut h = stderr.lock();
1080        let _ = match prefix {
1081            Some(p) => writeln!(h, "[{p}] {line}"),
1082            None => writeln!(h, "{line}"),
1083        };
1084    }
1085}
1086
1087/// Emit captured non-agent output (collect / summary JSON) line-by-line
1088/// to stderr and the session writer. Kept for the two paths that don't
1089/// run an agent and therefore can't use [`install_live_streaming`].
1090fn emit_captured(
1091    text: &str,
1092    step_name: &str,
1093    session: Option<&Arc<SessionWriter>>,
1094    prefix: Option<&str>,
1095) {
1096    emit_live_line(text, step_name, session, prefix);
1097}
1098
1099/// Build the `<session-result>` context block from upstream session IDs.
1100///
1101/// Mirrors `zag_orch::pipe::build_context` (`zag-orch/src/pipe.rs:86-118`)
1102/// byte-for-byte so the combined prompt zig feeds the agent matches what
1103/// the `zag pipe` CLI would have produced.
1104fn build_pipe_context(session_ids: &[String], root: Option<&str>) -> Result<String, ZigError> {
1105    let mut parts = Vec::new();
1106    for (i, id) in session_ids.iter().enumerate() {
1107        let Some(text) = orch_collect::extract_last_assistant_message(id, root) else {
1108            eprintln!("warning: no result found for upstream session {id}");
1109            continue;
1110        };
1111        let short = &id[..id.len().min(8)];
1112        let block = if session_ids.len() == 1 {
1113            format!("<session-result session=\"{short}\">\n{text}\n</session-result>")
1114        } else {
1115            format!(
1116                "<session-result index=\"{}\" session=\"{short}\">\n{text}\n</session-result>",
1117                i + 1
1118            )
1119        };
1120        parts.push(block);
1121    }
1122
1123    if parts.is_empty() {
1124        return Err(ZigError::Execution(
1125            "pipe: no results available from the specified sessions".into(),
1126        ));
1127    }
1128    Ok(parts.join("\n\n"))
1129}
1130
1131/// Resolve a `plan_output` path. If the caller specified a bare directory
1132/// name (no extension), append a timestamped `plan-YYYYMMDD-HHMMSS.md`
1133/// inside it — matching the behavior documented on
1134/// [`zag_agent::plan::PlanParams::output`].
1135fn resolve_plan_output_path(path_str: &str) -> std::path::PathBuf {
1136    let expanded = expand_path(path_str);
1137    let path = std::path::PathBuf::from(&expanded);
1138    if path.extension().is_some() {
1139        return path;
1140    }
1141    let stamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
1142    path.join(format!("plan-{stamp}.md"))
1143}
1144
1145/// Execute a single step through the agent-builder dispatch. Returns the
1146/// captured result text used for `saves` and dependency injection. The
1147/// optional `model_override` is used during retries to escalate to a
1148/// different model.
1149#[allow(clippy::too_many_arguments)]
1150async fn execute_step(
1151    step: &Step,
1152    prompt: &str,
1153    workflow_name: &str,
1154    model_override: Option<&str>,
1155    prefix: Option<&str>,
1156    session: Option<&Arc<SessionWriter>>,
1157    rendered_system_prompt: Option<&str>,
1158    workflow_provider: Option<&str>,
1159    workflow_model: Option<&str>,
1160    extra_add_dirs: &[std::path::PathBuf],
1161) -> Result<String, ZigError> {
1162    let cfg = build_agent_config(
1163        step,
1164        prompt,
1165        workflow_name,
1166        model_override,
1167        rendered_system_prompt,
1168        workflow_provider,
1169        workflow_model,
1170        extra_add_dirs,
1171    );
1172    dispatch_agent(&cfg, &step.name, session, prefix).await
1173}
1174
1175/// Run a step with retry logic, returning its captured output on success.
1176///
1177/// Extracted so both sequential and parallel execution paths share the
1178/// same retry / model-escalation behavior.
1179#[allow(clippy::too_many_arguments)]
1180async fn run_step_attempts(
1181    step: &Step,
1182    prompt: &str,
1183    workflow_name: &str,
1184    prefix: Option<&str>,
1185    session: Option<&Arc<SessionWriter>>,
1186    rendered_system_prompt: Option<&str>,
1187    workflow_provider: Option<&str>,
1188    workflow_model: Option<&str>,
1189    extra_add_dirs: &[std::path::PathBuf],
1190) -> Result<String, ZigError> {
1191    let mut attempts = 0;
1192    let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
1193        step.max_retries.unwrap_or(1) + 1
1194    } else {
1195        1
1196    };
1197
1198    loop {
1199        attempts += 1;
1200        let model_override = if attempts > 1 {
1201            step.retry_model.as_deref()
1202        } else {
1203            None
1204        };
1205        match execute_step(
1206            step,
1207            prompt,
1208            workflow_name,
1209            model_override,
1210            prefix,
1211            session,
1212            rendered_system_prompt,
1213            workflow_provider,
1214            workflow_model,
1215            extra_add_dirs,
1216        )
1217        .await
1218        {
1219            Ok(output) => return Ok(output),
1220            Err(e) => {
1221                if let Some(w) = session {
1222                    let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
1223                }
1224                if attempts < max_attempts {
1225                    eprintln!(
1226                        "    retry {}/{} for step '{}'",
1227                        attempts,
1228                        max_attempts - 1,
1229                        step.name
1230                    );
1231                    continue;
1232                }
1233                return Err(e);
1234            }
1235        }
1236    }
1237}
1238
1239/// Extract variable values from step output using `saves` selectors.
1240///
1241/// Selectors:
1242/// - `"$"` — the entire output
1243/// - `"$.field"` — a top-level JSON field
1244/// - `"$.nested.field"` — a nested JSON field
1245fn extract_saves(
1246    output: &str,
1247    saves: &HashMap<String, String>,
1248) -> Result<HashMap<String, String>, ZigError> {
1249    let mut extracted = HashMap::new();
1250
1251    for (var_name, selector) in saves {
1252        let value = if selector == "$" {
1253            output.trim().to_string()
1254        } else if let Some(path) = selector.strip_prefix("$.") {
1255            let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
1256                ZigError::Execution(format!(
1257                    "saves selector '{selector}' requires JSON output, but got parse error: {e}"
1258                ))
1259            })?;
1260            json_path_lookup(&json, path)
1261        } else {
1262            output.trim().to_string()
1263        };
1264
1265        extracted.insert(var_name.clone(), value);
1266    }
1267
1268    Ok(extracted)
1269}
1270
1271/// Partition a tier of steps into sequential steps and race groups.
1272///
1273/// Steps without a `race_group` are returned as sequential. Steps sharing
1274/// the same `race_group` value are grouped together for parallel execution.
1275fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
1276    let mut sequential = Vec::new();
1277    let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
1278
1279    for step in tier {
1280        if let Some(group) = &step.race_group {
1281            race_groups.entry(group.clone()).or_default().push(step);
1282        } else {
1283            sequential.push(*step);
1284        }
1285    }
1286
1287    (sequential, race_groups)
1288}
1289
1290/// Execute a race group: run all steps concurrently via a [`JoinSet`] and
1291/// return the first winner. Once one step succeeds, the remaining tasks
1292/// are aborted (the underlying `tokio::process::Child` is dropped, which
1293/// kills the provider subprocess if `kill_on_drop` is set — zag-agent
1294/// sets this on its internal commands).
1295#[allow(clippy::too_many_arguments)]
1296async fn execute_race_group(
1297    steps: &[&Step],
1298    prompts: &HashMap<String, String>,
1299    system_prompts: &HashMap<String, String>,
1300    workflow_name: &str,
1301    tier_index: usize,
1302    session: Option<&Arc<SessionWriter>>,
1303    workflow_provider: Option<&str>,
1304    workflow_model: Option<&str>,
1305    storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
1306) -> Result<(String, String), ZigError> {
1307    if let Some(w) = session {
1308        for step in steps {
1309            let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1310            let preview = prompts
1311                .get(&step.name)
1312                .map(|p| prompt_preview(p))
1313                .unwrap_or_default();
1314            let _ = w.step_started(
1315                &step.name,
1316                tier_index,
1317                &zag_session_id,
1318                zag_command_name(&step.command),
1319                step.model.as_deref(),
1320                &preview,
1321            );
1322        }
1323    }
1324
1325    let race_started = Instant::now();
1326    let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1327
1328    for step in steps {
1329        let prompt = prompts
1330            .get(&step.name)
1331            .ok_or_else(|| ZigError::Execution(format!("missing prompt for step '{}'", step.name)))?
1332            .clone();
1333        eprintln!("  racing step '{}'...", step.name);
1334        let rendered_sp = system_prompts.get(&step.name).cloned();
1335        let empty: Vec<std::path::PathBuf> = Vec::new();
1336        let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty).clone();
1337        let step_clone: Step = (*step).clone();
1338        let wf_name = workflow_name.to_string();
1339        let wf_provider = workflow_provider.map(String::from);
1340        let wf_model = workflow_model.map(String::from);
1341        let session_clone = session.cloned();
1342        let name = step.name.clone();
1343        set.spawn(async move {
1344            let res = execute_step(
1345                &step_clone,
1346                &prompt,
1347                &wf_name,
1348                None,
1349                None,
1350                session_clone.as_ref(),
1351                rendered_sp.as_deref(),
1352                wf_provider.as_deref(),
1353                wf_model.as_deref(),
1354                &extra_dirs,
1355            )
1356            .await;
1357            (name, res)
1358        });
1359    }
1360
1361    // Wait for the first winner — drop (abort) the rest.
1362    while let Some(joined) = set.join_next().await {
1363        let (winner_name, result) = match joined {
1364            Ok(pair) => pair,
1365            Err(e) if e.is_cancelled() => continue,
1366            Err(e) => return Err(ZigError::Execution(format!("race task panicked: {e}"))),
1367        };
1368        match result {
1369            Ok(stdout) => {
1370                // Abort losers.
1371                set.abort_all();
1372                while let Some(r) = set.join_next().await {
1373                    if let Ok((name, _)) = r {
1374                        eprintln!("  cancelling step '{name}' (race lost)");
1375                    }
1376                }
1377                let elapsed = race_started.elapsed().as_millis() as u64;
1378                eprintln!("  race won by '{winner_name}'");
1379                if let Some(w) = session {
1380                    let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1381                }
1382                return Ok((winner_name, stdout));
1383            }
1384            Err(e) => {
1385                if let Some(w) = session {
1386                    let _ = w.step_failed(&winner_name, None, 1, &e.to_string());
1387                }
1388                // Keep racing remaining tasks; this one lost.
1389                continue;
1390            }
1391        }
1392    }
1393
1394    Err(ZigError::Execution(
1395        "all racers failed without a winner".into(),
1396    ))
1397}
1398
1399/// Execute a single sequential step with retry logic, saves, and next-jump handling.
1400#[allow(clippy::too_many_arguments)]
1401async fn execute_sequential_step(
1402    step: &Step,
1403    vars: &mut HashMap<String, String>,
1404    user_prompt: Option<&str>,
1405    step_outputs: &mut HashMap<String, String>,
1406    workflow_name: &str,
1407    pending_next: &mut Option<String>,
1408    tier_index: usize,
1409    session: Option<&Arc<SessionWriter>>,
1410    roles: &HashMap<String, Role>,
1411    resources: &ResourceCollector<'_>,
1412    memory: &MemoryCollector,
1413    storage: &StorageManager,
1414    workflow_dir: &Path,
1415    workflow_provider: Option<&str>,
1416    workflow_model: Option<&str>,
1417) -> Result<(), ZigError> {
1418    if let Some(condition) = &step.condition {
1419        if !evaluate_condition(condition, vars)? {
1420            eprintln!(
1421                "  skipping '{}' (condition not met: {condition})",
1422                step.name
1423            );
1424            if let Some(w) = session {
1425                let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1426            }
1427            return Ok(());
1428        }
1429    }
1430
1431    eprintln!("  running step '{}'...", step.name);
1432
1433    let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1434    let rendered_sp = resolve_role_system_prompt(
1435        step,
1436        roles,
1437        resources,
1438        memory,
1439        storage,
1440        vars,
1441        workflow_dir,
1442        workflow_name,
1443    )?;
1444    let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1445    if let Some(w) = session {
1446        let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1447        let _ = w.step_started(
1448            &step.name,
1449            tier_index,
1450            &zag_session_id,
1451            zag_command_name(&step.command),
1452            step.model.as_deref(),
1453            &prompt_preview(&prompt),
1454        );
1455    }
1456    let started = Instant::now();
1457    let result = run_step_attempts(
1458        step,
1459        &prompt,
1460        workflow_name,
1461        None,
1462        session,
1463        rendered_sp.as_deref(),
1464        workflow_provider,
1465        workflow_model,
1466        &storage_dirs,
1467    )
1468    .await;
1469
1470    match result {
1471        Ok(output) => {
1472            let mut saved_keys: Vec<String> = Vec::new();
1473            if !step.saves.is_empty() {
1474                let saved = extract_saves(&output, &step.saves)?;
1475                for (k, v) in &saved {
1476                    eprintln!("    saved {k} = {v}");
1477                    saved_keys.push(k.clone());
1478                }
1479                vars.extend(saved);
1480            }
1481
1482            step_outputs.insert(step.name.clone(), output);
1483            eprintln!("  completed '{}'", step.name);
1484            if let Some(w) = session {
1485                let _ = w.step_completed(
1486                    &step.name,
1487                    0,
1488                    started.elapsed().as_millis() as u64,
1489                    saved_keys,
1490                );
1491            }
1492
1493            if step.next.is_some() {
1494                *pending_next = step.next.clone();
1495            }
1496        }
1497        Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1498            FailurePolicy::Fail => return Err(e),
1499            FailurePolicy::Continue => {
1500                eprintln!("  step '{}' failed (continuing): {e}", step.name);
1501            }
1502            FailurePolicy::Retry => {
1503                return Err(e);
1504            }
1505        },
1506    }
1507
1508    Ok(())
1509}
1510
1511/// Run multiple independent steps in a tier concurrently.
1512///
1513/// All non-skipped steps are spawned as tokio tasks via [`JoinSet`] and
1514/// we wait for every one to finish (unlike race groups, which abort
1515/// losers). Captured output lines are written back to stderr after each
1516/// task completes, prefixed with the step name to disambiguate. Results
1517/// are processed in tier-declaration order so `saves`, `next`, and
1518/// `on_failure` semantics remain deterministic.
1519#[allow(clippy::too_many_arguments)]
1520async fn execute_parallel_tier(
1521    steps: &[&Step],
1522    vars: &mut HashMap<String, String>,
1523    user_prompt: Option<&str>,
1524    step_outputs: &mut HashMap<String, String>,
1525    workflow_name: &str,
1526    pending_next: &mut Option<String>,
1527    tier_index: usize,
1528    session: Option<&Arc<SessionWriter>>,
1529    roles: &HashMap<String, Role>,
1530    resources: &ResourceCollector<'_>,
1531    memory: &MemoryCollector,
1532    storage: &StorageManager,
1533    workflow_dir: &Path,
1534    workflow_provider: Option<&str>,
1535    workflow_model: Option<&str>,
1536) -> Result<(), ZigError> {
1537    // Evaluate conditions and render prompts up front, so threads receive
1538    // the same variable snapshot they would have under sequential execution.
1539    let mut active: Vec<&Step> = Vec::new();
1540    let mut prompts: HashMap<String, String> = HashMap::new();
1541    let mut rendered_sps: HashMap<String, String> = HashMap::new();
1542    let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
1543    for step in steps {
1544        if let Some(condition) = &step.condition {
1545            if !evaluate_condition(condition, vars)? {
1546                eprintln!(
1547                    "  skipping '{}' (condition not met: {condition})",
1548                    step.name
1549                );
1550                if let Some(w) = session {
1551                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1552                }
1553                continue;
1554            }
1555        }
1556        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1557        prompts.insert(step.name.clone(), prompt);
1558        if let Some(sp) = resolve_role_system_prompt(
1559            step,
1560            roles,
1561            resources,
1562            memory,
1563            storage,
1564            vars,
1565            workflow_dir,
1566            workflow_name,
1567        )? {
1568            rendered_sps.insert(step.name.clone(), sp);
1569        }
1570        storage_dirs_map.insert(
1571            step.name.clone(),
1572            storage.add_dirs_for_step(step.storage.as_deref()),
1573        );
1574        active.push(*step);
1575    }
1576
1577    if active.is_empty() {
1578        return Ok(());
1579    }
1580
1581    eprintln!("  running {} steps in parallel...", active.len());
1582
1583    let mut start_times: HashMap<String, Instant> = HashMap::new();
1584    let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1585    for step in &active {
1586        let step_clone: Step = (*step).clone();
1587        let prompt = prompts.remove(&step.name).unwrap_or_default();
1588        let rendered_sp = rendered_sps.remove(&step.name);
1589        let workflow_name_owned = workflow_name.to_string();
1590        let name = step.name.clone();
1591        eprintln!("  starting '{name}'...");
1592        if let Some(w) = session {
1593            let zag_session_id = format!("zig-{workflow_name}-{name}");
1594            let _ = w.step_started(
1595                &name,
1596                tier_index,
1597                &zag_session_id,
1598                zag_command_name(&step.command),
1599                step.model.as_deref(),
1600                &prompt_preview(&prompt),
1601            );
1602        }
1603        start_times.insert(name.clone(), Instant::now());
1604        let session_clone = session.cloned();
1605        let wf_provider = workflow_provider.map(String::from);
1606        let wf_model = workflow_model.map(String::from);
1607        let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
1608        set.spawn(async move {
1609            let res = run_step_attempts(
1610                &step_clone,
1611                &prompt,
1612                &workflow_name_owned,
1613                Some(&name),
1614                session_clone.as_ref(),
1615                rendered_sp.as_deref(),
1616                wf_provider.as_deref(),
1617                wf_model.as_deref(),
1618                &storage_dirs,
1619            )
1620            .await;
1621            (name, res)
1622        });
1623    }
1624
1625    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1626    while let Some(joined) = set.join_next().await {
1627        match joined {
1628            Ok((name, res)) => {
1629                results.insert(name, res);
1630            }
1631            Err(e) => {
1632                return Err(ZigError::Execution(format!(
1633                    "parallel step task panicked: {e}"
1634                )));
1635            }
1636        }
1637    }
1638
1639    // Process results in tier-declaration order so `next` is deterministic.
1640    let mut errors: Vec<String> = Vec::new();
1641    for step in &active {
1642        let Some(res) = results.remove(&step.name) else {
1643            continue;
1644        };
1645        let elapsed = start_times
1646            .remove(&step.name)
1647            .map(|t| t.elapsed().as_millis() as u64)
1648            .unwrap_or(0);
1649        match res {
1650            Ok(output) => {
1651                let mut saved_keys: Vec<String> = Vec::new();
1652                if !step.saves.is_empty() {
1653                    let saved = extract_saves(&output, &step.saves)?;
1654                    for (k, v) in &saved {
1655                        eprintln!("    saved {k} = {v}");
1656                        saved_keys.push(k.clone());
1657                    }
1658                    vars.extend(saved);
1659                }
1660                step_outputs.insert(step.name.clone(), output);
1661                eprintln!("  completed '{}'", step.name);
1662                if let Some(w) = session {
1663                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1664                }
1665                if step.next.is_some() && pending_next.is_none() {
1666                    *pending_next = step.next.clone();
1667                }
1668            }
1669            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1670                FailurePolicy::Continue => {
1671                    eprintln!("  step '{}' failed (continuing): {e}", step.name);
1672                }
1673                FailurePolicy::Fail | FailurePolicy::Retry => {
1674                    errors.push(format!("'{}': {e}", step.name));
1675                }
1676            },
1677        }
1678    }
1679
1680    if !errors.is_empty() {
1681        return Err(ZigError::Execution(format!(
1682            "parallel step(s) failed: {}",
1683            errors.join("; ")
1684        )));
1685    }
1686
1687    Ok(())
1688}
1689
1690/// Initialize the variable map from workflow variable definitions.
1691/// Variables with defaults are set to their default value; others are empty.
1692fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1693    let mut vars = HashMap::new();
1694    for (name, var) in &workflow.vars {
1695        let value = match &var.default {
1696            Some(toml::Value::String(s)) => s.clone(),
1697            Some(toml::Value::Integer(n)) => n.to_string(),
1698            Some(toml::Value::Float(f)) => f.to_string(),
1699            Some(toml::Value::Boolean(b)) => b.to_string(),
1700            Some(other) => other.to_string(),
1701            None => String::new(),
1702        };
1703        vars.insert(name.clone(), value);
1704    }
1705    vars
1706}
1707
1708/// Main execution loop for a validated workflow.
1709#[allow(clippy::too_many_arguments)]
1710async fn execute(
1711    workflow: &Workflow,
1712    workflow_path: &std::path::Path,
1713    user_prompt: Option<&str>,
1714    workflow_dir: &Path,
1715    disable_resources: bool,
1716    disable_memory: bool,
1717    disable_storage: bool,
1718    dry_run: bool,
1719    dry_run_format: DryRunFormat,
1720) -> Result<(), ZigError> {
1721    let mut vars = init_vars(workflow);
1722
1723    let resource_collector = ResourceCollector::from_env(
1724        &workflow.workflow.name,
1725        &workflow.workflow.resources,
1726        workflow_dir,
1727        disable_resources,
1728    );
1729
1730    let config = ZigConfig::load();
1731    let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1732    let memory_collector = MemoryCollector::from_env(
1733        &workflow.workflow.name,
1734        workflow_memory_mode,
1735        &config,
1736        disable_memory,
1737    );
1738
1739    // Build storage manager for this run. Paths resolve against <cwd>/.zig/;
1740    // absolute paths pass through. `ensure` is called on every declared item
1741    // so step agents can trust the paths exist before they run.
1742    // When `--no-storage` is passed, skip building entirely so storage dirs
1743    // are not created and the `<storage>` block is omitted from prompts.
1744    // When `--dry-run` is passed, build the manager without `ensure` so the
1745    // block still renders with correct paths but nothing touches disk.
1746    let storage_manager = if disable_storage || workflow.storage.is_empty() {
1747        StorageManager::empty()
1748    } else if dry_run {
1749        let backend = FilesystemBackend::from_cwd()?;
1750        StorageManager::build_dry(&workflow.storage, backend)
1751    } else {
1752        let backend = FilesystemBackend::from_cwd()?;
1753        StorageManager::build(&workflow.storage, backend)?
1754    };
1755
1756    // Load file-backed variable defaults before prompt binding.
1757    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1758
1759    // Bind user prompt to the variable with `from = "prompt"`, if any.
1760    let prompt_var = workflow
1761        .vars
1762        .iter()
1763        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1764        .map(|(name, _)| name.clone());
1765
1766    if let Some(ref var_name) = prompt_var {
1767        if let Some(prompt) = user_prompt {
1768            vars.insert(var_name.clone(), prompt.to_string());
1769        }
1770    }
1771
1772    // Validate variable values against constraints before executing.
1773    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1774        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1775        return Err(ZigError::Validation(msgs.join("; ")));
1776    }
1777
1778    // When prompt is bound to a variable, don't also prepend "User context:".
1779    let effective_user_prompt = if prompt_var.is_some() {
1780        None
1781    } else {
1782        user_prompt
1783    };
1784
1785    let mut step_outputs: HashMap<String, String> = HashMap::new();
1786
1787    let wf_provider = workflow.workflow.provider.as_deref();
1788    let wf_model = workflow.workflow.model.as_deref();
1789
1790    let tiers = topological_sort(&workflow.steps)?;
1791
1792    if dry_run {
1793        let ctx = DryRunContext {
1794            workflow,
1795            workflow_path,
1796            workflow_dir,
1797            vars: &vars,
1798            user_prompt: effective_user_prompt,
1799            roles: &workflow.roles,
1800            resources: &resource_collector,
1801            memory: &memory_collector,
1802            storage: &storage_manager,
1803            wf_provider,
1804            wf_model,
1805            disable_resources,
1806            disable_memory,
1807            disable_storage,
1808        };
1809        return crate::dry_run::print_plan(&ctx, &tiers, dry_run_format);
1810    }
1811
1812    eprintln!(
1813        "running workflow '{}' ({} steps in {} tiers)",
1814        workflow.workflow.name,
1815        workflow.steps.len(),
1816        tiers.len()
1817    );
1818
1819    // Open a zig session log for this run. Failure to open the log is
1820    // non-fatal — `zig run` should still execute even if `~/.zig` is
1821    // unwritable. The session writer is `Option<Arc<...>>` everywhere
1822    // downstream so the writer-less path stays intact for tests.
1823    let coordinator = match SessionWriter::create(
1824        &workflow.workflow.name,
1825        &workflow_path.to_string_lossy(),
1826        user_prompt,
1827        tiers.len(),
1828    ) {
1829        Ok(writer) => {
1830            eprintln!("zig session: {}", writer.session_id());
1831            Some(SessionCoordinator::start(writer))
1832        }
1833        Err(e) => {
1834            eprintln!("warning: failed to open zig session log: {e}");
1835            None
1836        }
1837    };
1838    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1839    let session_ref = session_writer.as_ref();
1840
1841    let mut iteration = 0;
1842    let mut pending_next: Option<String> = None;
1843
1844    loop {
1845        let tiers_to_run = if let Some(ref next_step) = pending_next {
1846            // Re-run from the target step onward
1847            let remaining: Vec<Vec<&Step>> = tiers
1848                .iter()
1849                .map(|tier| {
1850                    tier.iter()
1851                        .filter(|s| s.name == *next_step)
1852                        .copied()
1853                        .collect::<Vec<_>>()
1854                })
1855                .filter(|tier| !tier.is_empty())
1856                .collect();
1857            pending_next = None;
1858            remaining
1859        } else if iteration == 0 {
1860            tiers.clone()
1861        } else {
1862            break;
1863        };
1864
1865        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1866            let (non_race, race_groups) = partition_tier(tier);
1867
1868            if let Some(w) = session_ref {
1869                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1870                let _ = w.tier_started(tier_index, names);
1871            }
1872
1873            // Independent steps in the same tier run concurrently. A single
1874            // step takes the sequential path so its output streams without
1875            // a name prefix; multiple steps go through the parallel path.
1876            if non_race.len() <= 1 {
1877                for step in &non_race {
1878                    execute_sequential_step(
1879                        step,
1880                        &mut vars,
1881                        effective_user_prompt,
1882                        &mut step_outputs,
1883                        &workflow.workflow.name,
1884                        &mut pending_next,
1885                        tier_index,
1886                        session_ref,
1887                        &workflow.roles,
1888                        &resource_collector,
1889                        &memory_collector,
1890                        &storage_manager,
1891                        workflow_dir,
1892                        wf_provider,
1893                        wf_model,
1894                    )
1895                    .await?;
1896                }
1897            } else {
1898                execute_parallel_tier(
1899                    &non_race,
1900                    &mut vars,
1901                    effective_user_prompt,
1902                    &mut step_outputs,
1903                    &workflow.workflow.name,
1904                    &mut pending_next,
1905                    tier_index,
1906                    session_ref,
1907                    &workflow.roles,
1908                    &resource_collector,
1909                    &memory_collector,
1910                    &storage_manager,
1911                    workflow_dir,
1912                    wf_provider,
1913                    wf_model,
1914                )
1915                .await?;
1916            }
1917
1918            // Run race groups in parallel
1919            for (group_name, race_steps) in &race_groups {
1920                eprintln!("  starting race group '{group_name}'...");
1921
1922                // Build prompts for all racers (conditions evaluated here)
1923                let mut prompts = HashMap::new();
1924                let mut race_sps: HashMap<String, String> = HashMap::new();
1925                let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
1926                    HashMap::new();
1927                let mut active_steps: Vec<&Step> = Vec::new();
1928                for step in race_steps {
1929                    if let Some(condition) = &step.condition {
1930                        if !evaluate_condition(condition, &vars)? {
1931                            eprintln!(
1932                                "  skipping '{}' (condition not met: {condition})",
1933                                step.name
1934                            );
1935                            continue;
1936                        }
1937                    }
1938                    let prompt =
1939                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1940                    prompts.insert(step.name.clone(), prompt);
1941                    if let Some(sp) = resolve_role_system_prompt(
1942                        step,
1943                        &workflow.roles,
1944                        &resource_collector,
1945                        &memory_collector,
1946                        &storage_manager,
1947                        &vars,
1948                        workflow_dir,
1949                        &workflow.workflow.name,
1950                    )? {
1951                        race_sps.insert(step.name.clone(), sp);
1952                    }
1953                    race_storage_dirs.insert(
1954                        step.name.clone(),
1955                        storage_manager.add_dirs_for_step(step.storage.as_deref()),
1956                    );
1957                    active_steps.push(step);
1958                }
1959
1960                if active_steps.is_empty() {
1961                    continue;
1962                }
1963
1964                match execute_race_group(
1965                    &active_steps,
1966                    &prompts,
1967                    &race_sps,
1968                    &workflow.workflow.name,
1969                    tier_index,
1970                    session_ref,
1971                    wf_provider,
1972                    wf_model,
1973                    &race_storage_dirs,
1974                )
1975                .await
1976                {
1977                    Ok((winner_name, output)) => {
1978                        // Find the winning step to process saves/next
1979                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1980                            if !winner.saves.is_empty() {
1981                                let saved = extract_saves(&output, &winner.saves)?;
1982                                for (k, v) in &saved {
1983                                    eprintln!("    saved {k} = {v}");
1984                                }
1985                                vars.extend(saved);
1986                            }
1987                            if winner.next.is_some() {
1988                                pending_next = winner.next.clone();
1989                            }
1990                        }
1991                        step_outputs.insert(winner_name.clone(), output);
1992                        eprintln!(
1993                            "  completed race group '{group_name}' (winner: '{winner_name}')"
1994                        );
1995                    }
1996                    Err(e) => return Err(e),
1997                }
1998            }
1999        }
2000
2001        iteration += 1;
2002        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
2003            if iteration >= MAX_LOOP_ITERATIONS {
2004                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
2005            }
2006            break;
2007        }
2008    }
2009
2010    eprintln!("workflow '{}' completed", workflow.workflow.name);
2011    if let Some(c) = coordinator {
2012        let _ = c.finish(SessionStatus::Success);
2013    }
2014    Ok(())
2015}
2016
2017/// Short label for the zag subcommand a step will invoke. Used in
2018/// `step_started` events so listeners can distinguish run/review/plan/etc.
2019fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
2020    match cmd {
2021        None => "run",
2022        Some(StepCommand::Review) => "review",
2023        Some(StepCommand::Plan) => "plan",
2024        Some(StepCommand::Pipe) => "pipe",
2025        Some(StepCommand::Collect) => "collect",
2026        Some(StepCommand::Summary) => "summary",
2027    }
2028}
2029
/// Produce a shortened one-line rendering of a prompt for the session log.
///
/// Every `'\n'` is replaced by a space. If the result is longer than 200
/// characters (counted as `char`s, not bytes), only the first 200 are kept
/// and a trailing `…` is appended; otherwise it is returned unchanged.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let flattened = prompt.replace('\n', " ");
    // The char at index MAX exists only when there are more than MAX chars;
    // its byte offset is where the kept prefix ends.
    match flattened.char_indices().nth(MAX) {
        None => flattened,
        Some((cut, _)) => format!("{}…", &flattened[..cut]),
    }
}
2044
2045#[cfg(test)]
2046#[path = "run_tests.rs"]
2047mod tests;