Skip to main content

zig_core/
run.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::error::ZigError;
12use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
13use crate::workflow::model::{FailurePolicy, Role, Step, StepCommand, Workflow};
14use crate::workflow::{parser, validate};
15
16/// Maximum number of loop iterations to prevent infinite loops from `next` fields.
17const MAX_LOOP_ITERATIONS: usize = 100;
18
19/// Execute a `.zug` workflow file.
20///
21/// Parses the workflow, validates it, resolves the step DAG, and executes
22/// each step by delegating to `zag`. The optional `user_prompt` is injected
23/// as additional context into every step's prompt.
24pub fn run_workflow(workflow_path: &str, user_prompt: Option<&str>) -> Result<(), ZigError> {
25    check_zag()?;
26
27    let path = resolve_workflow_path(workflow_path)?;
28    let (workflow, source) = parser::parse_workflow(&path)?;
29
30    if let Err(errors) = validate::validate(&workflow) {
31        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
32        return Err(ZigError::Validation(msgs.join("; ")));
33    }
34
35    execute(&workflow, &path, user_prompt, source.dir())
36}
37
38/// Verify that `zag` is installed and available on PATH.
39pub(crate) fn check_zag() -> Result<(), ZigError> {
40    let zag_available = Command::new("zag")
41        .arg("--version")
42        .output()
43        .is_ok_and(|o| o.status.success());
44
45    if !zag_available {
46        return Err(ZigError::Zag(
47            "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
48        ));
49    }
50    Ok(())
51}
52
53/// Resolve a workflow argument to an actual file path.
54///
55/// Tries in order:
56/// 1. Literal path as given
57/// 2. With `.zug` extension appended
58/// 3. Under `./workflows/` directory
59/// 4. Under `./workflows/` with `.zug` appended
60pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
61    let mut candidates = vec![
62        PathBuf::from(workflow),
63        PathBuf::from(format!("{workflow}.zug")),
64        PathBuf::from(format!("workflows/{workflow}")),
65        PathBuf::from(format!("workflows/{workflow}.zug")),
66    ];
67
68    if let Some(global_dir) = crate::paths::global_workflows_dir() {
69        candidates.push(global_dir.join(workflow));
70        candidates.push(global_dir.join(format!("{workflow}.zug")));
71    }
72
73    for candidate in &candidates {
74        if candidate.exists() {
75            return Ok(candidate.clone());
76        }
77    }
78
79    Err(ZigError::Io(format!(
80        "workflow not found: '{workflow}' (tried: {})",
81        candidates
82            .iter()
83            .map(|p| p.display().to_string())
84            .collect::<Vec<_>>()
85            .join(", ")
86    )))
87}
88
89/// Compute a topological ordering of steps grouped into tiers.
90///
91/// Each tier contains steps whose dependencies are all in earlier tiers,
92/// meaning steps within a tier can (in principle) run in parallel.
93/// Uses Kahn's algorithm.
94fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
95    let step_index: HashMap<&str, usize> = steps
96        .iter()
97        .enumerate()
98        .map(|(i, s)| (s.name.as_str(), i))
99        .collect();
100
101    let mut in_degree = vec![0usize; steps.len()];
102    for (i, step) in steps.iter().enumerate() {
103        for dep in &step.depends_on {
104            if step_index.contains_key(dep.as_str()) {
105                in_degree[i] += 1;
106            }
107        }
108    }
109
110    let mut tiers = Vec::new();
111    let mut remaining = in_degree.clone();
112    let mut completed: Vec<bool> = vec![false; steps.len()];
113
114    loop {
115        let tier: Vec<usize> = (0..steps.len())
116            .filter(|&i| !completed[i] && remaining[i] == 0)
117            .collect();
118
119        if tier.is_empty() {
120            break;
121        }
122
123        for &i in &tier {
124            completed[i] = true;
125        }
126
127        // Decrement in-degrees for dependents of this tier
128        for &i in &tier {
129            for (j, step) in steps.iter().enumerate() {
130                if !completed[j] && step.depends_on.contains(&steps[i].name) {
131                    remaining[j] -= 1;
132                }
133            }
134        }
135
136        tiers.push(tier.iter().map(|&i| &steps[i]).collect());
137    }
138
139    let completed_count: usize = completed.iter().filter(|&&c| c).count();
140    if completed_count != steps.len() {
141        return Err(ZigError::Execution(
142            "could not resolve all steps — possible undetected cycle".into(),
143        ));
144    }
145
146    Ok(tiers)
147}
148
149/// Replace `${var_name}` references in a template with values from the variable map.
150///
151/// Supports dotted paths like `${result.score}` — the root variable name is
152/// looked up, and if its value is valid JSON, the nested path is traversed.
153/// Unknown variables are left as-is.
154fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
155    let mut result = String::with_capacity(template.len());
156    let mut rest = template;
157
158    while let Some(start) = rest.find("${") {
159        result.push_str(&rest[..start]);
160        let after_start = &rest[start + 2..];
161
162        if let Some(end) = after_start.find('}') {
163            let var_expr = &after_start[..end];
164            let mut parts = var_expr.splitn(2, '.');
165            let root = parts.next().unwrap_or(var_expr);
166
167            if let Some(value) = vars.get(root) {
168                if let Some(path) = parts.next() {
169                    // Try to navigate a JSON path
170                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
171                        let resolved = json_path_lookup(&json, path);
172                        result.push_str(&resolved);
173                    } else {
174                        result.push_str(value);
175                    }
176                } else {
177                    result.push_str(value);
178                }
179            } else {
180                // Unknown variable — leave as-is
181                result.push_str(&rest[start..start + 2 + end + 1]);
182            }
183
184            rest = &after_start[end + 1..];
185        } else {
186            result.push_str(&rest[start..]);
187            rest = "";
188        }
189    }
190
191    result.push_str(rest);
192    result
193}
194
195/// Look up a dotted path in a JSON value (e.g., "nested.field").
196fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
197    let mut current = value;
198    for key in path.split('.') {
199        match current.get(key) {
200            Some(v) => current = v,
201            None => return format!("${{?.{path}}}"),
202        }
203    }
204    match current {
205        serde_json::Value::String(s) => s.clone(),
206        other => other.to_string(),
207    }
208}
209
210/// Resolve the effective system prompt for a step.
211///
212/// Resolution order:
213/// 1. If `step.system_prompt` is set, use it (with variable substitution).
214/// 2. If `step.role` is set, resolve the role name (may contain `${var}`),
215///    look it up in the roles table, and return the role's system prompt
216///    (loaded from file if `system_prompt_file` is set).
217/// 3. Otherwise, return `None`.
218fn resolve_role_system_prompt(
219    step: &Step,
220    roles: &HashMap<String, Role>,
221    vars: &HashMap<String, String>,
222    workflow_dir: &Path,
223) -> Result<Option<String>, ZigError> {
224    // Direct system_prompt on step takes priority
225    if let Some(ref sp) = step.system_prompt {
226        return Ok(Some(substitute_vars(sp, vars)));
227    }
228
229    // Role reference
230    if let Some(ref role_ref) = step.role {
231        let resolved_name = substitute_vars(role_ref, vars);
232        let role = roles.get(&resolved_name).ok_or_else(|| {
233            ZigError::Execution(format!(
234                "step '{}' references role '{}' which does not exist",
235                step.name, resolved_name
236            ))
237        })?;
238
239        // Load from file or use inline prompt
240        let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
241            let full_path = workflow_dir.join(file_path);
242            std::fs::read_to_string(&full_path).map_err(|e| {
243                ZigError::Execution(format!(
244                    "failed to read system_prompt_file '{}' for role '{}': {e}",
245                    full_path.display(),
246                    resolved_name
247                ))
248            })?
249        } else if let Some(ref sp) = role.system_prompt {
250            sp.clone()
251        } else {
252            return Ok(None);
253        };
254
255        return Ok(Some(substitute_vars(&raw_prompt, vars)));
256    }
257
258    Ok(None)
259}
260
261/// Load file-backed default values for variables.
262///
263/// For each variable with `default_file` set (and no `default`), reads the
264/// file contents relative to `workflow_dir` and inserts them into the vars map.
265fn load_file_defaults(
266    vars: &mut HashMap<String, String>,
267    declarations: &HashMap<String, crate::workflow::model::Variable>,
268    workflow_dir: &Path,
269) -> Result<(), ZigError> {
270    for (name, decl) in declarations {
271        if decl.default.is_none() {
272            if let Some(ref file_path) = decl.default_file {
273                let full_path = workflow_dir.join(file_path);
274                let content = std::fs::read_to_string(&full_path).map_err(|e| {
275                    ZigError::Execution(format!(
276                        "failed to read default_file '{}' for variable '{name}': {e}",
277                        full_path.display()
278                    ))
279                })?;
280                vars.insert(name.clone(), content);
281            }
282        }
283    }
284    Ok(())
285}
286
287/// Evaluate a simple condition expression against the current variable state.
288///
289/// Supports:
290/// - Numeric comparisons: `score < 8`, `retries <= max_retries`
291/// - String equality: `status == "done"`, `status != "pending"`
292/// - Truthy checks: `approved` (true if value is "true" or non-empty and non-zero)
293fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
294    let condition = condition.trim();
295
296    // Try comparison operators (ordered by length to match `<=` before `<`)
297    let operators = ["<=", ">=", "!=", "==", "<", ">"];
298    for op in &operators {
299        if let Some(pos) = condition.find(op) {
300            let lhs = resolve_operand(condition[..pos].trim(), vars);
301            let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
302            return Ok(compare(&lhs, &rhs, op));
303        }
304    }
305
306    // Truthy check: single variable name
307    let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
308    Ok(is_truthy(value))
309}
310
311/// Resolve a condition operand to its string value.
312/// - String literals ("done") → done
313/// - Variable names → looked up in vars
314/// - Numeric literals → left as-is
315fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
316    // Strip surrounding quotes for string literals
317    if (token.starts_with('"') && token.ends_with('"'))
318        || (token.starts_with('\'') && token.ends_with('\''))
319    {
320        return token[1..token.len() - 1].to_string();
321    }
322    // Try variable lookup
323    if let Some(val) = vars.get(token) {
324        return val.clone();
325    }
326    // Return as-is (numeric literal or unknown)
327    token.to_string()
328}
329
330/// Compare two string operands with the given operator.
331/// Attempts numeric comparison first, falls back to lexicographic.
332fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
333    if let (Ok(l), Ok(r)) = (lhs.parse::<f64>(), rhs.parse::<f64>()) {
334        return match op {
335            "==" => (l - r).abs() < f64::EPSILON,
336            "!=" => (l - r).abs() >= f64::EPSILON,
337            "<" => l < r,
338            ">" => l > r,
339            "<=" => l <= r,
340            ">=" => l >= r,
341            _ => false,
342        };
343    }
344    match op {
345        "==" => lhs == rhs,
346        "!=" => lhs != rhs,
347        "<" => lhs < rhs,
348        ">" => lhs > rhs,
349        "<=" => lhs <= rhs,
350        ">=" => lhs >= rhs,
351        _ => false,
352    }
353}
354
355/// Check if a string value is truthy.
356fn is_truthy(value: &str) -> bool {
357    !value.is_empty() && value != "false" && value != "0"
358}
359
360/// Build the final prompt for a step, incorporating variable substitution,
361/// dependency outputs, and the user's context prompt.
362fn render_step_prompt(
363    step: &Step,
364    vars: &HashMap<String, String>,
365    user_prompt: Option<&str>,
366    dependency_outputs: &HashMap<String, String>,
367) -> String {
368    let mut prompt = String::new();
369
370    // Prepend user context if provided
371    if let Some(ctx) = user_prompt {
372        prompt.push_str(&format!("User context: {ctx}\n\n"));
373    }
374
375    // Inject dependency outputs if requested
376    if step.inject_context {
377        for dep in &step.depends_on {
378            if let Some(output) = dependency_outputs.get(dep) {
379                prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
380            }
381        }
382    }
383
384    // Append the step's prompt with variable substitution
385    prompt.push_str(&substitute_vars(&step.prompt, vars));
386
387    prompt
388}
389
390/// Build the argument list for a zag invocation.
391///
392/// Separated from `execute_step` to allow unit testing of flag logic
393/// without a real `zag` binary. The optional `model_override` is used
394/// during retries when `retry_model` escalates to a different model.
395///
396/// The subcommand is determined by `step.command`:
397/// - `None` → `zag run <prompt>` (default)
398/// - `Review` → `zag review [prompt] [--uncommitted] [--base] [--commit] [--title]`
399/// - `Plan` → `zag plan <prompt> [-o path] [--instructions]`
400/// - `Pipe` → `zag pipe <session-ids...> -- <prompt>`
401/// - `Collect` → `zag collect <session-ids...>`
402/// - `Summary` → `zag summary <session-ids...>`
403fn build_zag_args(
404    step: &Step,
405    prompt: &str,
406    workflow_name: &str,
407    model_override: Option<&str>,
408    rendered_system_prompt: Option<&str>,
409    workflow_provider: Option<&str>,
410    workflow_model: Option<&str>,
411) -> Vec<String> {
412    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
413
414    // Build command-specific prefix and determine if agent args apply
415    let (mut args, accepts_agent_args) = match &step.command {
416        None => (vec!["run".to_string(), prompt.to_string()], true),
417        Some(StepCommand::Review) => {
418            let mut a = vec!["review".to_string()];
419            if !prompt.is_empty() {
420                a.push(prompt.to_string());
421            }
422            if step.uncommitted {
423                a.push("--uncommitted".into());
424            }
425            if let Some(base) = &step.base {
426                a.extend(["--base".into(), base.clone()]);
427            }
428            if let Some(commit) = &step.commit {
429                a.extend(["--commit".into(), commit.clone()]);
430            }
431            if let Some(title) = &step.title {
432                a.extend(["--title".into(), title.clone()]);
433            }
434            (a, true)
435        }
436        Some(StepCommand::Plan) => {
437            let mut a = vec!["plan".to_string(), prompt.to_string()];
438            if let Some(output) = &step.plan_output {
439                a.extend(["-o".into(), output.clone()]);
440            }
441            if let Some(instructions) = &step.instructions {
442                a.extend(["--instructions".into(), instructions.clone()]);
443            }
444            (a, true)
445        }
446        Some(StepCommand::Pipe) => {
447            let mut a = vec!["pipe".to_string()];
448            for dep in &step.depends_on {
449                a.push(session_name(dep));
450            }
451            a.push("--".into());
452            a.push(prompt.to_string());
453            (a, true)
454        }
455        Some(StepCommand::Collect) => {
456            let mut a = vec!["collect".to_string()];
457            for dep in &step.depends_on {
458                a.push(session_name(dep));
459            }
460            (a, false)
461        }
462        Some(StepCommand::Summary) => {
463            let mut a = vec!["summary".to_string()];
464            for dep in &step.depends_on {
465                a.push(session_name(dep));
466            }
467            (a, false)
468        }
469    };
470
471    // Agent args (provider, model, prompts, output, etc.) only apply to
472    // commands that launch an agent: run, review, plan, pipe.
473    if accepts_agent_args {
474        let effective_provider = step.provider.as_deref().or(workflow_provider);
475        if let Some(provider) = effective_provider {
476            args.extend(["--provider".into(), provider.to_string()]);
477        }
478
479        let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
480        if let Some(model) = effective_model {
481            args.extend(["--model".into(), model.to_string()]);
482        }
483
484        if let Some(sp) = rendered_system_prompt {
485            args.extend(["--system-prompt".into(), sp.to_string()]);
486        }
487        if let Some(max_turns) = step.max_turns {
488            args.extend(["--max-turns".into(), max_turns.to_string()]);
489        }
490
491        // Output format: explicit format overrides the json bool
492        if let Some(output) = &step.output {
493            args.extend(["-o".into(), output.clone()]);
494        } else if step.json {
495            args.push("--json".into());
496        }
497        if let Some(schema) = &step.json_schema {
498            args.extend(["--json-schema".into(), schema.clone()]);
499        }
500
501        if let Some(mcp_config) = &step.mcp_config {
502            args.extend(["--mcp-config".into(), mcp_config.clone()]);
503        }
504
505        // Execution environment
506        if step.auto_approve {
507            args.push("--auto-approve".into());
508        }
509        if let Some(root) = &step.root {
510            args.extend(["--root".into(), root.clone()]);
511        }
512        for dir in &step.add_dirs {
513            args.extend(["--add-dir".into(), dir.clone()]);
514        }
515        for (key, value) in &step.env {
516            args.extend(["--env".into(), format!("{key}={value}")]);
517        }
518        for file in &step.files {
519            args.extend(["--file".into(), file.clone()]);
520        }
521
522        // Context injection
523        for ctx in &step.context {
524            args.extend(["--context".into(), ctx.clone()]);
525        }
526        if let Some(plan) = &step.plan {
527            args.extend(["--plan".into(), plan.clone()]);
528        }
529
530        // Isolation
531        if step.worktree {
532            args.push("--worktree".into());
533        }
534        if let Some(sandbox) = &step.sandbox {
535            args.extend(["--sandbox".into(), sandbox.clone()]);
536        }
537    }
538
539    // Session metadata applies to all commands
540    let name = session_name(&step.name);
541    args.extend(["--name".into(), name]);
542
543    if !step.description.is_empty() {
544        args.extend(["--description".into(), step.description.clone()]);
545    }
546
547    args.extend(["--tag".into(), "zig-workflow".into()]);
548    for tag in &step.tags {
549        args.extend(["--tag".into(), tag.clone()]);
550    }
551
552    if let Some(timeout) = &step.timeout {
553        args.extend(["--timeout".into(), timeout.clone()]);
554    }
555
556    args
557}
558
559/// Spawn `zag` and stream its stdout/stderr live to our stderr while
560/// also accumulating stdout into a buffer for `saves` extraction.
561///
562/// If `prefix` is `Some`, every emitted line is prefixed with `[prefix] `
563/// — used to disambiguate output from steps running in parallel.
564fn run_zag_streaming(
565    args: &[String],
566    step_name: &str,
567    prefix: Option<&str>,
568    session: Option<&Arc<SessionWriter>>,
569) -> Result<(std::process::ExitStatus, String), ZigError> {
570    let mut cmd = Command::new("zag");
571    cmd.args(args)
572        .stdout(std::process::Stdio::piped())
573        .stderr(std::process::Stdio::piped());
574
575    let mut child = cmd
576        .spawn()
577        .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
578
579    let stdout = child.stdout.take().expect("stdout was piped");
580    let stderr = child.stderr.take().expect("stderr was piped");
581
582    let buffer = Arc::new(Mutex::new(String::new()));
583    let buffer_clone = Arc::clone(&buffer);
584    let prefix_stdout = prefix.map(String::from);
585    let prefix_stderr = prefix.map(String::from);
586    let session_stdout = session.cloned();
587    let session_stderr = session.cloned();
588    let step_name_stdout = step_name.to_string();
589    let step_name_stderr = step_name.to_string();
590
591    let stdout_thread = thread::spawn(move || {
592        let reader = BufReader::new(stdout);
593        let stderr_handle = std::io::stderr();
594        for line in reader.lines().map_while(Result::ok) {
595            if let Ok(mut buf) = buffer_clone.lock() {
596                buf.push_str(&line);
597                buf.push('\n');
598            }
599            if let Some(w) = &session_stdout {
600                let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
601            }
602            let mut h = stderr_handle.lock();
603            let _ = match &prefix_stdout {
604                Some(p) => writeln!(h, "[{p}] {line}"),
605                None => writeln!(h, "{line}"),
606            };
607        }
608    });
609
610    let stderr_thread = thread::spawn(move || {
611        let reader = BufReader::new(stderr);
612        let stderr_handle = std::io::stderr();
613        for line in reader.lines().map_while(Result::ok) {
614            if let Some(w) = &session_stderr {
615                let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
616            }
617            let mut h = stderr_handle.lock();
618            let _ = match &prefix_stderr {
619                Some(p) => writeln!(h, "[{p}] {line}"),
620                None => writeln!(h, "{line}"),
621            };
622        }
623    });
624
625    let status = child
626        .wait()
627        .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
628
629    let _ = stdout_thread.join();
630    let _ = stderr_thread.join();
631
632    let captured = Arc::try_unwrap(buffer)
633        .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
634        .into_inner()
635        .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
636
637    Ok((status, captured))
638}
639
640/// Execute a single step by invoking `zag`, streaming its output live.
641///
642/// Returns the captured stdout from zag. The optional `model_override`
643/// is used during retries to escalate to a different model. The optional
644/// `prefix` tags streamed lines with the step name (used for parallel runs).
645#[allow(clippy::too_many_arguments)]
646fn execute_step(
647    step: &Step,
648    prompt: &str,
649    workflow_name: &str,
650    model_override: Option<&str>,
651    prefix: Option<&str>,
652    session: Option<&Arc<SessionWriter>>,
653    rendered_system_prompt: Option<&str>,
654    workflow_provider: Option<&str>,
655    workflow_model: Option<&str>,
656) -> Result<String, ZigError> {
657    let args = build_zag_args(
658        step,
659        prompt,
660        workflow_name,
661        model_override,
662        rendered_system_prompt,
663        workflow_provider,
664        workflow_model,
665    );
666    let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
667
668    if !status.success() {
669        return Err(ZigError::Execution(format!(
670            "step '{}' failed (exit {})",
671            step.name, status,
672        )));
673    }
674
675    Ok(stdout)
676}
677
678/// Run a step with retry logic, returning its captured stdout on success.
679///
680/// Extracted so both sequential and parallel execution paths share the
681/// same retry / model-escalation behavior.
682#[allow(clippy::too_many_arguments)]
683fn run_step_attempts(
684    step: &Step,
685    prompt: &str,
686    workflow_name: &str,
687    prefix: Option<&str>,
688    session: Option<&Arc<SessionWriter>>,
689    rendered_system_prompt: Option<&str>,
690    workflow_provider: Option<&str>,
691    workflow_model: Option<&str>,
692) -> Result<String, ZigError> {
693    let mut attempts = 0;
694    let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
695        step.max_retries.unwrap_or(1) + 1
696    } else {
697        1
698    };
699
700    loop {
701        attempts += 1;
702        let model_override = if attempts > 1 {
703            step.retry_model.as_deref()
704        } else {
705            None
706        };
707        match execute_step(
708            step,
709            prompt,
710            workflow_name,
711            model_override,
712            prefix,
713            session,
714            rendered_system_prompt,
715            workflow_provider,
716            workflow_model,
717        ) {
718            Ok(output) => return Ok(output),
719            Err(e) => {
720                if let Some(w) = session {
721                    let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
722                }
723                if attempts < max_attempts {
724                    eprintln!(
725                        "    retry {}/{} for step '{}'",
726                        attempts,
727                        max_attempts - 1,
728                        step.name
729                    );
730                    continue;
731                }
732                return Err(e);
733            }
734        }
735    }
736}
737
738/// Extract variable values from step output using `saves` selectors.
739///
740/// Selectors:
741/// - `"$"` — the entire output
742/// - `"$.field"` — a top-level JSON field
743/// - `"$.nested.field"` — a nested JSON field
744fn extract_saves(
745    output: &str,
746    saves: &HashMap<String, String>,
747) -> Result<HashMap<String, String>, ZigError> {
748    let mut extracted = HashMap::new();
749
750    for (var_name, selector) in saves {
751        let value = if selector == "$" {
752            output.trim().to_string()
753        } else if let Some(path) = selector.strip_prefix("$.") {
754            let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
755                ZigError::Execution(format!(
756                    "saves selector '{selector}' requires JSON output, but got parse error: {e}"
757                ))
758            })?;
759            json_path_lookup(&json, path)
760        } else {
761            output.trim().to_string()
762        };
763
764        extracted.insert(var_name.clone(), value);
765    }
766
767    Ok(extracted)
768}
769
770/// Partition a tier of steps into sequential steps and race groups.
771///
772/// Steps without a `race_group` are returned as sequential. Steps sharing
773/// the same `race_group` value are grouped together for parallel execution.
774fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
775    let mut sequential = Vec::new();
776    let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
777
778    for step in tier {
779        if let Some(group) = &step.race_group {
780            race_groups.entry(group.clone()).or_default().push(step);
781        } else {
782            sequential.push(*step);
783        }
784    }
785
786    (sequential, race_groups)
787}
788
789/// Spawn a step as a child process without waiting for it to finish.
790fn spawn_step(
791    step: &Step,
792    prompt: &str,
793    workflow_name: &str,
794    rendered_system_prompt: Option<&str>,
795    workflow_provider: Option<&str>,
796    workflow_model: Option<&str>,
797) -> Result<std::process::Child, ZigError> {
798    let args = build_zag_args(
799        step,
800        prompt,
801        workflow_name,
802        None,
803        rendered_system_prompt,
804        workflow_provider,
805        workflow_model,
806    );
807    let mut cmd = Command::new("zag");
808    cmd.args(&args)
809        .stdout(std::process::Stdio::piped())
810        .stderr(std::process::Stdio::piped());
811
812    cmd.spawn()
813        .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
814}
815
816/// Execute a race group: run all steps in parallel, return the first winner.
817///
818/// When one step finishes successfully, all remaining steps are killed.
819/// Returns the winning step's name and its stdout output.
820#[allow(clippy::too_many_arguments)]
821fn execute_race_group(
822    steps: &[&Step],
823    prompts: &HashMap<String, String>,
824    system_prompts: &HashMap<String, String>,
825    workflow_name: &str,
826    tier_index: usize,
827    session: Option<&Arc<SessionWriter>>,
828    workflow_provider: Option<&str>,
829    workflow_model: Option<&str>,
830) -> Result<(String, String), ZigError> {
831    if let Some(w) = session {
832        for step in steps {
833            let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
834            let preview = prompts
835                .get(&step.name)
836                .map(|p| prompt_preview(p))
837                .unwrap_or_default();
838            let _ = w.step_started(
839                &step.name,
840                tier_index,
841                &zag_session_id,
842                zag_command_name(&step.command),
843                step.model.as_deref(),
844                &preview,
845            );
846        }
847    }
848    let race_started = Instant::now();
849    let mut children: Vec<(String, std::process::Child)> = Vec::new();
850
851    for step in steps {
852        let prompt = prompts.get(&step.name).ok_or_else(|| {
853            ZigError::Execution(format!("missing prompt for step '{}'", step.name))
854        })?;
855        eprintln!("  racing step '{}'...", step.name);
856        let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
857        let child = spawn_step(
858            step,
859            prompt,
860            workflow_name,
861            rendered_sp,
862            workflow_provider,
863            workflow_model,
864        )?;
865        children.push((step.name.clone(), child));
866    }
867
868    // Poll until one finishes
869    loop {
870        for i in 0..children.len() {
871            let status = children[i]
872                .1
873                .try_wait()
874                .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
875
876            if let Some(exit_status) = status {
877                let (winner_name, winner_child) = children.remove(i);
878
879                // Kill remaining children
880                for (name, mut child) in children {
881                    eprintln!("  cancelling step '{name}' (race lost)");
882                    let _ = child.kill();
883                    let _ = child.wait();
884                }
885
886                let elapsed = race_started.elapsed().as_millis() as u64;
887                if !exit_status.success() {
888                    let stderr = winner_child
889                        .stderr
890                        .map(|mut s| {
891                            let mut buf = String::new();
892                            std::io::Read::read_to_string(&mut s, &mut buf).ok();
893                            buf
894                        })
895                        .unwrap_or_default();
896                    let err_msg = format!(
897                        "race winner '{}' failed (exit {}): {}",
898                        winner_name,
899                        exit_status,
900                        stderr.trim()
901                    );
902                    if let Some(w) = session {
903                        let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
904                    }
905                    return Err(ZigError::Execution(err_msg));
906                }
907
908                let stdout = winner_child
909                    .stdout
910                    .map(|mut s| {
911                        let mut buf = String::new();
912                        std::io::Read::read_to_string(&mut s, &mut buf).ok();
913                        buf
914                    })
915                    .unwrap_or_default();
916
917                eprintln!("  race won by '{winner_name}'");
918                if let Some(w) = session {
919                    let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
920                }
921                return Ok((winner_name, stdout));
922            }
923        }
924
925        std::thread::sleep(Duration::from_millis(100));
926    }
927}
928
929/// Execute a single sequential step with retry logic, saves, and next-jump handling.
930#[allow(clippy::too_many_arguments)]
931fn execute_sequential_step(
932    step: &Step,
933    vars: &mut HashMap<String, String>,
934    user_prompt: Option<&str>,
935    step_outputs: &mut HashMap<String, String>,
936    workflow_name: &str,
937    pending_next: &mut Option<String>,
938    tier_index: usize,
939    session: Option<&Arc<SessionWriter>>,
940    roles: &HashMap<String, Role>,
941    workflow_dir: &Path,
942    workflow_provider: Option<&str>,
943    workflow_model: Option<&str>,
944) -> Result<(), ZigError> {
945    if let Some(condition) = &step.condition {
946        if !evaluate_condition(condition, vars)? {
947            eprintln!(
948                "  skipping '{}' (condition not met: {condition})",
949                step.name
950            );
951            if let Some(w) = session {
952                let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
953            }
954            return Ok(());
955        }
956    }
957
958    eprintln!("  running step '{}'...", step.name);
959
960    let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
961    let rendered_sp = resolve_role_system_prompt(step, roles, vars, workflow_dir)?;
962    if let Some(w) = session {
963        let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
964        let _ = w.step_started(
965            &step.name,
966            tier_index,
967            &zag_session_id,
968            zag_command_name(&step.command),
969            step.model.as_deref(),
970            &prompt_preview(&prompt),
971        );
972    }
973    let started = Instant::now();
974    let result = run_step_attempts(
975        step,
976        &prompt,
977        workflow_name,
978        None,
979        session,
980        rendered_sp.as_deref(),
981        workflow_provider,
982        workflow_model,
983    );
984
985    match result {
986        Ok(output) => {
987            let mut saved_keys: Vec<String> = Vec::new();
988            if !step.saves.is_empty() {
989                let saved = extract_saves(&output, &step.saves)?;
990                for (k, v) in &saved {
991                    eprintln!("    saved {k} = {v}");
992                    saved_keys.push(k.clone());
993                }
994                vars.extend(saved);
995            }
996
997            step_outputs.insert(step.name.clone(), output);
998            eprintln!("  completed '{}'", step.name);
999            if let Some(w) = session {
1000                let _ = w.step_completed(
1001                    &step.name,
1002                    0,
1003                    started.elapsed().as_millis() as u64,
1004                    saved_keys,
1005                );
1006            }
1007
1008            if step.next.is_some() {
1009                *pending_next = step.next.clone();
1010            }
1011        }
1012        Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1013            FailurePolicy::Fail => return Err(e),
1014            FailurePolicy::Continue => {
1015                eprintln!("  step '{}' failed (continuing): {e}", step.name);
1016            }
1017            FailurePolicy::Retry => {
1018                return Err(e);
1019            }
1020        },
1021    }
1022
1023    Ok(())
1024}
1025
1026/// Run multiple independent steps in a tier concurrently.
1027///
1028/// All non-skipped steps are spawned in their own threads and we wait for
1029/// every one to finish (unlike race groups, which kill losers). Output
1030/// lines from each step are streamed live to stderr, prefixed with the
1031/// step name to disambiguate. After completion, results are processed in
1032/// tier-declaration order so `saves`, `next`, and `on_failure` semantics
1033/// remain deterministic.
1034#[allow(clippy::too_many_arguments)]
1035fn execute_parallel_tier(
1036    steps: &[&Step],
1037    vars: &mut HashMap<String, String>,
1038    user_prompt: Option<&str>,
1039    step_outputs: &mut HashMap<String, String>,
1040    workflow_name: &str,
1041    pending_next: &mut Option<String>,
1042    tier_index: usize,
1043    session: Option<&Arc<SessionWriter>>,
1044    roles: &HashMap<String, Role>,
1045    workflow_dir: &Path,
1046    workflow_provider: Option<&str>,
1047    workflow_model: Option<&str>,
1048) -> Result<(), ZigError> {
1049    // Evaluate conditions and render prompts up front, so threads receive
1050    // the same variable snapshot they would have under sequential execution.
1051    let mut active: Vec<&Step> = Vec::new();
1052    let mut prompts: HashMap<String, String> = HashMap::new();
1053    let mut rendered_sps: HashMap<String, String> = HashMap::new();
1054    for step in steps {
1055        if let Some(condition) = &step.condition {
1056            if !evaluate_condition(condition, vars)? {
1057                eprintln!(
1058                    "  skipping '{}' (condition not met: {condition})",
1059                    step.name
1060                );
1061                if let Some(w) = session {
1062                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1063                }
1064                continue;
1065            }
1066        }
1067        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1068        prompts.insert(step.name.clone(), prompt);
1069        if let Some(sp) = resolve_role_system_prompt(step, roles, vars, workflow_dir)? {
1070            rendered_sps.insert(step.name.clone(), sp);
1071        }
1072        active.push(*step);
1073    }
1074
1075    if active.is_empty() {
1076        return Ok(());
1077    }
1078
1079    eprintln!("  running {} steps in parallel...", active.len());
1080
1081    let mut start_times: HashMap<String, Instant> = HashMap::new();
1082    let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1083    for step in &active {
1084        let step_clone: Step = (*step).clone();
1085        let prompt = prompts.remove(&step.name).unwrap_or_default();
1086        let rendered_sp = rendered_sps.remove(&step.name);
1087        let workflow_name_owned = workflow_name.to_string();
1088        let name = step.name.clone();
1089        eprintln!("  starting '{name}'...");
1090        if let Some(w) = session {
1091            let zag_session_id = format!("zig-{workflow_name}-{name}");
1092            let _ = w.step_started(
1093                &name,
1094                tier_index,
1095                &zag_session_id,
1096                zag_command_name(&step.command),
1097                step.model.as_deref(),
1098                &prompt_preview(&prompt),
1099            );
1100        }
1101        start_times.insert(name.clone(), Instant::now());
1102        let session_clone = session.cloned();
1103        let wf_provider = workflow_provider.map(String::from);
1104        let wf_model = workflow_model.map(String::from);
1105        let handle = thread::spawn(move || {
1106            let res = run_step_attempts(
1107                &step_clone,
1108                &prompt,
1109                &workflow_name_owned,
1110                Some(&name),
1111                session_clone.as_ref(),
1112                rendered_sp.as_deref(),
1113                wf_provider.as_deref(),
1114                wf_model.as_deref(),
1115            );
1116            (name, res)
1117        });
1118        handles.push(handle);
1119    }
1120
1121    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1122    for handle in handles {
1123        match handle.join() {
1124            Ok((name, res)) => {
1125                results.insert(name, res);
1126            }
1127            Err(_) => {
1128                return Err(ZigError::Execution("parallel step thread panicked".into()));
1129            }
1130        }
1131    }
1132
1133    // Process results in tier-declaration order so `next` is deterministic.
1134    let mut errors: Vec<String> = Vec::new();
1135    for step in &active {
1136        let Some(res) = results.remove(&step.name) else {
1137            continue;
1138        };
1139        let elapsed = start_times
1140            .remove(&step.name)
1141            .map(|t| t.elapsed().as_millis() as u64)
1142            .unwrap_or(0);
1143        match res {
1144            Ok(output) => {
1145                let mut saved_keys: Vec<String> = Vec::new();
1146                if !step.saves.is_empty() {
1147                    let saved = extract_saves(&output, &step.saves)?;
1148                    for (k, v) in &saved {
1149                        eprintln!("    saved {k} = {v}");
1150                        saved_keys.push(k.clone());
1151                    }
1152                    vars.extend(saved);
1153                }
1154                step_outputs.insert(step.name.clone(), output);
1155                eprintln!("  completed '{}'", step.name);
1156                if let Some(w) = session {
1157                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1158                }
1159                if step.next.is_some() && pending_next.is_none() {
1160                    *pending_next = step.next.clone();
1161                }
1162            }
1163            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1164                FailurePolicy::Continue => {
1165                    eprintln!("  step '{}' failed (continuing): {e}", step.name);
1166                }
1167                FailurePolicy::Fail | FailurePolicy::Retry => {
1168                    errors.push(format!("'{}': {e}", step.name));
1169                }
1170            },
1171        }
1172    }
1173
1174    if !errors.is_empty() {
1175        return Err(ZigError::Execution(format!(
1176            "parallel step(s) failed: {}",
1177            errors.join("; ")
1178        )));
1179    }
1180
1181    Ok(())
1182}
1183
1184/// Initialize the variable map from workflow variable definitions.
1185/// Variables with defaults are set to their default value; others are empty.
1186fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1187    let mut vars = HashMap::new();
1188    for (name, var) in &workflow.vars {
1189        let value = match &var.default {
1190            Some(toml::Value::String(s)) => s.clone(),
1191            Some(toml::Value::Integer(n)) => n.to_string(),
1192            Some(toml::Value::Float(f)) => f.to_string(),
1193            Some(toml::Value::Boolean(b)) => b.to_string(),
1194            Some(other) => other.to_string(),
1195            None => String::new(),
1196        };
1197        vars.insert(name.clone(), value);
1198    }
1199    vars
1200}
1201
1202/// Main execution loop for a validated workflow.
1203fn execute(
1204    workflow: &Workflow,
1205    workflow_path: &std::path::Path,
1206    user_prompt: Option<&str>,
1207    workflow_dir: &Path,
1208) -> Result<(), ZigError> {
1209    let mut vars = init_vars(workflow);
1210
1211    // Load file-backed variable defaults before prompt binding.
1212    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1213
1214    // Bind user prompt to the variable with `from = "prompt"`, if any.
1215    let prompt_var = workflow
1216        .vars
1217        .iter()
1218        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1219        .map(|(name, _)| name.clone());
1220
1221    if let Some(ref var_name) = prompt_var {
1222        if let Some(prompt) = user_prompt {
1223            vars.insert(var_name.clone(), prompt.to_string());
1224        }
1225    }
1226
1227    // Validate variable values against constraints before executing.
1228    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1229        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1230        return Err(ZigError::Validation(msgs.join("; ")));
1231    }
1232
1233    // When prompt is bound to a variable, don't also prepend "User context:".
1234    let effective_user_prompt = if prompt_var.is_some() {
1235        None
1236    } else {
1237        user_prompt
1238    };
1239
1240    let mut step_outputs: HashMap<String, String> = HashMap::new();
1241
1242    let wf_provider = workflow.workflow.provider.as_deref();
1243    let wf_model = workflow.workflow.model.as_deref();
1244
1245    let tiers = topological_sort(&workflow.steps)?;
1246
1247    eprintln!(
1248        "running workflow '{}' ({} steps in {} tiers)",
1249        workflow.workflow.name,
1250        workflow.steps.len(),
1251        tiers.len()
1252    );
1253
1254    // Open a zig session log for this run. Failure to open the log is
1255    // non-fatal — `zig run` should still execute even if `~/.zig` is
1256    // unwritable. The session writer is `Option<Arc<...>>` everywhere
1257    // downstream so the writer-less path stays intact for tests.
1258    let coordinator = match SessionWriter::create(
1259        &workflow.workflow.name,
1260        &workflow_path.to_string_lossy(),
1261        user_prompt,
1262        tiers.len(),
1263    ) {
1264        Ok(writer) => {
1265            eprintln!("zig session: {}", writer.session_id());
1266            Some(SessionCoordinator::start(writer))
1267        }
1268        Err(e) => {
1269            eprintln!("warning: failed to open zig session log: {e}");
1270            None
1271        }
1272    };
1273    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1274    let session_ref = session_writer.as_ref();
1275
1276    let mut iteration = 0;
1277    let mut pending_next: Option<String> = None;
1278
1279    loop {
1280        let tiers_to_run = if let Some(ref next_step) = pending_next {
1281            // Re-run from the target step onward
1282            let remaining: Vec<Vec<&Step>> = tiers
1283                .iter()
1284                .map(|tier| {
1285                    tier.iter()
1286                        .filter(|s| s.name == *next_step)
1287                        .copied()
1288                        .collect::<Vec<_>>()
1289                })
1290                .filter(|tier| !tier.is_empty())
1291                .collect();
1292            pending_next = None;
1293            remaining
1294        } else if iteration == 0 {
1295            tiers.clone()
1296        } else {
1297            break;
1298        };
1299
1300        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1301            let (non_race, race_groups) = partition_tier(tier);
1302
1303            if let Some(w) = session_ref {
1304                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1305                let _ = w.tier_started(tier_index, names);
1306            }
1307
1308            // Independent steps in the same tier run concurrently. A single
1309            // step takes the sequential path so its output streams without
1310            // a name prefix; multiple steps go through the parallel path.
1311            if non_race.len() <= 1 {
1312                for step in &non_race {
1313                    execute_sequential_step(
1314                        step,
1315                        &mut vars,
1316                        effective_user_prompt,
1317                        &mut step_outputs,
1318                        &workflow.workflow.name,
1319                        &mut pending_next,
1320                        tier_index,
1321                        session_ref,
1322                        &workflow.roles,
1323                        workflow_dir,
1324                        wf_provider,
1325                        wf_model,
1326                    )?;
1327                }
1328            } else {
1329                execute_parallel_tier(
1330                    &non_race,
1331                    &mut vars,
1332                    effective_user_prompt,
1333                    &mut step_outputs,
1334                    &workflow.workflow.name,
1335                    &mut pending_next,
1336                    tier_index,
1337                    session_ref,
1338                    &workflow.roles,
1339                    workflow_dir,
1340                    wf_provider,
1341                    wf_model,
1342                )?;
1343            }
1344
1345            // Run race groups in parallel
1346            for (group_name, race_steps) in &race_groups {
1347                eprintln!("  starting race group '{group_name}'...");
1348
1349                // Build prompts for all racers (conditions evaluated here)
1350                let mut prompts = HashMap::new();
1351                let mut race_sps: HashMap<String, String> = HashMap::new();
1352                let mut active_steps: Vec<&Step> = Vec::new();
1353                for step in race_steps {
1354                    if let Some(condition) = &step.condition {
1355                        if !evaluate_condition(condition, &vars)? {
1356                            eprintln!(
1357                                "  skipping '{}' (condition not met: {condition})",
1358                                step.name
1359                            );
1360                            continue;
1361                        }
1362                    }
1363                    let prompt =
1364                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1365                    prompts.insert(step.name.clone(), prompt);
1366                    if let Some(sp) =
1367                        resolve_role_system_prompt(step, &workflow.roles, &vars, workflow_dir)?
1368                    {
1369                        race_sps.insert(step.name.clone(), sp);
1370                    }
1371                    active_steps.push(step);
1372                }
1373
1374                if active_steps.is_empty() {
1375                    continue;
1376                }
1377
1378                match execute_race_group(
1379                    &active_steps,
1380                    &prompts,
1381                    &race_sps,
1382                    &workflow.workflow.name,
1383                    tier_index,
1384                    session_ref,
1385                    wf_provider,
1386                    wf_model,
1387                ) {
1388                    Ok((winner_name, output)) => {
1389                        // Find the winning step to process saves/next
1390                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1391                            if !winner.saves.is_empty() {
1392                                let saved = extract_saves(&output, &winner.saves)?;
1393                                for (k, v) in &saved {
1394                                    eprintln!("    saved {k} = {v}");
1395                                }
1396                                vars.extend(saved);
1397                            }
1398                            if winner.next.is_some() {
1399                                pending_next = winner.next.clone();
1400                            }
1401                        }
1402                        step_outputs.insert(winner_name.clone(), output);
1403                        eprintln!(
1404                            "  completed race group '{group_name}' (winner: '{winner_name}')"
1405                        );
1406                    }
1407                    Err(e) => return Err(e),
1408                }
1409            }
1410        }
1411
1412        iteration += 1;
1413        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1414            if iteration >= MAX_LOOP_ITERATIONS {
1415                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1416            }
1417            break;
1418        }
1419    }
1420
1421    eprintln!("workflow '{}' completed", workflow.workflow.name);
1422    if let Some(c) = coordinator {
1423        let _ = c.finish(SessionStatus::Success);
1424    }
1425    Ok(())
1426}
1427
1428/// Short label for the zag subcommand a step will invoke. Used in
1429/// `step_started` events so listeners can distinguish run/review/plan/etc.
1430fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1431    match cmd {
1432        None => "run",
1433        Some(StepCommand::Review) => "review",
1434        Some(StepCommand::Plan) => "plan",
1435        Some(StepCommand::Pipe) => "pipe",
1436        Some(StepCommand::Collect) => "collect",
1437        Some(StepCommand::Summary) => "summary",
1438    }
1439}
1440
1441/// Truncated single-line preview of a rendered prompt for the session log.
1442fn prompt_preview(prompt: &str) -> String {
1443    const MAX: usize = 200;
1444    let collapsed: String = prompt
1445        .chars()
1446        .map(|c| if c == '\n' { ' ' } else { c })
1447        .collect();
1448    if collapsed.chars().count() <= MAX {
1449        collapsed
1450    } else {
1451        let truncated: String = collapsed.chars().take(MAX).collect();
1452        format!("{truncated}…")
1453    }
1454}
1455
1456#[cfg(test)]
1457#[path = "run_tests.rs"]
1458mod tests;