1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::error::ZigError;
12use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
13use crate::workflow::model::{FailurePolicy, Role, Step, StepCommand, Workflow};
14use crate::workflow::{parser, validate};
15
/// Upper bound (by its name) on workflow loop iterations.
///
/// NOTE(review): the code that reads this constant is outside this view;
/// presumably it caps how many times `next` jumps may re-run steps in
/// `execute` — confirm.
const MAX_LOOP_ITERATIONS: usize = 100;
18
19pub fn run_workflow(workflow_path: &str, user_prompt: Option<&str>) -> Result<(), ZigError> {
25 check_zag()?;
26
27 let path = resolve_workflow_path(workflow_path)?;
28 let (workflow, source) = parser::parse_workflow(&path)?;
29
30 if let Err(errors) = validate::validate(&workflow) {
31 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
32 return Err(ZigError::Validation(msgs.join("; ")));
33 }
34
35 execute(&workflow, &path, user_prompt, source.dir())
36}
37
38pub(crate) fn check_zag() -> Result<(), ZigError> {
40 let zag_available = Command::new("zag")
41 .arg("--version")
42 .output()
43 .is_ok_and(|o| o.status.success());
44
45 if !zag_available {
46 return Err(ZigError::Zag(
47 "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
48 ));
49 }
50 Ok(())
51}
52
53pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
61 let mut candidates = vec![
62 PathBuf::from(workflow),
63 PathBuf::from(format!("{workflow}.zug")),
64 PathBuf::from(format!("workflows/{workflow}")),
65 PathBuf::from(format!("workflows/{workflow}.zug")),
66 ];
67
68 if let Some(global_dir) = crate::paths::global_workflows_dir() {
69 candidates.push(global_dir.join(workflow));
70 candidates.push(global_dir.join(format!("{workflow}.zug")));
71 }
72
73 for candidate in &candidates {
74 if candidate.exists() {
75 return Ok(candidate.clone());
76 }
77 }
78
79 Err(ZigError::Io(format!(
80 "workflow not found: '{workflow}' (tried: {})",
81 candidates
82 .iter()
83 .map(|p| p.display().to_string())
84 .collect::<Vec<_>>()
85 .join(", ")
86 )))
87}
88
89fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
95 let step_index: HashMap<&str, usize> = steps
96 .iter()
97 .enumerate()
98 .map(|(i, s)| (s.name.as_str(), i))
99 .collect();
100
101 let mut in_degree = vec![0usize; steps.len()];
102 for (i, step) in steps.iter().enumerate() {
103 for dep in &step.depends_on {
104 if step_index.contains_key(dep.as_str()) {
105 in_degree[i] += 1;
106 }
107 }
108 }
109
110 let mut tiers = Vec::new();
111 let mut remaining = in_degree.clone();
112 let mut completed: Vec<bool> = vec![false; steps.len()];
113
114 loop {
115 let tier: Vec<usize> = (0..steps.len())
116 .filter(|&i| !completed[i] && remaining[i] == 0)
117 .collect();
118
119 if tier.is_empty() {
120 break;
121 }
122
123 for &i in &tier {
124 completed[i] = true;
125 }
126
127 for &i in &tier {
129 for (j, step) in steps.iter().enumerate() {
130 if !completed[j] && step.depends_on.contains(&steps[i].name) {
131 remaining[j] -= 1;
132 }
133 }
134 }
135
136 tiers.push(tier.iter().map(|&i| &steps[i]).collect());
137 }
138
139 let completed_count: usize = completed.iter().filter(|&&c| c).count();
140 if completed_count != steps.len() {
141 return Err(ZigError::Execution(
142 "could not resolve all steps — possible undetected cycle".into(),
143 ));
144 }
145
146 Ok(tiers)
147}
148
149fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
155 let mut result = String::with_capacity(template.len());
156 let mut rest = template;
157
158 while let Some(start) = rest.find("${") {
159 result.push_str(&rest[..start]);
160 let after_start = &rest[start + 2..];
161
162 if let Some(end) = after_start.find('}') {
163 let var_expr = &after_start[..end];
164 let mut parts = var_expr.splitn(2, '.');
165 let root = parts.next().unwrap_or(var_expr);
166
167 if let Some(value) = vars.get(root) {
168 if let Some(path) = parts.next() {
169 if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
171 let resolved = json_path_lookup(&json, path);
172 result.push_str(&resolved);
173 } else {
174 result.push_str(value);
175 }
176 } else {
177 result.push_str(value);
178 }
179 } else {
180 result.push_str(&rest[start..start + 2 + end + 1]);
182 }
183
184 rest = &after_start[end + 1..];
185 } else {
186 result.push_str(&rest[start..]);
187 rest = "";
188 }
189 }
190
191 result.push_str(rest);
192 result
193}
194
195fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
197 let mut current = value;
198 for key in path.split('.') {
199 match current.get(key) {
200 Some(v) => current = v,
201 None => return format!("${{?.{path}}}"),
202 }
203 }
204 match current {
205 serde_json::Value::String(s) => s.clone(),
206 other => other.to_string(),
207 }
208}
209
210fn resolve_role_system_prompt(
219 step: &Step,
220 roles: &HashMap<String, Role>,
221 vars: &HashMap<String, String>,
222 workflow_dir: &Path,
223) -> Result<Option<String>, ZigError> {
224 if let Some(ref sp) = step.system_prompt {
226 return Ok(Some(substitute_vars(sp, vars)));
227 }
228
229 if let Some(ref role_ref) = step.role {
231 let resolved_name = substitute_vars(role_ref, vars);
232 let role = roles.get(&resolved_name).ok_or_else(|| {
233 ZigError::Execution(format!(
234 "step '{}' references role '{}' which does not exist",
235 step.name, resolved_name
236 ))
237 })?;
238
239 let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
241 let full_path = workflow_dir.join(file_path);
242 std::fs::read_to_string(&full_path).map_err(|e| {
243 ZigError::Execution(format!(
244 "failed to read system_prompt_file '{}' for role '{}': {e}",
245 full_path.display(),
246 resolved_name
247 ))
248 })?
249 } else if let Some(ref sp) = role.system_prompt {
250 sp.clone()
251 } else {
252 return Ok(None);
253 };
254
255 return Ok(Some(substitute_vars(&raw_prompt, vars)));
256 }
257
258 Ok(None)
259}
260
261fn load_file_defaults(
266 vars: &mut HashMap<String, String>,
267 declarations: &HashMap<String, crate::workflow::model::Variable>,
268 workflow_dir: &Path,
269) -> Result<(), ZigError> {
270 for (name, decl) in declarations {
271 if decl.default.is_none() {
272 if let Some(ref file_path) = decl.default_file {
273 let full_path = workflow_dir.join(file_path);
274 let content = std::fs::read_to_string(&full_path).map_err(|e| {
275 ZigError::Execution(format!(
276 "failed to read default_file '{}' for variable '{name}': {e}",
277 full_path.display()
278 ))
279 })?;
280 vars.insert(name.clone(), content);
281 }
282 }
283 }
284 Ok(())
285}
286
287fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
294 let condition = condition.trim();
295
296 let operators = ["<=", ">=", "!=", "==", "<", ">"];
298 for op in &operators {
299 if let Some(pos) = condition.find(op) {
300 let lhs = resolve_operand(condition[..pos].trim(), vars);
301 let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
302 return Ok(compare(&lhs, &rhs, op));
303 }
304 }
305
306 let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
308 Ok(is_truthy(value))
309}
310
/// Resolve one side of a condition to a concrete string.
///
/// A token wrapped in matching double or single quotes yields its inner
/// text. Otherwise the token is treated as a variable name and replaced by
/// its value. An unknown name is returned unchanged (a bare literal).
///
/// A lone quote character (`"` or `'`) used to panic on `token[1..0]`. It
/// is now treated as an ordinary token.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    for quote in ['"', '\''] {
        if let Some(inner) = token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
        {
            return inner.to_string();
        }
    }
    vars.get(token)
        .cloned()
        .unwrap_or_else(|| token.to_string())
}
329
/// Compare two resolved operands with `op`.
///
/// When both sides parse as `f64` and neither is NaN, the comparison is
/// numeric. NaN is excluded because `f64::from_str` also accepts words such
/// as `"NaN"`/`"Nan"`, and NaN never equals itself. Numeric equality accepts
/// an exact match (so `inf == inf`) or a difference below `f64::EPSILON`.
/// Otherwise the operands are compared as strings (byte-wise lexicographic
/// order). Unknown operators yield `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    let numeric = match (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        (Ok(l), Ok(r)) if !l.is_nan() && !r.is_nan() => Some((l, r)),
        _ => None,
    };

    if let Some((l, r)) = numeric {
        let equal = l == r || (l - r).abs() < f64::EPSILON;
        return match op {
            "==" => equal,
            "!=" => !equal,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
354
/// A value is truthy unless it is exactly `""`, `"false"` or `"0"`.
/// The check is case-sensitive, so `"False"` counts as truthy.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
359
360fn render_step_prompt(
363 step: &Step,
364 vars: &HashMap<String, String>,
365 user_prompt: Option<&str>,
366 dependency_outputs: &HashMap<String, String>,
367) -> String {
368 let mut prompt = String::new();
369
370 if let Some(ctx) = user_prompt {
372 prompt.push_str(&format!("User context: {ctx}\n\n"));
373 }
374
375 if step.inject_context {
377 for dep in &step.depends_on {
378 if let Some(output) = dependency_outputs.get(dep) {
379 prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
380 }
381 }
382 }
383
384 prompt.push_str(&substitute_vars(&step.prompt, vars));
386
387 prompt
388}
389
/// Build the argument vector (without the `zag` program name) for one step.
///
/// The first element is the zag subcommand selected by `step.command`
/// (`run` when none is set), followed by its command-specific arguments.
/// For `run`, `review`, `plan` and `pipe` the agent flags (provider, model,
/// system prompt, output, sandboxing, ...) are appended; `collect` and
/// `summary` skip them. Every command then gets `--name`, an optional
/// `--description`, the `zig-workflow` tag plus the step's own tags, and an
/// optional `--timeout`.
///
/// Precedence: the model is `model_override` (used for retries), then
/// `step.model`, then `workflow_model`. The provider is `step.provider`,
/// then `workflow_provider`.
///
/// Argument order is significant for the zag CLI, so it is kept exactly as
/// emitted here.
fn build_zag_args(
    step: &Step,
    prompt: &str,
    workflow_name: &str,
    model_override: Option<&str>,
    rendered_system_prompt: Option<&str>,
    workflow_provider: Option<&str>,
    workflow_model: Option<&str>,
) -> Vec<String> {
    // Session names are derived deterministically from workflow + step
    // name; pipe/collect/summary use this to reference dependency sessions.
    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");

    // Subcommand plus its own arguments, and whether the agent flags below
    // apply to it.
    let (mut args, accepts_agent_args) = match &step.command {
        None => (vec!["run".to_string(), prompt.to_string()], true),
        Some(StepCommand::Review) => {
            let mut a = vec!["review".to_string()];
            // An empty prompt is omitted entirely for `review`.
            if !prompt.is_empty() {
                a.push(prompt.to_string());
            }
            if step.uncommitted {
                a.push("--uncommitted".into());
            }
            if let Some(base) = &step.base {
                a.extend(["--base".into(), base.clone()]);
            }
            if let Some(commit) = &step.commit {
                a.extend(["--commit".into(), commit.clone()]);
            }
            if let Some(title) = &step.title {
                a.extend(["--title".into(), title.clone()]);
            }
            (a, true)
        }
        Some(StepCommand::Plan) => {
            let mut a = vec!["plan".to_string(), prompt.to_string()];
            // NOTE(review): if the step also sets `output`, a second `-o` is
            // appended in the agent-flag section below — confirm zag's
            // handling of the duplicate.
            if let Some(output) = &step.plan_output {
                a.extend(["-o".into(), output.clone()]);
            }
            if let Some(instructions) = &step.instructions {
                a.extend(["--instructions".into(), instructions.clone()]);
            }
            (a, true)
        }
        Some(StepCommand::Pipe) => {
            let mut a = vec!["pipe".to_string()];
            for dep in &step.depends_on {
                a.push(session_name(dep));
            }
            // NOTE(review): the agent flags appended below land after this
            // `--`; whether zag still parses them as flags depends on its CLI
            // definition — confirm.
            a.push("--".into());
            a.push(prompt.to_string());
            (a, true)
        }
        Some(StepCommand::Collect) => {
            let mut a = vec!["collect".to_string()];
            for dep in &step.depends_on {
                a.push(session_name(dep));
            }
            (a, false)
        }
        Some(StepCommand::Summary) => {
            let mut a = vec!["summary".to_string()];
            for dep in &step.depends_on {
                a.push(session_name(dep));
            }
            (a, false)
        }
    };

    if accepts_agent_args {
        // Step-level provider overrides the workflow-level one.
        let effective_provider = step.provider.as_deref().or(workflow_provider);
        if let Some(provider) = effective_provider {
            args.extend(["--provider".into(), provider.to_string()]);
        }

        // Retry override > step model > workflow model.
        let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
        if let Some(model) = effective_model {
            args.extend(["--model".into(), model.to_string()]);
        }

        if let Some(sp) = rendered_system_prompt {
            args.extend(["--system-prompt".into(), sp.to_string()]);
        }
        if let Some(max_turns) = step.max_turns {
            args.extend(["--max-turns".into(), max_turns.to_string()]);
        }

        // `output` takes precedence over the `json` flag; only one is sent.
        if let Some(output) = &step.output {
            args.extend(["-o".into(), output.clone()]);
        } else if step.json {
            args.push("--json".into());
        }
        if let Some(schema) = &step.json_schema {
            args.extend(["--json-schema".into(), schema.clone()]);
        }

        if let Some(mcp_config) = &step.mcp_config {
            args.extend(["--mcp-config".into(), mcp_config.clone()]);
        }

        if step.auto_approve {
            args.push("--auto-approve".into());
        }
        if let Some(root) = &step.root {
            args.extend(["--root".into(), root.clone()]);
        }
        for dir in &step.add_dirs {
            args.extend(["--add-dir".into(), dir.clone()]);
        }
        for (key, value) in &step.env {
            args.extend(["--env".into(), format!("{key}={value}")]);
        }
        for file in &step.files {
            args.extend(["--file".into(), file.clone()]);
        }

        for ctx in &step.context {
            args.extend(["--context".into(), ctx.clone()]);
        }
        if let Some(plan) = &step.plan {
            args.extend(["--plan".into(), plan.clone()]);
        }

        if step.worktree {
            args.push("--worktree".into());
        }
        if let Some(sandbox) = &step.sandbox {
            args.extend(["--sandbox".into(), sandbox.clone()]);
        }
    }

    // Flags below are sent for every subcommand, including collect/summary.
    let name = session_name(&step.name);
    args.extend(["--name".into(), name]);

    if !step.description.is_empty() {
        args.extend(["--description".into(), step.description.clone()]);
    }

    // Every zag session started by zig is tagged `zig-workflow`.
    args.extend(["--tag".into(), "zig-workflow".into()]);
    for tag in &step.tags {
        args.extend(["--tag".into(), tag.clone()]);
    }

    if let Some(timeout) = &step.timeout {
        args.extend(["--timeout".into(), timeout.clone()]);
    }

    args
}
558
559fn run_zag_streaming(
565 args: &[String],
566 step_name: &str,
567 prefix: Option<&str>,
568 session: Option<&Arc<SessionWriter>>,
569) -> Result<(std::process::ExitStatus, String), ZigError> {
570 let mut cmd = Command::new("zag");
571 cmd.args(args)
572 .stdout(std::process::Stdio::piped())
573 .stderr(std::process::Stdio::piped());
574
575 let mut child = cmd
576 .spawn()
577 .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
578
579 let stdout = child.stdout.take().expect("stdout was piped");
580 let stderr = child.stderr.take().expect("stderr was piped");
581
582 let buffer = Arc::new(Mutex::new(String::new()));
583 let buffer_clone = Arc::clone(&buffer);
584 let prefix_stdout = prefix.map(String::from);
585 let prefix_stderr = prefix.map(String::from);
586 let session_stdout = session.cloned();
587 let session_stderr = session.cloned();
588 let step_name_stdout = step_name.to_string();
589 let step_name_stderr = step_name.to_string();
590
591 let stdout_thread = thread::spawn(move || {
592 let reader = BufReader::new(stdout);
593 let stderr_handle = std::io::stderr();
594 for line in reader.lines().map_while(Result::ok) {
595 if let Ok(mut buf) = buffer_clone.lock() {
596 buf.push_str(&line);
597 buf.push('\n');
598 }
599 if let Some(w) = &session_stdout {
600 let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
601 }
602 let mut h = stderr_handle.lock();
603 let _ = match &prefix_stdout {
604 Some(p) => writeln!(h, "[{p}] {line}"),
605 None => writeln!(h, "{line}"),
606 };
607 }
608 });
609
610 let stderr_thread = thread::spawn(move || {
611 let reader = BufReader::new(stderr);
612 let stderr_handle = std::io::stderr();
613 for line in reader.lines().map_while(Result::ok) {
614 if let Some(w) = &session_stderr {
615 let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
616 }
617 let mut h = stderr_handle.lock();
618 let _ = match &prefix_stderr {
619 Some(p) => writeln!(h, "[{p}] {line}"),
620 None => writeln!(h, "{line}"),
621 };
622 }
623 });
624
625 let status = child
626 .wait()
627 .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
628
629 let _ = stdout_thread.join();
630 let _ = stderr_thread.join();
631
632 let captured = Arc::try_unwrap(buffer)
633 .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
634 .into_inner()
635 .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
636
637 Ok((status, captured))
638}
639
640#[allow(clippy::too_many_arguments)]
646fn execute_step(
647 step: &Step,
648 prompt: &str,
649 workflow_name: &str,
650 model_override: Option<&str>,
651 prefix: Option<&str>,
652 session: Option<&Arc<SessionWriter>>,
653 rendered_system_prompt: Option<&str>,
654 workflow_provider: Option<&str>,
655 workflow_model: Option<&str>,
656) -> Result<String, ZigError> {
657 let args = build_zag_args(
658 step,
659 prompt,
660 workflow_name,
661 model_override,
662 rendered_system_prompt,
663 workflow_provider,
664 workflow_model,
665 );
666 let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
667
668 if !status.success() {
669 return Err(ZigError::Execution(format!(
670 "step '{}' failed (exit {})",
671 step.name, status,
672 )));
673 }
674
675 Ok(stdout)
676}
677
678#[allow(clippy::too_many_arguments)]
683fn run_step_attempts(
684 step: &Step,
685 prompt: &str,
686 workflow_name: &str,
687 prefix: Option<&str>,
688 session: Option<&Arc<SessionWriter>>,
689 rendered_system_prompt: Option<&str>,
690 workflow_provider: Option<&str>,
691 workflow_model: Option<&str>,
692) -> Result<String, ZigError> {
693 let mut attempts = 0;
694 let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
695 step.max_retries.unwrap_or(1) + 1
696 } else {
697 1
698 };
699
700 loop {
701 attempts += 1;
702 let model_override = if attempts > 1 {
703 step.retry_model.as_deref()
704 } else {
705 None
706 };
707 match execute_step(
708 step,
709 prompt,
710 workflow_name,
711 model_override,
712 prefix,
713 session,
714 rendered_system_prompt,
715 workflow_provider,
716 workflow_model,
717 ) {
718 Ok(output) => return Ok(output),
719 Err(e) => {
720 if let Some(w) = session {
721 let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
722 }
723 if attempts < max_attempts {
724 eprintln!(
725 " retry {}/{} for step '{}'",
726 attempts,
727 max_attempts - 1,
728 step.name
729 );
730 continue;
731 }
732 return Err(e);
733 }
734 }
735 }
736}
737
738fn extract_saves(
745 output: &str,
746 saves: &HashMap<String, String>,
747) -> Result<HashMap<String, String>, ZigError> {
748 let mut extracted = HashMap::new();
749
750 for (var_name, selector) in saves {
751 let value = if selector == "$" {
752 output.trim().to_string()
753 } else if let Some(path) = selector.strip_prefix("$.") {
754 let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
755 ZigError::Execution(format!(
756 "saves selector '{selector}' requires JSON output, but got parse error: {e}"
757 ))
758 })?;
759 json_path_lookup(&json, path)
760 } else {
761 output.trim().to_string()
762 };
763
764 extracted.insert(var_name.clone(), value);
765 }
766
767 Ok(extracted)
768}
769
770fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
775 let mut sequential = Vec::new();
776 let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
777
778 for step in tier {
779 if let Some(group) = &step.race_group {
780 race_groups.entry(group.clone()).or_default().push(step);
781 } else {
782 sequential.push(*step);
783 }
784 }
785
786 (sequential, race_groups)
787}
788
789fn spawn_step(
791 step: &Step,
792 prompt: &str,
793 workflow_name: &str,
794 rendered_system_prompt: Option<&str>,
795 workflow_provider: Option<&str>,
796 workflow_model: Option<&str>,
797) -> Result<std::process::Child, ZigError> {
798 let args = build_zag_args(
799 step,
800 prompt,
801 workflow_name,
802 None,
803 rendered_system_prompt,
804 workflow_provider,
805 workflow_model,
806 );
807 let mut cmd = Command::new("zag");
808 cmd.args(&args)
809 .stdout(std::process::Stdio::piped())
810 .stderr(std::process::Stdio::piped());
811
812 cmd.spawn()
813 .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
814}
815
816#[allow(clippy::too_many_arguments)]
821fn execute_race_group(
822 steps: &[&Step],
823 prompts: &HashMap<String, String>,
824 system_prompts: &HashMap<String, String>,
825 workflow_name: &str,
826 tier_index: usize,
827 session: Option<&Arc<SessionWriter>>,
828 workflow_provider: Option<&str>,
829 workflow_model: Option<&str>,
830) -> Result<(String, String), ZigError> {
831 if let Some(w) = session {
832 for step in steps {
833 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
834 let preview = prompts
835 .get(&step.name)
836 .map(|p| prompt_preview(p))
837 .unwrap_or_default();
838 let _ = w.step_started(
839 &step.name,
840 tier_index,
841 &zag_session_id,
842 zag_command_name(&step.command),
843 step.model.as_deref(),
844 &preview,
845 );
846 }
847 }
848 let race_started = Instant::now();
849 let mut children: Vec<(String, std::process::Child)> = Vec::new();
850
851 for step in steps {
852 let prompt = prompts.get(&step.name).ok_or_else(|| {
853 ZigError::Execution(format!("missing prompt for step '{}'", step.name))
854 })?;
855 eprintln!(" racing step '{}'...", step.name);
856 let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
857 let child = spawn_step(
858 step,
859 prompt,
860 workflow_name,
861 rendered_sp,
862 workflow_provider,
863 workflow_model,
864 )?;
865 children.push((step.name.clone(), child));
866 }
867
868 loop {
870 for i in 0..children.len() {
871 let status = children[i]
872 .1
873 .try_wait()
874 .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
875
876 if let Some(exit_status) = status {
877 let (winner_name, winner_child) = children.remove(i);
878
879 for (name, mut child) in children {
881 eprintln!(" cancelling step '{name}' (race lost)");
882 let _ = child.kill();
883 let _ = child.wait();
884 }
885
886 let elapsed = race_started.elapsed().as_millis() as u64;
887 if !exit_status.success() {
888 let stderr = winner_child
889 .stderr
890 .map(|mut s| {
891 let mut buf = String::new();
892 std::io::Read::read_to_string(&mut s, &mut buf).ok();
893 buf
894 })
895 .unwrap_or_default();
896 let err_msg = format!(
897 "race winner '{}' failed (exit {}): {}",
898 winner_name,
899 exit_status,
900 stderr.trim()
901 );
902 if let Some(w) = session {
903 let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
904 }
905 return Err(ZigError::Execution(err_msg));
906 }
907
908 let stdout = winner_child
909 .stdout
910 .map(|mut s| {
911 let mut buf = String::new();
912 std::io::Read::read_to_string(&mut s, &mut buf).ok();
913 buf
914 })
915 .unwrap_or_default();
916
917 eprintln!(" race won by '{winner_name}'");
918 if let Some(w) = session {
919 let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
920 }
921 return Ok((winner_name, stdout));
922 }
923 }
924
925 std::thread::sleep(Duration::from_millis(100));
926 }
927}
928
929#[allow(clippy::too_many_arguments)]
931fn execute_sequential_step(
932 step: &Step,
933 vars: &mut HashMap<String, String>,
934 user_prompt: Option<&str>,
935 step_outputs: &mut HashMap<String, String>,
936 workflow_name: &str,
937 pending_next: &mut Option<String>,
938 tier_index: usize,
939 session: Option<&Arc<SessionWriter>>,
940 roles: &HashMap<String, Role>,
941 workflow_dir: &Path,
942 workflow_provider: Option<&str>,
943 workflow_model: Option<&str>,
944) -> Result<(), ZigError> {
945 if let Some(condition) = &step.condition {
946 if !evaluate_condition(condition, vars)? {
947 eprintln!(
948 " skipping '{}' (condition not met: {condition})",
949 step.name
950 );
951 if let Some(w) = session {
952 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
953 }
954 return Ok(());
955 }
956 }
957
958 eprintln!(" running step '{}'...", step.name);
959
960 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
961 let rendered_sp = resolve_role_system_prompt(step, roles, vars, workflow_dir)?;
962 if let Some(w) = session {
963 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
964 let _ = w.step_started(
965 &step.name,
966 tier_index,
967 &zag_session_id,
968 zag_command_name(&step.command),
969 step.model.as_deref(),
970 &prompt_preview(&prompt),
971 );
972 }
973 let started = Instant::now();
974 let result = run_step_attempts(
975 step,
976 &prompt,
977 workflow_name,
978 None,
979 session,
980 rendered_sp.as_deref(),
981 workflow_provider,
982 workflow_model,
983 );
984
985 match result {
986 Ok(output) => {
987 let mut saved_keys: Vec<String> = Vec::new();
988 if !step.saves.is_empty() {
989 let saved = extract_saves(&output, &step.saves)?;
990 for (k, v) in &saved {
991 eprintln!(" saved {k} = {v}");
992 saved_keys.push(k.clone());
993 }
994 vars.extend(saved);
995 }
996
997 step_outputs.insert(step.name.clone(), output);
998 eprintln!(" completed '{}'", step.name);
999 if let Some(w) = session {
1000 let _ = w.step_completed(
1001 &step.name,
1002 0,
1003 started.elapsed().as_millis() as u64,
1004 saved_keys,
1005 );
1006 }
1007
1008 if step.next.is_some() {
1009 *pending_next = step.next.clone();
1010 }
1011 }
1012 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1013 FailurePolicy::Fail => return Err(e),
1014 FailurePolicy::Continue => {
1015 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1016 }
1017 FailurePolicy::Retry => {
1018 return Err(e);
1019 }
1020 },
1021 }
1022
1023 Ok(())
1024}
1025
1026#[allow(clippy::too_many_arguments)]
1035fn execute_parallel_tier(
1036 steps: &[&Step],
1037 vars: &mut HashMap<String, String>,
1038 user_prompt: Option<&str>,
1039 step_outputs: &mut HashMap<String, String>,
1040 workflow_name: &str,
1041 pending_next: &mut Option<String>,
1042 tier_index: usize,
1043 session: Option<&Arc<SessionWriter>>,
1044 roles: &HashMap<String, Role>,
1045 workflow_dir: &Path,
1046 workflow_provider: Option<&str>,
1047 workflow_model: Option<&str>,
1048) -> Result<(), ZigError> {
1049 let mut active: Vec<&Step> = Vec::new();
1052 let mut prompts: HashMap<String, String> = HashMap::new();
1053 let mut rendered_sps: HashMap<String, String> = HashMap::new();
1054 for step in steps {
1055 if let Some(condition) = &step.condition {
1056 if !evaluate_condition(condition, vars)? {
1057 eprintln!(
1058 " skipping '{}' (condition not met: {condition})",
1059 step.name
1060 );
1061 if let Some(w) = session {
1062 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1063 }
1064 continue;
1065 }
1066 }
1067 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1068 prompts.insert(step.name.clone(), prompt);
1069 if let Some(sp) = resolve_role_system_prompt(step, roles, vars, workflow_dir)? {
1070 rendered_sps.insert(step.name.clone(), sp);
1071 }
1072 active.push(*step);
1073 }
1074
1075 if active.is_empty() {
1076 return Ok(());
1077 }
1078
1079 eprintln!(" running {} steps in parallel...", active.len());
1080
1081 let mut start_times: HashMap<String, Instant> = HashMap::new();
1082 let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1083 for step in &active {
1084 let step_clone: Step = (*step).clone();
1085 let prompt = prompts.remove(&step.name).unwrap_or_default();
1086 let rendered_sp = rendered_sps.remove(&step.name);
1087 let workflow_name_owned = workflow_name.to_string();
1088 let name = step.name.clone();
1089 eprintln!(" starting '{name}'...");
1090 if let Some(w) = session {
1091 let zag_session_id = format!("zig-{workflow_name}-{name}");
1092 let _ = w.step_started(
1093 &name,
1094 tier_index,
1095 &zag_session_id,
1096 zag_command_name(&step.command),
1097 step.model.as_deref(),
1098 &prompt_preview(&prompt),
1099 );
1100 }
1101 start_times.insert(name.clone(), Instant::now());
1102 let session_clone = session.cloned();
1103 let wf_provider = workflow_provider.map(String::from);
1104 let wf_model = workflow_model.map(String::from);
1105 let handle = thread::spawn(move || {
1106 let res = run_step_attempts(
1107 &step_clone,
1108 &prompt,
1109 &workflow_name_owned,
1110 Some(&name),
1111 session_clone.as_ref(),
1112 rendered_sp.as_deref(),
1113 wf_provider.as_deref(),
1114 wf_model.as_deref(),
1115 );
1116 (name, res)
1117 });
1118 handles.push(handle);
1119 }
1120
1121 let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1122 for handle in handles {
1123 match handle.join() {
1124 Ok((name, res)) => {
1125 results.insert(name, res);
1126 }
1127 Err(_) => {
1128 return Err(ZigError::Execution("parallel step thread panicked".into()));
1129 }
1130 }
1131 }
1132
1133 let mut errors: Vec<String> = Vec::new();
1135 for step in &active {
1136 let Some(res) = results.remove(&step.name) else {
1137 continue;
1138 };
1139 let elapsed = start_times
1140 .remove(&step.name)
1141 .map(|t| t.elapsed().as_millis() as u64)
1142 .unwrap_or(0);
1143 match res {
1144 Ok(output) => {
1145 let mut saved_keys: Vec<String> = Vec::new();
1146 if !step.saves.is_empty() {
1147 let saved = extract_saves(&output, &step.saves)?;
1148 for (k, v) in &saved {
1149 eprintln!(" saved {k} = {v}");
1150 saved_keys.push(k.clone());
1151 }
1152 vars.extend(saved);
1153 }
1154 step_outputs.insert(step.name.clone(), output);
1155 eprintln!(" completed '{}'", step.name);
1156 if let Some(w) = session {
1157 let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1158 }
1159 if step.next.is_some() && pending_next.is_none() {
1160 *pending_next = step.next.clone();
1161 }
1162 }
1163 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1164 FailurePolicy::Continue => {
1165 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1166 }
1167 FailurePolicy::Fail | FailurePolicy::Retry => {
1168 errors.push(format!("'{}': {e}", step.name));
1169 }
1170 },
1171 }
1172 }
1173
1174 if !errors.is_empty() {
1175 return Err(ZigError::Execution(format!(
1176 "parallel step(s) failed: {}",
1177 errors.join("; ")
1178 )));
1179 }
1180
1181 Ok(())
1182}
1183
1184fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1187 let mut vars = HashMap::new();
1188 for (name, var) in &workflow.vars {
1189 let value = match &var.default {
1190 Some(toml::Value::String(s)) => s.clone(),
1191 Some(toml::Value::Integer(n)) => n.to_string(),
1192 Some(toml::Value::Float(f)) => f.to_string(),
1193 Some(toml::Value::Boolean(b)) => b.to_string(),
1194 Some(other) => other.to_string(),
1195 None => String::new(),
1196 };
1197 vars.insert(name.clone(), value);
1198 }
1199 vars
1200}
1201
1202fn execute(
1204 workflow: &Workflow,
1205 workflow_path: &std::path::Path,
1206 user_prompt: Option<&str>,
1207 workflow_dir: &Path,
1208) -> Result<(), ZigError> {
1209 let mut vars = init_vars(workflow);
1210
1211 load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1213
1214 let prompt_var = workflow
1216 .vars
1217 .iter()
1218 .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1219 .map(|(name, _)| name.clone());
1220
1221 if let Some(ref var_name) = prompt_var {
1222 if let Some(prompt) = user_prompt {
1223 vars.insert(var_name.clone(), prompt.to_string());
1224 }
1225 }
1226
1227 if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1229 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1230 return Err(ZigError::Validation(msgs.join("; ")));
1231 }
1232
1233 let effective_user_prompt = if prompt_var.is_some() {
1235 None
1236 } else {
1237 user_prompt
1238 };
1239
1240 let mut step_outputs: HashMap<String, String> = HashMap::new();
1241
1242 let wf_provider = workflow.workflow.provider.as_deref();
1243 let wf_model = workflow.workflow.model.as_deref();
1244
1245 let tiers = topological_sort(&workflow.steps)?;
1246
1247 eprintln!(
1248 "running workflow '{}' ({} steps in {} tiers)",
1249 workflow.workflow.name,
1250 workflow.steps.len(),
1251 tiers.len()
1252 );
1253
1254 let coordinator = match SessionWriter::create(
1259 &workflow.workflow.name,
1260 &workflow_path.to_string_lossy(),
1261 user_prompt,
1262 tiers.len(),
1263 ) {
1264 Ok(writer) => {
1265 eprintln!("zig session: {}", writer.session_id());
1266 Some(SessionCoordinator::start(writer))
1267 }
1268 Err(e) => {
1269 eprintln!("warning: failed to open zig session log: {e}");
1270 None
1271 }
1272 };
1273 let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1274 let session_ref = session_writer.as_ref();
1275
1276 let mut iteration = 0;
1277 let mut pending_next: Option<String> = None;
1278
1279 loop {
1280 let tiers_to_run = if let Some(ref next_step) = pending_next {
1281 let remaining: Vec<Vec<&Step>> = tiers
1283 .iter()
1284 .map(|tier| {
1285 tier.iter()
1286 .filter(|s| s.name == *next_step)
1287 .copied()
1288 .collect::<Vec<_>>()
1289 })
1290 .filter(|tier| !tier.is_empty())
1291 .collect();
1292 pending_next = None;
1293 remaining
1294 } else if iteration == 0 {
1295 tiers.clone()
1296 } else {
1297 break;
1298 };
1299
1300 for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1301 let (non_race, race_groups) = partition_tier(tier);
1302
1303 if let Some(w) = session_ref {
1304 let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1305 let _ = w.tier_started(tier_index, names);
1306 }
1307
1308 if non_race.len() <= 1 {
1312 for step in &non_race {
1313 execute_sequential_step(
1314 step,
1315 &mut vars,
1316 effective_user_prompt,
1317 &mut step_outputs,
1318 &workflow.workflow.name,
1319 &mut pending_next,
1320 tier_index,
1321 session_ref,
1322 &workflow.roles,
1323 workflow_dir,
1324 wf_provider,
1325 wf_model,
1326 )?;
1327 }
1328 } else {
1329 execute_parallel_tier(
1330 &non_race,
1331 &mut vars,
1332 effective_user_prompt,
1333 &mut step_outputs,
1334 &workflow.workflow.name,
1335 &mut pending_next,
1336 tier_index,
1337 session_ref,
1338 &workflow.roles,
1339 workflow_dir,
1340 wf_provider,
1341 wf_model,
1342 )?;
1343 }
1344
1345 for (group_name, race_steps) in &race_groups {
1347 eprintln!(" starting race group '{group_name}'...");
1348
1349 let mut prompts = HashMap::new();
1351 let mut race_sps: HashMap<String, String> = HashMap::new();
1352 let mut active_steps: Vec<&Step> = Vec::new();
1353 for step in race_steps {
1354 if let Some(condition) = &step.condition {
1355 if !evaluate_condition(condition, &vars)? {
1356 eprintln!(
1357 " skipping '{}' (condition not met: {condition})",
1358 step.name
1359 );
1360 continue;
1361 }
1362 }
1363 let prompt =
1364 render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1365 prompts.insert(step.name.clone(), prompt);
1366 if let Some(sp) =
1367 resolve_role_system_prompt(step, &workflow.roles, &vars, workflow_dir)?
1368 {
1369 race_sps.insert(step.name.clone(), sp);
1370 }
1371 active_steps.push(step);
1372 }
1373
1374 if active_steps.is_empty() {
1375 continue;
1376 }
1377
1378 match execute_race_group(
1379 &active_steps,
1380 &prompts,
1381 &race_sps,
1382 &workflow.workflow.name,
1383 tier_index,
1384 session_ref,
1385 wf_provider,
1386 wf_model,
1387 ) {
1388 Ok((winner_name, output)) => {
1389 if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1391 if !winner.saves.is_empty() {
1392 let saved = extract_saves(&output, &winner.saves)?;
1393 for (k, v) in &saved {
1394 eprintln!(" saved {k} = {v}");
1395 }
1396 vars.extend(saved);
1397 }
1398 if winner.next.is_some() {
1399 pending_next = winner.next.clone();
1400 }
1401 }
1402 step_outputs.insert(winner_name.clone(), output);
1403 eprintln!(
1404 " completed race group '{group_name}' (winner: '{winner_name}')"
1405 );
1406 }
1407 Err(e) => return Err(e),
1408 }
1409 }
1410 }
1411
1412 iteration += 1;
1413 if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1414 if iteration >= MAX_LOOP_ITERATIONS {
1415 eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1416 }
1417 break;
1418 }
1419 }
1420
1421 eprintln!("workflow '{}' completed", workflow.workflow.name);
1422 if let Some(c) = coordinator {
1423 let _ = c.finish(SessionStatus::Success);
1424 }
1425 Ok(())
1426}
1427
1428fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1431 match cmd {
1432 None => "run",
1433 Some(StepCommand::Review) => "review",
1434 Some(StepCommand::Plan) => "plan",
1435 Some(StepCommand::Pipe) => "pipe",
1436 Some(StepCommand::Collect) => "collect",
1437 Some(StepCommand::Summary) => "summary",
1438 }
1439}
1440
/// Builds a single-line preview of `prompt` suitable for log output.
///
/// Line breaks (`\n` and `\r`) are replaced with spaces so the preview stays on
/// one terminal line. A bare `\r` (e.g. from CRLF line endings) would otherwise
/// move the cursor back to column 0 and overwrite the start of the log line.
///
/// If the prompt is longer than 200 characters, the preview keeps the first
/// 200 and appends `…`. Lengths are counted in `char`s, so multi-byte text is
/// never split in the middle of a character.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let mut chars = prompt
        .chars()
        .map(|c| if matches!(c, '\n' | '\r') { ' ' } else { c });
    // Take at most MAX chars, then peek once to see whether anything was cut off;
    // this avoids materializing and re-counting the whole (possibly huge) prompt.
    let mut preview: String = chars.by_ref().take(MAX).collect();
    if chars.next().is_some() {
        preview.push('…');
    }
    preview
}
1455
1456#[cfg(test)]
1457#[path = "run_tests.rs"]
1458mod tests;