1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::error::ZigError;
12use crate::resources::{ResourceCollector, render_system_block};
13use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
14use crate::workflow::model::{FailurePolicy, Role, Step, StepCommand, Workflow};
15use crate::workflow::{parser, validate};
16
/// Upper bound on workflow loop iterations.
// NOTE(review): presumably caps how many times `next` jumps may re-run steps in
// `execute`; the use site is outside this view — confirm.
const MAX_LOOP_ITERATIONS: usize = 100;
19
20pub fn run_workflow(
30 workflow_path: &str,
31 user_prompt: Option<&str>,
32 disable_resources: bool,
33) -> Result<(), ZigError> {
34 check_zag()?;
35
36 let path = resolve_workflow_path(workflow_path)?;
37 let (workflow, source) = parser::parse_workflow(&path)?;
38
39 if let Err(errors) = validate::validate(&workflow) {
40 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
41 return Err(ZigError::Validation(msgs.join("; ")));
42 }
43
44 execute(
45 &workflow,
46 &path,
47 user_prompt,
48 source.dir(),
49 disable_resources,
50 )
51}
52
53pub(crate) fn check_zag() -> Result<(), ZigError> {
55 let zag_available = Command::new("zag")
56 .arg("--version")
57 .output()
58 .is_ok_and(|o| o.status.success());
59
60 if !zag_available {
61 return Err(ZigError::Zag(
62 "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
63 ));
64 }
65 Ok(())
66}
67
68pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
76 let mut candidates = vec![
77 PathBuf::from(workflow),
78 PathBuf::from(format!("{workflow}.zug")),
79 PathBuf::from(format!("workflows/{workflow}")),
80 PathBuf::from(format!("workflows/{workflow}.zug")),
81 ];
82
83 if let Some(global_dir) = crate::paths::global_workflows_dir() {
84 candidates.push(global_dir.join(workflow));
85 candidates.push(global_dir.join(format!("{workflow}.zug")));
86 }
87
88 for candidate in &candidates {
89 if candidate.exists() {
90 return Ok(candidate.clone());
91 }
92 }
93
94 Err(ZigError::Io(format!(
95 "workflow not found: '{workflow}' (tried: {})",
96 candidates
97 .iter()
98 .map(|p| p.display().to_string())
99 .collect::<Vec<_>>()
100 .join(", ")
101 )))
102}
103
104fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
110 let step_index: HashMap<&str, usize> = steps
111 .iter()
112 .enumerate()
113 .map(|(i, s)| (s.name.as_str(), i))
114 .collect();
115
116 let mut in_degree = vec![0usize; steps.len()];
117 for (i, step) in steps.iter().enumerate() {
118 for dep in &step.depends_on {
119 if step_index.contains_key(dep.as_str()) {
120 in_degree[i] += 1;
121 }
122 }
123 }
124
125 let mut tiers = Vec::new();
126 let mut remaining = in_degree.clone();
127 let mut completed: Vec<bool> = vec![false; steps.len()];
128
129 loop {
130 let tier: Vec<usize> = (0..steps.len())
131 .filter(|&i| !completed[i] && remaining[i] == 0)
132 .collect();
133
134 if tier.is_empty() {
135 break;
136 }
137
138 for &i in &tier {
139 completed[i] = true;
140 }
141
142 for &i in &tier {
144 for (j, step) in steps.iter().enumerate() {
145 if !completed[j] && step.depends_on.contains(&steps[i].name) {
146 remaining[j] -= 1;
147 }
148 }
149 }
150
151 tiers.push(tier.iter().map(|&i| &steps[i]).collect());
152 }
153
154 let completed_count: usize = completed.iter().filter(|&&c| c).count();
155 if completed_count != steps.len() {
156 return Err(ZigError::Execution(
157 "could not resolve all steps — possible undetected cycle".into(),
158 ));
159 }
160
161 Ok(tiers)
162}
163
164fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
170 let mut result = String::with_capacity(template.len());
171 let mut rest = template;
172
173 while let Some(start) = rest.find("${") {
174 result.push_str(&rest[..start]);
175 let after_start = &rest[start + 2..];
176
177 if let Some(end) = after_start.find('}') {
178 let var_expr = &after_start[..end];
179 let mut parts = var_expr.splitn(2, '.');
180 let root = parts.next().unwrap_or(var_expr);
181
182 if let Some(value) = vars.get(root) {
183 if let Some(path) = parts.next() {
184 if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
186 let resolved = json_path_lookup(&json, path);
187 result.push_str(&resolved);
188 } else {
189 result.push_str(value);
190 }
191 } else {
192 result.push_str(value);
193 }
194 } else {
195 result.push_str(&rest[start..start + 2 + end + 1]);
197 }
198
199 rest = &after_start[end + 1..];
200 } else {
201 result.push_str(&rest[start..]);
202 rest = "";
203 }
204 }
205
206 result.push_str(rest);
207 result
208}
209
210fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
212 let mut current = value;
213 for key in path.split('.') {
214 match current.get(key) {
215 Some(v) => current = v,
216 None => return format!("${{?.{path}}}"),
217 }
218 }
219 match current {
220 serde_json::Value::String(s) => s.clone(),
221 other => other.to_string(),
222 }
223}
224
225fn resolve_role_system_prompt(
242 step: &Step,
243 roles: &HashMap<String, Role>,
244 resources: &ResourceCollector,
245 vars: &HashMap<String, String>,
246 workflow_dir: &Path,
247) -> Result<Option<String>, ZigError> {
248 let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
250 Some(substitute_vars(sp, vars))
251 } else if let Some(ref role_ref) = step.role {
252 let resolved_name = substitute_vars(role_ref, vars);
253 let role = roles.get(&resolved_name).ok_or_else(|| {
254 ZigError::Execution(format!(
255 "step '{}' references role '{}' which does not exist",
256 step.name, resolved_name
257 ))
258 })?;
259
260 let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
261 let full_path = workflow_dir.join(file_path);
262 Some(std::fs::read_to_string(&full_path).map_err(|e| {
263 ZigError::Execution(format!(
264 "failed to read system_prompt_file '{}' for role '{}': {e}",
265 full_path.display(),
266 resolved_name
267 ))
268 })?)
269 } else {
270 role.system_prompt.clone()
271 };
272
273 raw_prompt.map(|p| substitute_vars(&p, vars))
274 } else {
275 None
276 };
277
278 let set = resources.collect_for_step(&step.resources)?;
280 let resource_block = render_system_block(&set);
281
282 match (resource_block.is_empty(), base_prompt) {
283 (true, None) => Ok(None),
284 (true, Some(p)) => Ok(Some(p)),
285 (false, None) => Ok(Some(resource_block.trim_end().to_string())),
286 (false, Some(p)) => Ok(Some(format!("{resource_block}{p}"))),
287 }
288}
289
290fn load_file_defaults(
295 vars: &mut HashMap<String, String>,
296 declarations: &HashMap<String, crate::workflow::model::Variable>,
297 workflow_dir: &Path,
298) -> Result<(), ZigError> {
299 for (name, decl) in declarations {
300 if decl.default.is_none() {
301 if let Some(ref file_path) = decl.default_file {
302 let full_path = workflow_dir.join(file_path);
303 let content = std::fs::read_to_string(&full_path).map_err(|e| {
304 ZigError::Execution(format!(
305 "failed to read default_file '{}' for variable '{name}': {e}",
306 full_path.display()
307 ))
308 })?;
309 vars.insert(name.clone(), content);
310 }
311 }
312 }
313 Ok(())
314}
315
316fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
323 let condition = condition.trim();
324
325 let operators = ["<=", ">=", "!=", "==", "<", ">"];
327 for op in &operators {
328 if let Some(pos) = condition.find(op) {
329 let lhs = resolve_operand(condition[..pos].trim(), vars);
330 let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
331 return Ok(compare(&lhs, &rhs, op));
332 }
333 }
334
335 let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
337 Ok(is_truthy(value))
338}
339
/// Resolve one side of a condition comparison.
///
/// A token wrapped in matching double or single quotes (at least two characters long)
/// yields its inner text. Otherwise a token naming a variable in `vars` yields that
/// variable's value, and any other token is used literally.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    // A lone quote character is not a quoted literal; slicing `[1..0]` would panic.
    if token.len() >= 2 {
        for quote in ['"', '\''] {
            if let Some(inner) = token
                .strip_prefix(quote)
                .and_then(|rest| rest.strip_suffix(quote))
            {
                return inner.to_string();
            }
        }
    }
    vars.get(token)
        .cloned()
        .unwrap_or_else(|| token.to_string())
}
358
/// Compare two resolved operands with `op` (`==`, `!=`, `<`, `>`, `<=` or `>=`).
///
/// When both sides parse as `f64` and neither is NaN, they are compared numerically, so
/// `"10" > "9"` and `"1.0" == "1"`. Numeric equality also accepts a difference below
/// `f64::EPSILON`. Otherwise the strings are compared lexicographically. Unknown operators
/// yield `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    // `"NaN"`/`"inf"` parse as floats; NaN never compares equal to itself, so such
    // operands are compared as plain strings instead.
    let numeric = match (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        (Ok(l), Ok(r)) if !l.is_nan() && !r.is_nan() => Some((l, r)),
        _ => None,
    };

    if let Some((l, r)) = numeric {
        // Exact `==` covers equal infinities, whose difference is NaN.
        let equal = l == r || (l - r).abs() < f64::EPSILON;
        return match op {
            "==" => equal,
            "!=" => !equal,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
383
/// Decide whether a variable value counts as true in a bare condition.
///
/// Every value is truthy except the empty string, `"false"` and `"0"`.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
388
389fn render_step_prompt(
392 step: &Step,
393 vars: &HashMap<String, String>,
394 user_prompt: Option<&str>,
395 dependency_outputs: &HashMap<String, String>,
396) -> String {
397 let mut prompt = String::new();
398
399 if let Some(ctx) = user_prompt {
401 prompt.push_str(&format!("User context: {ctx}\n\n"));
402 }
403
404 if step.inject_context {
406 for dep in &step.depends_on {
407 if let Some(output) = dependency_outputs.get(dep) {
408 prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
409 }
410 }
411 }
412
413 prompt.push_str(&substitute_vars(&step.prompt, vars));
415
416 prompt
417}
418
419fn build_zag_args(
433 step: &Step,
434 prompt: &str,
435 workflow_name: &str,
436 model_override: Option<&str>,
437 rendered_system_prompt: Option<&str>,
438 workflow_provider: Option<&str>,
439 workflow_model: Option<&str>,
440) -> Vec<String> {
441 let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
442
443 let (mut args, accepts_agent_args) = match &step.command {
445 None => (vec!["run".to_string(), prompt.to_string()], true),
446 Some(StepCommand::Review) => {
447 let mut a = vec!["review".to_string()];
448 if !prompt.is_empty() {
449 a.push(prompt.to_string());
450 }
451 if step.uncommitted {
452 a.push("--uncommitted".into());
453 }
454 if let Some(base) = &step.base {
455 a.extend(["--base".into(), base.clone()]);
456 }
457 if let Some(commit) = &step.commit {
458 a.extend(["--commit".into(), commit.clone()]);
459 }
460 if let Some(title) = &step.title {
461 a.extend(["--title".into(), title.clone()]);
462 }
463 (a, true)
464 }
465 Some(StepCommand::Plan) => {
466 let mut a = vec!["plan".to_string(), prompt.to_string()];
467 if let Some(output) = &step.plan_output {
468 a.extend(["-o".into(), output.clone()]);
469 }
470 if let Some(instructions) = &step.instructions {
471 a.extend(["--instructions".into(), instructions.clone()]);
472 }
473 (a, true)
474 }
475 Some(StepCommand::Pipe) => {
476 let mut a = vec!["pipe".to_string()];
477 for dep in &step.depends_on {
478 a.push(session_name(dep));
479 }
480 a.push("--".into());
481 a.push(prompt.to_string());
482 (a, true)
483 }
484 Some(StepCommand::Collect) => {
485 let mut a = vec!["collect".to_string()];
486 for dep in &step.depends_on {
487 a.push(session_name(dep));
488 }
489 (a, false)
490 }
491 Some(StepCommand::Summary) => {
492 let mut a = vec!["summary".to_string()];
493 for dep in &step.depends_on {
494 a.push(session_name(dep));
495 }
496 (a, false)
497 }
498 };
499
500 if accepts_agent_args {
503 let effective_provider = step.provider.as_deref().or(workflow_provider);
504 if let Some(provider) = effective_provider {
505 args.extend(["--provider".into(), provider.to_string()]);
506 }
507
508 let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
509 if let Some(model) = effective_model {
510 args.extend(["--model".into(), model.to_string()]);
511 }
512
513 if let Some(sp) = rendered_system_prompt {
514 args.extend(["--system-prompt".into(), sp.to_string()]);
515 }
516 if let Some(max_turns) = step.max_turns {
517 args.extend(["--max-turns".into(), max_turns.to_string()]);
518 }
519
520 if let Some(output) = &step.output {
522 args.extend(["-o".into(), output.clone()]);
523 } else if step.json {
524 args.push("--json".into());
525 }
526 if let Some(schema) = &step.json_schema {
527 args.extend(["--json-schema".into(), schema.clone()]);
528 }
529
530 if let Some(mcp_config) = &step.mcp_config {
531 args.extend(["--mcp-config".into(), mcp_config.clone()]);
532 }
533
534 if step.auto_approve {
536 args.push("--auto-approve".into());
537 }
538 if let Some(root) = &step.root {
539 args.extend(["--root".into(), root.clone()]);
540 }
541 for dir in &step.add_dirs {
542 args.extend(["--add-dir".into(), dir.clone()]);
543 }
544 for (key, value) in &step.env {
545 args.extend(["--env".into(), format!("{key}={value}")]);
546 }
547 for file in &step.files {
548 args.extend(["--file".into(), file.clone()]);
549 }
550
551 for ctx in &step.context {
553 args.extend(["--context".into(), ctx.clone()]);
554 }
555 if let Some(plan) = &step.plan {
556 args.extend(["--plan".into(), plan.clone()]);
557 }
558
559 if step.worktree {
561 args.push("--worktree".into());
562 }
563 if let Some(sandbox) = &step.sandbox {
564 args.extend(["--sandbox".into(), sandbox.clone()]);
565 }
566 }
567
568 let name = session_name(&step.name);
570 args.extend(["--name".into(), name]);
571
572 if !step.description.is_empty() {
573 args.extend(["--description".into(), step.description.clone()]);
574 }
575
576 args.extend(["--tag".into(), "zig-workflow".into()]);
577 for tag in &step.tags {
578 args.extend(["--tag".into(), tag.clone()]);
579 }
580
581 if let Some(timeout) = &step.timeout {
582 args.extend(["--timeout".into(), timeout.clone()]);
583 }
584
585 args
586}
587
588fn run_zag_streaming(
594 args: &[String],
595 step_name: &str,
596 prefix: Option<&str>,
597 session: Option<&Arc<SessionWriter>>,
598) -> Result<(std::process::ExitStatus, String), ZigError> {
599 let mut cmd = Command::new("zag");
600 cmd.args(args)
601 .stdout(std::process::Stdio::piped())
602 .stderr(std::process::Stdio::piped());
603
604 let mut child = cmd
605 .spawn()
606 .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
607
608 let stdout = child.stdout.take().expect("stdout was piped");
609 let stderr = child.stderr.take().expect("stderr was piped");
610
611 let buffer = Arc::new(Mutex::new(String::new()));
612 let buffer_clone = Arc::clone(&buffer);
613 let prefix_stdout = prefix.map(String::from);
614 let prefix_stderr = prefix.map(String::from);
615 let session_stdout = session.cloned();
616 let session_stderr = session.cloned();
617 let step_name_stdout = step_name.to_string();
618 let step_name_stderr = step_name.to_string();
619
620 let stdout_thread = thread::spawn(move || {
621 let reader = BufReader::new(stdout);
622 let stderr_handle = std::io::stderr();
623 for line in reader.lines().map_while(Result::ok) {
624 if let Ok(mut buf) = buffer_clone.lock() {
625 buf.push_str(&line);
626 buf.push('\n');
627 }
628 if let Some(w) = &session_stdout {
629 let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
630 }
631 let mut h = stderr_handle.lock();
632 let _ = match &prefix_stdout {
633 Some(p) => writeln!(h, "[{p}] {line}"),
634 None => writeln!(h, "{line}"),
635 };
636 }
637 });
638
639 let stderr_thread = thread::spawn(move || {
640 let reader = BufReader::new(stderr);
641 let stderr_handle = std::io::stderr();
642 for line in reader.lines().map_while(Result::ok) {
643 if let Some(w) = &session_stderr {
644 let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
645 }
646 let mut h = stderr_handle.lock();
647 let _ = match &prefix_stderr {
648 Some(p) => writeln!(h, "[{p}] {line}"),
649 None => writeln!(h, "{line}"),
650 };
651 }
652 });
653
654 let status = child
655 .wait()
656 .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
657
658 let _ = stdout_thread.join();
659 let _ = stderr_thread.join();
660
661 let captured = Arc::try_unwrap(buffer)
662 .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
663 .into_inner()
664 .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
665
666 Ok((status, captured))
667}
668
669#[allow(clippy::too_many_arguments)]
675fn execute_step(
676 step: &Step,
677 prompt: &str,
678 workflow_name: &str,
679 model_override: Option<&str>,
680 prefix: Option<&str>,
681 session: Option<&Arc<SessionWriter>>,
682 rendered_system_prompt: Option<&str>,
683 workflow_provider: Option<&str>,
684 workflow_model: Option<&str>,
685) -> Result<String, ZigError> {
686 let args = build_zag_args(
687 step,
688 prompt,
689 workflow_name,
690 model_override,
691 rendered_system_prompt,
692 workflow_provider,
693 workflow_model,
694 );
695 let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
696
697 if !status.success() {
698 return Err(ZigError::Execution(format!(
699 "step '{}' failed (exit {})",
700 step.name, status,
701 )));
702 }
703
704 Ok(stdout)
705}
706
707#[allow(clippy::too_many_arguments)]
712fn run_step_attempts(
713 step: &Step,
714 prompt: &str,
715 workflow_name: &str,
716 prefix: Option<&str>,
717 session: Option<&Arc<SessionWriter>>,
718 rendered_system_prompt: Option<&str>,
719 workflow_provider: Option<&str>,
720 workflow_model: Option<&str>,
721) -> Result<String, ZigError> {
722 let mut attempts = 0;
723 let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
724 step.max_retries.unwrap_or(1) + 1
725 } else {
726 1
727 };
728
729 loop {
730 attempts += 1;
731 let model_override = if attempts > 1 {
732 step.retry_model.as_deref()
733 } else {
734 None
735 };
736 match execute_step(
737 step,
738 prompt,
739 workflow_name,
740 model_override,
741 prefix,
742 session,
743 rendered_system_prompt,
744 workflow_provider,
745 workflow_model,
746 ) {
747 Ok(output) => return Ok(output),
748 Err(e) => {
749 if let Some(w) = session {
750 let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
751 }
752 if attempts < max_attempts {
753 eprintln!(
754 " retry {}/{} for step '{}'",
755 attempts,
756 max_attempts - 1,
757 step.name
758 );
759 continue;
760 }
761 return Err(e);
762 }
763 }
764 }
765}
766
767fn extract_saves(
774 output: &str,
775 saves: &HashMap<String, String>,
776) -> Result<HashMap<String, String>, ZigError> {
777 let mut extracted = HashMap::new();
778
779 for (var_name, selector) in saves {
780 let value = if selector == "$" {
781 output.trim().to_string()
782 } else if let Some(path) = selector.strip_prefix("$.") {
783 let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
784 ZigError::Execution(format!(
785 "saves selector '{selector}' requires JSON output, but got parse error: {e}"
786 ))
787 })?;
788 json_path_lookup(&json, path)
789 } else {
790 output.trim().to_string()
791 };
792
793 extracted.insert(var_name.clone(), value);
794 }
795
796 Ok(extracted)
797}
798
799fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
804 let mut sequential = Vec::new();
805 let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
806
807 for step in tier {
808 if let Some(group) = &step.race_group {
809 race_groups.entry(group.clone()).or_default().push(step);
810 } else {
811 sequential.push(*step);
812 }
813 }
814
815 (sequential, race_groups)
816}
817
818fn spawn_step(
820 step: &Step,
821 prompt: &str,
822 workflow_name: &str,
823 rendered_system_prompt: Option<&str>,
824 workflow_provider: Option<&str>,
825 workflow_model: Option<&str>,
826) -> Result<std::process::Child, ZigError> {
827 let args = build_zag_args(
828 step,
829 prompt,
830 workflow_name,
831 None,
832 rendered_system_prompt,
833 workflow_provider,
834 workflow_model,
835 );
836 let mut cmd = Command::new("zag");
837 cmd.args(&args)
838 .stdout(std::process::Stdio::piped())
839 .stderr(std::process::Stdio::piped());
840
841 cmd.spawn()
842 .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
843}
844
845#[allow(clippy::too_many_arguments)]
850fn execute_race_group(
851 steps: &[&Step],
852 prompts: &HashMap<String, String>,
853 system_prompts: &HashMap<String, String>,
854 workflow_name: &str,
855 tier_index: usize,
856 session: Option<&Arc<SessionWriter>>,
857 workflow_provider: Option<&str>,
858 workflow_model: Option<&str>,
859) -> Result<(String, String), ZigError> {
860 if let Some(w) = session {
861 for step in steps {
862 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
863 let preview = prompts
864 .get(&step.name)
865 .map(|p| prompt_preview(p))
866 .unwrap_or_default();
867 let _ = w.step_started(
868 &step.name,
869 tier_index,
870 &zag_session_id,
871 zag_command_name(&step.command),
872 step.model.as_deref(),
873 &preview,
874 );
875 }
876 }
877 let race_started = Instant::now();
878 let mut children: Vec<(String, std::process::Child)> = Vec::new();
879
880 for step in steps {
881 let prompt = prompts.get(&step.name).ok_or_else(|| {
882 ZigError::Execution(format!("missing prompt for step '{}'", step.name))
883 })?;
884 eprintln!(" racing step '{}'...", step.name);
885 let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
886 let child = spawn_step(
887 step,
888 prompt,
889 workflow_name,
890 rendered_sp,
891 workflow_provider,
892 workflow_model,
893 )?;
894 children.push((step.name.clone(), child));
895 }
896
897 loop {
899 for i in 0..children.len() {
900 let status = children[i]
901 .1
902 .try_wait()
903 .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
904
905 if let Some(exit_status) = status {
906 let (winner_name, winner_child) = children.remove(i);
907
908 for (name, mut child) in children {
910 eprintln!(" cancelling step '{name}' (race lost)");
911 let _ = child.kill();
912 let _ = child.wait();
913 }
914
915 let elapsed = race_started.elapsed().as_millis() as u64;
916 if !exit_status.success() {
917 let stderr = winner_child
918 .stderr
919 .map(|mut s| {
920 let mut buf = String::new();
921 std::io::Read::read_to_string(&mut s, &mut buf).ok();
922 buf
923 })
924 .unwrap_or_default();
925 let err_msg = format!(
926 "race winner '{}' failed (exit {}): {}",
927 winner_name,
928 exit_status,
929 stderr.trim()
930 );
931 if let Some(w) = session {
932 let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
933 }
934 return Err(ZigError::Execution(err_msg));
935 }
936
937 let stdout = winner_child
938 .stdout
939 .map(|mut s| {
940 let mut buf = String::new();
941 std::io::Read::read_to_string(&mut s, &mut buf).ok();
942 buf
943 })
944 .unwrap_or_default();
945
946 eprintln!(" race won by '{winner_name}'");
947 if let Some(w) = session {
948 let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
949 }
950 return Ok((winner_name, stdout));
951 }
952 }
953
954 std::thread::sleep(Duration::from_millis(100));
955 }
956}
957
958#[allow(clippy::too_many_arguments)]
960fn execute_sequential_step(
961 step: &Step,
962 vars: &mut HashMap<String, String>,
963 user_prompt: Option<&str>,
964 step_outputs: &mut HashMap<String, String>,
965 workflow_name: &str,
966 pending_next: &mut Option<String>,
967 tier_index: usize,
968 session: Option<&Arc<SessionWriter>>,
969 roles: &HashMap<String, Role>,
970 resources: &ResourceCollector,
971 workflow_dir: &Path,
972 workflow_provider: Option<&str>,
973 workflow_model: Option<&str>,
974) -> Result<(), ZigError> {
975 if let Some(condition) = &step.condition {
976 if !evaluate_condition(condition, vars)? {
977 eprintln!(
978 " skipping '{}' (condition not met: {condition})",
979 step.name
980 );
981 if let Some(w) = session {
982 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
983 }
984 return Ok(());
985 }
986 }
987
988 eprintln!(" running step '{}'...", step.name);
989
990 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
991 let rendered_sp = resolve_role_system_prompt(step, roles, resources, vars, workflow_dir)?;
992 if let Some(w) = session {
993 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
994 let _ = w.step_started(
995 &step.name,
996 tier_index,
997 &zag_session_id,
998 zag_command_name(&step.command),
999 step.model.as_deref(),
1000 &prompt_preview(&prompt),
1001 );
1002 }
1003 let started = Instant::now();
1004 let result = run_step_attempts(
1005 step,
1006 &prompt,
1007 workflow_name,
1008 None,
1009 session,
1010 rendered_sp.as_deref(),
1011 workflow_provider,
1012 workflow_model,
1013 );
1014
1015 match result {
1016 Ok(output) => {
1017 let mut saved_keys: Vec<String> = Vec::new();
1018 if !step.saves.is_empty() {
1019 let saved = extract_saves(&output, &step.saves)?;
1020 for (k, v) in &saved {
1021 eprintln!(" saved {k} = {v}");
1022 saved_keys.push(k.clone());
1023 }
1024 vars.extend(saved);
1025 }
1026
1027 step_outputs.insert(step.name.clone(), output);
1028 eprintln!(" completed '{}'", step.name);
1029 if let Some(w) = session {
1030 let _ = w.step_completed(
1031 &step.name,
1032 0,
1033 started.elapsed().as_millis() as u64,
1034 saved_keys,
1035 );
1036 }
1037
1038 if step.next.is_some() {
1039 *pending_next = step.next.clone();
1040 }
1041 }
1042 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1043 FailurePolicy::Fail => return Err(e),
1044 FailurePolicy::Continue => {
1045 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1046 }
1047 FailurePolicy::Retry => {
1048 return Err(e);
1049 }
1050 },
1051 }
1052
1053 Ok(())
1054}
1055
1056#[allow(clippy::too_many_arguments)]
1065fn execute_parallel_tier(
1066 steps: &[&Step],
1067 vars: &mut HashMap<String, String>,
1068 user_prompt: Option<&str>,
1069 step_outputs: &mut HashMap<String, String>,
1070 workflow_name: &str,
1071 pending_next: &mut Option<String>,
1072 tier_index: usize,
1073 session: Option<&Arc<SessionWriter>>,
1074 roles: &HashMap<String, Role>,
1075 resources: &ResourceCollector,
1076 workflow_dir: &Path,
1077 workflow_provider: Option<&str>,
1078 workflow_model: Option<&str>,
1079) -> Result<(), ZigError> {
1080 let mut active: Vec<&Step> = Vec::new();
1083 let mut prompts: HashMap<String, String> = HashMap::new();
1084 let mut rendered_sps: HashMap<String, String> = HashMap::new();
1085 for step in steps {
1086 if let Some(condition) = &step.condition {
1087 if !evaluate_condition(condition, vars)? {
1088 eprintln!(
1089 " skipping '{}' (condition not met: {condition})",
1090 step.name
1091 );
1092 if let Some(w) = session {
1093 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1094 }
1095 continue;
1096 }
1097 }
1098 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1099 prompts.insert(step.name.clone(), prompt);
1100 if let Some(sp) = resolve_role_system_prompt(step, roles, resources, vars, workflow_dir)? {
1101 rendered_sps.insert(step.name.clone(), sp);
1102 }
1103 active.push(*step);
1104 }
1105
1106 if active.is_empty() {
1107 return Ok(());
1108 }
1109
1110 eprintln!(" running {} steps in parallel...", active.len());
1111
1112 let mut start_times: HashMap<String, Instant> = HashMap::new();
1113 let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1114 for step in &active {
1115 let step_clone: Step = (*step).clone();
1116 let prompt = prompts.remove(&step.name).unwrap_or_default();
1117 let rendered_sp = rendered_sps.remove(&step.name);
1118 let workflow_name_owned = workflow_name.to_string();
1119 let name = step.name.clone();
1120 eprintln!(" starting '{name}'...");
1121 if let Some(w) = session {
1122 let zag_session_id = format!("zig-{workflow_name}-{name}");
1123 let _ = w.step_started(
1124 &name,
1125 tier_index,
1126 &zag_session_id,
1127 zag_command_name(&step.command),
1128 step.model.as_deref(),
1129 &prompt_preview(&prompt),
1130 );
1131 }
1132 start_times.insert(name.clone(), Instant::now());
1133 let session_clone = session.cloned();
1134 let wf_provider = workflow_provider.map(String::from);
1135 let wf_model = workflow_model.map(String::from);
1136 let handle = thread::spawn(move || {
1137 let res = run_step_attempts(
1138 &step_clone,
1139 &prompt,
1140 &workflow_name_owned,
1141 Some(&name),
1142 session_clone.as_ref(),
1143 rendered_sp.as_deref(),
1144 wf_provider.as_deref(),
1145 wf_model.as_deref(),
1146 );
1147 (name, res)
1148 });
1149 handles.push(handle);
1150 }
1151
1152 let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1153 for handle in handles {
1154 match handle.join() {
1155 Ok((name, res)) => {
1156 results.insert(name, res);
1157 }
1158 Err(_) => {
1159 return Err(ZigError::Execution("parallel step thread panicked".into()));
1160 }
1161 }
1162 }
1163
1164 let mut errors: Vec<String> = Vec::new();
1166 for step in &active {
1167 let Some(res) = results.remove(&step.name) else {
1168 continue;
1169 };
1170 let elapsed = start_times
1171 .remove(&step.name)
1172 .map(|t| t.elapsed().as_millis() as u64)
1173 .unwrap_or(0);
1174 match res {
1175 Ok(output) => {
1176 let mut saved_keys: Vec<String> = Vec::new();
1177 if !step.saves.is_empty() {
1178 let saved = extract_saves(&output, &step.saves)?;
1179 for (k, v) in &saved {
1180 eprintln!(" saved {k} = {v}");
1181 saved_keys.push(k.clone());
1182 }
1183 vars.extend(saved);
1184 }
1185 step_outputs.insert(step.name.clone(), output);
1186 eprintln!(" completed '{}'", step.name);
1187 if let Some(w) = session {
1188 let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1189 }
1190 if step.next.is_some() && pending_next.is_none() {
1191 *pending_next = step.next.clone();
1192 }
1193 }
1194 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1195 FailurePolicy::Continue => {
1196 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1197 }
1198 FailurePolicy::Fail | FailurePolicy::Retry => {
1199 errors.push(format!("'{}': {e}", step.name));
1200 }
1201 },
1202 }
1203 }
1204
1205 if !errors.is_empty() {
1206 return Err(ZigError::Execution(format!(
1207 "parallel step(s) failed: {}",
1208 errors.join("; ")
1209 )));
1210 }
1211
1212 Ok(())
1213}
1214
1215fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1218 let mut vars = HashMap::new();
1219 for (name, var) in &workflow.vars {
1220 let value = match &var.default {
1221 Some(toml::Value::String(s)) => s.clone(),
1222 Some(toml::Value::Integer(n)) => n.to_string(),
1223 Some(toml::Value::Float(f)) => f.to_string(),
1224 Some(toml::Value::Boolean(b)) => b.to_string(),
1225 Some(other) => other.to_string(),
1226 None => String::new(),
1227 };
1228 vars.insert(name.clone(), value);
1229 }
1230 vars
1231}
1232
1233fn execute(
1235 workflow: &Workflow,
1236 workflow_path: &std::path::Path,
1237 user_prompt: Option<&str>,
1238 workflow_dir: &Path,
1239 disable_resources: bool,
1240) -> Result<(), ZigError> {
1241 let mut vars = init_vars(workflow);
1242
1243 let resource_collector = ResourceCollector::from_env(
1244 &workflow.workflow.name,
1245 &workflow.workflow.resources,
1246 workflow_dir,
1247 disable_resources,
1248 );
1249
1250 load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1252
1253 let prompt_var = workflow
1255 .vars
1256 .iter()
1257 .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1258 .map(|(name, _)| name.clone());
1259
1260 if let Some(ref var_name) = prompt_var {
1261 if let Some(prompt) = user_prompt {
1262 vars.insert(var_name.clone(), prompt.to_string());
1263 }
1264 }
1265
1266 if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1268 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1269 return Err(ZigError::Validation(msgs.join("; ")));
1270 }
1271
1272 let effective_user_prompt = if prompt_var.is_some() {
1274 None
1275 } else {
1276 user_prompt
1277 };
1278
1279 let mut step_outputs: HashMap<String, String> = HashMap::new();
1280
1281 let wf_provider = workflow.workflow.provider.as_deref();
1282 let wf_model = workflow.workflow.model.as_deref();
1283
1284 let tiers = topological_sort(&workflow.steps)?;
1285
1286 eprintln!(
1287 "running workflow '{}' ({} steps in {} tiers)",
1288 workflow.workflow.name,
1289 workflow.steps.len(),
1290 tiers.len()
1291 );
1292
1293 let coordinator = match SessionWriter::create(
1298 &workflow.workflow.name,
1299 &workflow_path.to_string_lossy(),
1300 user_prompt,
1301 tiers.len(),
1302 ) {
1303 Ok(writer) => {
1304 eprintln!("zig session: {}", writer.session_id());
1305 Some(SessionCoordinator::start(writer))
1306 }
1307 Err(e) => {
1308 eprintln!("warning: failed to open zig session log: {e}");
1309 None
1310 }
1311 };
1312 let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1313 let session_ref = session_writer.as_ref();
1314
1315 let mut iteration = 0;
1316 let mut pending_next: Option<String> = None;
1317
1318 loop {
1319 let tiers_to_run = if let Some(ref next_step) = pending_next {
1320 let remaining: Vec<Vec<&Step>> = tiers
1322 .iter()
1323 .map(|tier| {
1324 tier.iter()
1325 .filter(|s| s.name == *next_step)
1326 .copied()
1327 .collect::<Vec<_>>()
1328 })
1329 .filter(|tier| !tier.is_empty())
1330 .collect();
1331 pending_next = None;
1332 remaining
1333 } else if iteration == 0 {
1334 tiers.clone()
1335 } else {
1336 break;
1337 };
1338
1339 for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1340 let (non_race, race_groups) = partition_tier(tier);
1341
1342 if let Some(w) = session_ref {
1343 let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1344 let _ = w.tier_started(tier_index, names);
1345 }
1346
1347 if non_race.len() <= 1 {
1351 for step in &non_race {
1352 execute_sequential_step(
1353 step,
1354 &mut vars,
1355 effective_user_prompt,
1356 &mut step_outputs,
1357 &workflow.workflow.name,
1358 &mut pending_next,
1359 tier_index,
1360 session_ref,
1361 &workflow.roles,
1362 &resource_collector,
1363 workflow_dir,
1364 wf_provider,
1365 wf_model,
1366 )?;
1367 }
1368 } else {
1369 execute_parallel_tier(
1370 &non_race,
1371 &mut vars,
1372 effective_user_prompt,
1373 &mut step_outputs,
1374 &workflow.workflow.name,
1375 &mut pending_next,
1376 tier_index,
1377 session_ref,
1378 &workflow.roles,
1379 &resource_collector,
1380 workflow_dir,
1381 wf_provider,
1382 wf_model,
1383 )?;
1384 }
1385
1386 for (group_name, race_steps) in &race_groups {
1388 eprintln!(" starting race group '{group_name}'...");
1389
1390 let mut prompts = HashMap::new();
1392 let mut race_sps: HashMap<String, String> = HashMap::new();
1393 let mut active_steps: Vec<&Step> = Vec::new();
1394 for step in race_steps {
1395 if let Some(condition) = &step.condition {
1396 if !evaluate_condition(condition, &vars)? {
1397 eprintln!(
1398 " skipping '{}' (condition not met: {condition})",
1399 step.name
1400 );
1401 continue;
1402 }
1403 }
1404 let prompt =
1405 render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1406 prompts.insert(step.name.clone(), prompt);
1407 if let Some(sp) = resolve_role_system_prompt(
1408 step,
1409 &workflow.roles,
1410 &resource_collector,
1411 &vars,
1412 workflow_dir,
1413 )? {
1414 race_sps.insert(step.name.clone(), sp);
1415 }
1416 active_steps.push(step);
1417 }
1418
1419 if active_steps.is_empty() {
1420 continue;
1421 }
1422
1423 match execute_race_group(
1424 &active_steps,
1425 &prompts,
1426 &race_sps,
1427 &workflow.workflow.name,
1428 tier_index,
1429 session_ref,
1430 wf_provider,
1431 wf_model,
1432 ) {
1433 Ok((winner_name, output)) => {
1434 if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1436 if !winner.saves.is_empty() {
1437 let saved = extract_saves(&output, &winner.saves)?;
1438 for (k, v) in &saved {
1439 eprintln!(" saved {k} = {v}");
1440 }
1441 vars.extend(saved);
1442 }
1443 if winner.next.is_some() {
1444 pending_next = winner.next.clone();
1445 }
1446 }
1447 step_outputs.insert(winner_name.clone(), output);
1448 eprintln!(
1449 " completed race group '{group_name}' (winner: '{winner_name}')"
1450 );
1451 }
1452 Err(e) => return Err(e),
1453 }
1454 }
1455 }
1456
1457 iteration += 1;
1458 if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1459 if iteration >= MAX_LOOP_ITERATIONS {
1460 eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1461 }
1462 break;
1463 }
1464 }
1465
1466 eprintln!("workflow '{}' completed", workflow.workflow.name);
1467 if let Some(c) = coordinator {
1468 let _ = c.finish(SessionStatus::Success);
1469 }
1470 Ok(())
1471}
1472
1473fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1476 match cmd {
1477 None => "run",
1478 Some(StepCommand::Review) => "review",
1479 Some(StepCommand::Plan) => "plan",
1480 Some(StepCommand::Pipe) => "pipe",
1481 Some(StepCommand::Collect) => "collect",
1482 Some(StepCommand::Summary) => "summary",
1483 }
1484}
1485
/// Builds a single-line preview of `prompt` for log output.
///
/// Every control character (`\n`, `\r`, `\t`, ESC, …) is replaced by a space so the
/// preview stays on one terminal line and cannot emit stray carriage returns or
/// escape codes. Prompts longer than 200 characters (Unicode scalar values, not
/// bytes) are cut to 200 characters with a trailing `…` appended.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let mut chars = prompt
        .chars()
        .map(|c| if c.is_control() { ' ' } else { c });
    let mut preview: String = chars.by_ref().take(MAX).collect();
    // A single lookahead decides truncation without scanning the rest of the prompt.
    if chars.next().is_some() {
        preview.push('…');
    }
    preview
}
1500
1501#[cfg(test)]
1502#[path = "run_tests.rs"]
1503mod tests;