1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::config::ZigConfig;
12use crate::error::ZigError;
13use crate::memory::{MemoryCollector, render_memory_block};
14use crate::paths::expand_path;
15use crate::resources::{ResourceCollector, render_system_block};
16use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
17use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
18use crate::workflow::{parser, validate};
19
// NOTE(review): not referenced in the visible part of this file — presumably
// the cap on how many times `next`-driven step loops may repeat; confirm
// against the remainder of `execute`.
const MAX_LOOP_ITERATIONS: usize = 100;
22
23pub fn run_workflow(
36 workflow_path: &str,
37 user_prompt: Option<&str>,
38 disable_resources: bool,
39 disable_memory: bool,
40) -> Result<(), ZigError> {
41 check_zag()?;
42
43 let path = resolve_workflow_path(workflow_path)?;
44 let (workflow, source) = parser::parse_workflow(&path)?;
45
46 if let Err(errors) = validate::validate(&workflow) {
47 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
48 return Err(ZigError::Validation(msgs.join("; ")));
49 }
50
51 execute(
52 &workflow,
53 &path,
54 user_prompt,
55 source.dir(),
56 disable_resources,
57 disable_memory,
58 )
59}
60
61pub(crate) fn check_zag() -> Result<(), ZigError> {
63 let zag_available = Command::new("zag")
64 .arg("--version")
65 .output()
66 .is_ok_and(|o| o.status.success());
67
68 if !zag_available {
69 return Err(ZigError::Zag(
70 "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
71 ));
72 }
73 Ok(())
74}
75
76pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
89 let mut candidates = vec![
90 PathBuf::from(workflow),
91 PathBuf::from(format!("{workflow}.zwf")),
92 PathBuf::from(format!("{workflow}.zwfz")),
93 ];
94
95 if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
96 candidates.push(local_dir.join(workflow));
97 candidates.push(local_dir.join(format!("{workflow}.zwf")));
98 candidates.push(local_dir.join(format!("{workflow}.zwfz")));
99 }
100
101 if let Some(global_dir) = crate::paths::global_workflows_dir() {
102 candidates.push(global_dir.join(workflow));
103 candidates.push(global_dir.join(format!("{workflow}.zwf")));
104 candidates.push(global_dir.join(format!("{workflow}.zwfz")));
105 }
106
107 for candidate in &candidates {
108 if candidate.exists() {
109 return Ok(candidate.clone());
110 }
111 }
112
113 Err(ZigError::Io(format!(
114 "workflow not found: '{workflow}' (tried: {})",
115 candidates
116 .iter()
117 .map(|p| p.display().to_string())
118 .collect::<Vec<_>>()
119 .join(", ")
120 )))
121}
122
123fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
129 let step_index: HashMap<&str, usize> = steps
130 .iter()
131 .enumerate()
132 .map(|(i, s)| (s.name.as_str(), i))
133 .collect();
134
135 let mut in_degree = vec![0usize; steps.len()];
136 for (i, step) in steps.iter().enumerate() {
137 for dep in &step.depends_on {
138 if step_index.contains_key(dep.as_str()) {
139 in_degree[i] += 1;
140 }
141 }
142 }
143
144 let mut tiers = Vec::new();
145 let mut remaining = in_degree.clone();
146 let mut completed: Vec<bool> = vec![false; steps.len()];
147
148 loop {
149 let tier: Vec<usize> = (0..steps.len())
150 .filter(|&i| !completed[i] && remaining[i] == 0)
151 .collect();
152
153 if tier.is_empty() {
154 break;
155 }
156
157 for &i in &tier {
158 completed[i] = true;
159 }
160
161 for &i in &tier {
163 for (j, step) in steps.iter().enumerate() {
164 if !completed[j] && step.depends_on.contains(&steps[i].name) {
165 remaining[j] -= 1;
166 }
167 }
168 }
169
170 tiers.push(tier.iter().map(|&i| &steps[i]).collect());
171 }
172
173 let completed_count: usize = completed.iter().filter(|&&c| c).count();
174 if completed_count != steps.len() {
175 return Err(ZigError::Execution(
176 "could not resolve all steps — possible undetected cycle".into(),
177 ));
178 }
179
180 Ok(tiers)
181}
182
183fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
189 let mut result = String::with_capacity(template.len());
190 let mut rest = template;
191
192 while let Some(start) = rest.find("${") {
193 result.push_str(&rest[..start]);
194 let after_start = &rest[start + 2..];
195
196 if let Some(end) = after_start.find('}') {
197 let var_expr = &after_start[..end];
198 let mut parts = var_expr.splitn(2, '.');
199 let root = parts.next().unwrap_or(var_expr);
200
201 if let Some(value) = vars.get(root) {
202 if let Some(path) = parts.next() {
203 if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
205 let resolved = json_path_lookup(&json, path);
206 result.push_str(&resolved);
207 } else {
208 result.push_str(value);
209 }
210 } else {
211 result.push_str(value);
212 }
213 } else {
214 result.push_str(&rest[start..start + 2 + end + 1]);
216 }
217
218 rest = &after_start[end + 1..];
219 } else {
220 result.push_str(&rest[start..]);
221 rest = "";
222 }
223 }
224
225 result.push_str(rest);
226 result
227}
228
229fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
231 let mut current = value;
232 for key in path.split('.') {
233 match current.get(key) {
234 Some(v) => current = v,
235 None => return format!("${{?.{path}}}"),
236 }
237 }
238 match current {
239 serde_json::Value::String(s) => s.clone(),
240 other => other.to_string(),
241 }
242}
243
244fn resolve_role_system_prompt(
261 step: &Step,
262 roles: &HashMap<String, Role>,
263 resources: &ResourceCollector,
264 memory: &MemoryCollector,
265 vars: &HashMap<String, String>,
266 workflow_dir: &Path,
267 workflow_name: &str,
268) -> Result<Option<String>, ZigError> {
269 let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
271 Some(substitute_vars(sp, vars))
272 } else if let Some(ref role_ref) = step.role {
273 let resolved_name = substitute_vars(role_ref, vars);
274 let role = roles.get(&resolved_name).ok_or_else(|| {
275 ZigError::Execution(format!(
276 "step '{}' references role '{}' which does not exist",
277 step.name, resolved_name
278 ))
279 })?;
280
281 let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
282 let full_path = workflow_dir.join(expand_path(file_path));
283 Some(std::fs::read_to_string(&full_path).map_err(|e| {
284 ZigError::Execution(format!(
285 "failed to read system_prompt_file '{}' for role '{}': {e}",
286 full_path.display(),
287 resolved_name
288 ))
289 })?)
290 } else {
291 role.system_prompt.clone()
292 };
293
294 raw_prompt.map(|p| substitute_vars(&p, vars))
295 } else {
296 None
297 };
298
299 let set = resources.collect_for_step(&step.resources)?;
301 let resource_block = render_system_block(&set);
302
303 let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
305 let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
306
307 let prefix = format!("{resource_block}{memory_block}");
308
309 match (prefix.is_empty(), base_prompt) {
310 (true, None) => Ok(None),
311 (true, Some(p)) => Ok(Some(p)),
312 (false, None) => Ok(Some(prefix.trim_end().to_string())),
313 (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
314 }
315}
316
317fn load_file_defaults(
322 vars: &mut HashMap<String, String>,
323 declarations: &HashMap<String, crate::workflow::model::Variable>,
324 workflow_dir: &Path,
325) -> Result<(), ZigError> {
326 for (name, decl) in declarations {
327 if decl.default.is_none() {
328 if let Some(ref file_path) = decl.default_file {
329 let full_path = workflow_dir.join(expand_path(file_path));
330 let content = std::fs::read_to_string(&full_path).map_err(|e| {
331 ZigError::Execution(format!(
332 "failed to read default_file '{}' for variable '{name}': {e}",
333 full_path.display()
334 ))
335 })?;
336 vars.insert(name.clone(), content);
337 }
338 }
339 }
340 Ok(())
341}
342
343fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
350 let condition = condition.trim();
351
352 let operators = ["<=", ">=", "!=", "==", "<", ">"];
354 for op in &operators {
355 if let Some(pos) = condition.find(op) {
356 let lhs = resolve_operand(condition[..pos].trim(), vars);
357 let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
358 return Ok(compare(&lhs, &rhs, op));
359 }
360 }
361
362 let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
364 Ok(is_truthy(value))
365}
366
/// Resolves one side of a comparison to its string value.
///
/// - A token wrapped in matching double or single quotes yields the text
///   between the quotes.
/// - Otherwise a token naming a variable yields that variable's value.
/// - Anything else is taken literally.
///
/// A token consisting of a lone quote character is not a quoted literal (it
/// has no closing quote) and falls through to the other rules; slicing it as
/// if it were quoted would panic.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    // `strip_prefix` followed by `strip_suffix` needs two distinct quote
    // characters, so a one-character token can never match.
    let unquote = |quote: char| {
        token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
    };

    if let Some(inner) = unquote('"').or_else(|| unquote('\'')) {
        return inner.to_string();
    }
    if let Some(val) = vars.get(token) {
        return val.clone();
    }
    token.to_string()
}
385
/// Compares two operand strings with the operator `op`.
///
/// When both sides parse as `f64` and neither is NaN the comparison is
/// numeric; equality tolerates a difference below `f64::EPSILON`, and `!=`
/// is always the exact negation of `==`. Otherwise the strings are compared
/// lexicographically. An unknown operator yields `false`.
///
/// `f64`'s parser accepts spellings such as `"nan"` and `"inf"`. NaN
/// operands are compared as plain strings, and identical infinities compare
/// equal, so `x == x` holds for every value and `==` / `!=` never both
/// return `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    let numeric = match (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        (Ok(l), Ok(r)) if !l.is_nan() && !r.is_nan() => Some((l, r)),
        _ => None,
    };

    if let Some((l, r)) = numeric {
        // `l == r` covers equal infinities, whose difference is NaN.
        let equal = l == r || (l - r).abs() < f64::EPSILON;
        return match op {
            "==" => equal,
            "!=" => !equal,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
410
/// Reports whether a variable value counts as true in a condition.
///
/// Exactly three values are falsy: the empty string, `"false"` and `"0"`.
/// The check is case-sensitive and does no trimming.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
415
416fn render_step_prompt(
419 step: &Step,
420 vars: &HashMap<String, String>,
421 user_prompt: Option<&str>,
422 dependency_outputs: &HashMap<String, String>,
423) -> String {
424 let mut prompt = String::new();
425
426 if let Some(ctx) = user_prompt {
428 prompt.push_str(&format!("User context: {ctx}\n\n"));
429 }
430
431 if step.inject_context {
433 for dep in &step.depends_on {
434 if let Some(output) = dependency_outputs.get(dep) {
435 prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
436 }
437 }
438 }
439
440 prompt.push_str(&substitute_vars(&step.prompt, vars));
442
443 prompt
444}
445
446fn build_zag_args(
460 step: &Step,
461 prompt: &str,
462 workflow_name: &str,
463 model_override: Option<&str>,
464 rendered_system_prompt: Option<&str>,
465 workflow_provider: Option<&str>,
466 workflow_model: Option<&str>,
467) -> Vec<String> {
468 let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
469
470 let (mut args, accepts_agent_args) = match &step.command {
472 None => (vec!["run".to_string(), prompt.to_string()], true),
473 Some(StepCommand::Review) => {
474 let mut a = vec!["review".to_string()];
475 if !prompt.is_empty() {
476 a.push(prompt.to_string());
477 }
478 if step.uncommitted {
479 a.push("--uncommitted".into());
480 }
481 if let Some(base) = &step.base {
482 a.extend(["--base".into(), base.clone()]);
483 }
484 if let Some(commit) = &step.commit {
485 a.extend(["--commit".into(), commit.clone()]);
486 }
487 if let Some(title) = &step.title {
488 a.extend(["--title".into(), title.clone()]);
489 }
490 (a, true)
491 }
492 Some(StepCommand::Plan) => {
493 let mut a = vec!["plan".to_string(), prompt.to_string()];
494 if let Some(output) = &step.plan_output {
495 a.extend(["-o".into(), expand_path(output)]);
496 }
497 if let Some(instructions) = &step.instructions {
498 a.extend(["--instructions".into(), instructions.clone()]);
499 }
500 (a, true)
501 }
502 Some(StepCommand::Pipe) => {
503 let mut a = vec!["pipe".to_string()];
504 for dep in &step.depends_on {
505 a.push(session_name(dep));
506 }
507 a.push("--".into());
508 a.push(prompt.to_string());
509 (a, true)
510 }
511 Some(StepCommand::Collect) => {
512 let mut a = vec!["collect".to_string()];
513 for dep in &step.depends_on {
514 a.push(session_name(dep));
515 }
516 (a, false)
517 }
518 Some(StepCommand::Summary) => {
519 let mut a = vec!["summary".to_string()];
520 for dep in &step.depends_on {
521 a.push(session_name(dep));
522 }
523 (a, false)
524 }
525 };
526
527 if accepts_agent_args {
530 let effective_provider = step.provider.as_deref().or(workflow_provider);
531 if let Some(provider) = effective_provider {
532 args.extend(["--provider".into(), provider.to_string()]);
533 }
534
535 let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
536 if let Some(model) = effective_model {
537 args.extend(["--model".into(), model.to_string()]);
538 }
539
540 if let Some(sp) = rendered_system_prompt {
541 args.extend(["--system-prompt".into(), sp.to_string()]);
542 }
543 if let Some(max_turns) = step.max_turns {
544 args.extend(["--max-turns".into(), max_turns.to_string()]);
545 }
546
547 if let Some(output) = &step.output {
549 args.extend(["-o".into(), output.clone()]);
550 } else if step.json {
551 args.push("--json".into());
552 }
553 if let Some(schema) = &step.json_schema {
554 args.extend(["--json-schema".into(), schema.clone()]);
555 }
556
557 if let Some(mcp_config) = &step.mcp_config {
558 args.extend(["--mcp-config".into(), expand_path(mcp_config)]);
559 }
560
561 if step.auto_approve {
563 args.push("--auto-approve".into());
564 }
565 if let Some(root) = &step.root {
566 args.extend(["--root".into(), expand_path(root)]);
567 }
568 for dir in &step.add_dirs {
569 args.extend(["--add-dir".into(), expand_path(dir)]);
570 }
571 for (key, value) in &step.env {
572 args.extend(["--env".into(), format!("{key}={value}")]);
573 }
574 for file in &step.files {
575 args.extend(["--file".into(), expand_path(file)]);
576 }
577
578 for ctx in &step.context {
580 args.extend(["--context".into(), ctx.clone()]);
581 }
582 if let Some(plan) = &step.plan {
583 args.extend(["--plan".into(), expand_path(plan)]);
584 }
585
586 if step.worktree {
588 args.push("--worktree".into());
589 }
590 if let Some(sandbox) = &step.sandbox {
591 args.extend(["--sandbox".into(), sandbox.clone()]);
592 }
593 }
594
595 let name = session_name(&step.name);
597 args.extend(["--name".into(), name]);
598
599 if !step.description.is_empty() {
600 args.extend(["--description".into(), step.description.clone()]);
601 }
602
603 args.extend(["--tag".into(), "zig-workflow".into()]);
604 for tag in &step.tags {
605 args.extend(["--tag".into(), tag.clone()]);
606 }
607
608 if let Some(timeout) = &step.timeout {
609 args.extend(["--timeout".into(), timeout.clone()]);
610 }
611
612 args
613}
614
615fn run_zag_streaming(
621 args: &[String],
622 step_name: &str,
623 prefix: Option<&str>,
624 session: Option<&Arc<SessionWriter>>,
625) -> Result<(std::process::ExitStatus, String), ZigError> {
626 let mut cmd = Command::new("zag");
627 cmd.args(args)
628 .stdout(std::process::Stdio::piped())
629 .stderr(std::process::Stdio::piped());
630
631 let mut child = cmd
632 .spawn()
633 .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
634
635 let stdout = child.stdout.take().expect("stdout was piped");
636 let stderr = child.stderr.take().expect("stderr was piped");
637
638 let buffer = Arc::new(Mutex::new(String::new()));
639 let buffer_clone = Arc::clone(&buffer);
640 let prefix_stdout = prefix.map(String::from);
641 let prefix_stderr = prefix.map(String::from);
642 let session_stdout = session.cloned();
643 let session_stderr = session.cloned();
644 let step_name_stdout = step_name.to_string();
645 let step_name_stderr = step_name.to_string();
646
647 let stdout_thread = thread::spawn(move || {
648 let reader = BufReader::new(stdout);
649 let stderr_handle = std::io::stderr();
650 for line in reader.lines().map_while(Result::ok) {
651 if let Ok(mut buf) = buffer_clone.lock() {
652 buf.push_str(&line);
653 buf.push('\n');
654 }
655 if let Some(w) = &session_stdout {
656 let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
657 }
658 let mut h = stderr_handle.lock();
659 let _ = match &prefix_stdout {
660 Some(p) => writeln!(h, "[{p}] {line}"),
661 None => writeln!(h, "{line}"),
662 };
663 }
664 });
665
666 let stderr_thread = thread::spawn(move || {
667 let reader = BufReader::new(stderr);
668 let stderr_handle = std::io::stderr();
669 for line in reader.lines().map_while(Result::ok) {
670 if let Some(w) = &session_stderr {
671 let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
672 }
673 let mut h = stderr_handle.lock();
674 let _ = match &prefix_stderr {
675 Some(p) => writeln!(h, "[{p}] {line}"),
676 None => writeln!(h, "{line}"),
677 };
678 }
679 });
680
681 let status = child
682 .wait()
683 .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
684
685 let _ = stdout_thread.join();
686 let _ = stderr_thread.join();
687
688 let captured = Arc::try_unwrap(buffer)
689 .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
690 .into_inner()
691 .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
692
693 Ok((status, captured))
694}
695
696#[allow(clippy::too_many_arguments)]
702fn execute_step(
703 step: &Step,
704 prompt: &str,
705 workflow_name: &str,
706 model_override: Option<&str>,
707 prefix: Option<&str>,
708 session: Option<&Arc<SessionWriter>>,
709 rendered_system_prompt: Option<&str>,
710 workflow_provider: Option<&str>,
711 workflow_model: Option<&str>,
712) -> Result<String, ZigError> {
713 let args = build_zag_args(
714 step,
715 prompt,
716 workflow_name,
717 model_override,
718 rendered_system_prompt,
719 workflow_provider,
720 workflow_model,
721 );
722 let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
723
724 if !status.success() {
725 return Err(ZigError::Execution(format!(
726 "step '{}' failed (exit {})",
727 step.name, status,
728 )));
729 }
730
731 Ok(stdout)
732}
733
734#[allow(clippy::too_many_arguments)]
739fn run_step_attempts(
740 step: &Step,
741 prompt: &str,
742 workflow_name: &str,
743 prefix: Option<&str>,
744 session: Option<&Arc<SessionWriter>>,
745 rendered_system_prompt: Option<&str>,
746 workflow_provider: Option<&str>,
747 workflow_model: Option<&str>,
748) -> Result<String, ZigError> {
749 let mut attempts = 0;
750 let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
751 step.max_retries.unwrap_or(1) + 1
752 } else {
753 1
754 };
755
756 loop {
757 attempts += 1;
758 let model_override = if attempts > 1 {
759 step.retry_model.as_deref()
760 } else {
761 None
762 };
763 match execute_step(
764 step,
765 prompt,
766 workflow_name,
767 model_override,
768 prefix,
769 session,
770 rendered_system_prompt,
771 workflow_provider,
772 workflow_model,
773 ) {
774 Ok(output) => return Ok(output),
775 Err(e) => {
776 if let Some(w) = session {
777 let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
778 }
779 if attempts < max_attempts {
780 eprintln!(
781 " retry {}/{} for step '{}'",
782 attempts,
783 max_attempts - 1,
784 step.name
785 );
786 continue;
787 }
788 return Err(e);
789 }
790 }
791 }
792}
793
794fn extract_saves(
801 output: &str,
802 saves: &HashMap<String, String>,
803) -> Result<HashMap<String, String>, ZigError> {
804 let mut extracted = HashMap::new();
805
806 for (var_name, selector) in saves {
807 let value = if selector == "$" {
808 output.trim().to_string()
809 } else if let Some(path) = selector.strip_prefix("$.") {
810 let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
811 ZigError::Execution(format!(
812 "saves selector '{selector}' requires JSON output, but got parse error: {e}"
813 ))
814 })?;
815 json_path_lookup(&json, path)
816 } else {
817 output.trim().to_string()
818 };
819
820 extracted.insert(var_name.clone(), value);
821 }
822
823 Ok(extracted)
824}
825
826fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
831 let mut sequential = Vec::new();
832 let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
833
834 for step in tier {
835 if let Some(group) = &step.race_group {
836 race_groups.entry(group.clone()).or_default().push(step);
837 } else {
838 sequential.push(*step);
839 }
840 }
841
842 (sequential, race_groups)
843}
844
845fn spawn_step(
847 step: &Step,
848 prompt: &str,
849 workflow_name: &str,
850 rendered_system_prompt: Option<&str>,
851 workflow_provider: Option<&str>,
852 workflow_model: Option<&str>,
853) -> Result<std::process::Child, ZigError> {
854 let args = build_zag_args(
855 step,
856 prompt,
857 workflow_name,
858 None,
859 rendered_system_prompt,
860 workflow_provider,
861 workflow_model,
862 );
863 let mut cmd = Command::new("zag");
864 cmd.args(&args)
865 .stdout(std::process::Stdio::piped())
866 .stderr(std::process::Stdio::piped());
867
868 cmd.spawn()
869 .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
870}
871
872#[allow(clippy::too_many_arguments)]
877fn execute_race_group(
878 steps: &[&Step],
879 prompts: &HashMap<String, String>,
880 system_prompts: &HashMap<String, String>,
881 workflow_name: &str,
882 tier_index: usize,
883 session: Option<&Arc<SessionWriter>>,
884 workflow_provider: Option<&str>,
885 workflow_model: Option<&str>,
886) -> Result<(String, String), ZigError> {
887 if let Some(w) = session {
888 for step in steps {
889 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
890 let preview = prompts
891 .get(&step.name)
892 .map(|p| prompt_preview(p))
893 .unwrap_or_default();
894 let _ = w.step_started(
895 &step.name,
896 tier_index,
897 &zag_session_id,
898 zag_command_name(&step.command),
899 step.model.as_deref(),
900 &preview,
901 );
902 }
903 }
904 let race_started = Instant::now();
905 let mut children: Vec<(String, std::process::Child)> = Vec::new();
906
907 for step in steps {
908 let prompt = prompts.get(&step.name).ok_or_else(|| {
909 ZigError::Execution(format!("missing prompt for step '{}'", step.name))
910 })?;
911 eprintln!(" racing step '{}'...", step.name);
912 let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
913 let child = spawn_step(
914 step,
915 prompt,
916 workflow_name,
917 rendered_sp,
918 workflow_provider,
919 workflow_model,
920 )?;
921 children.push((step.name.clone(), child));
922 }
923
924 loop {
926 for i in 0..children.len() {
927 let status = children[i]
928 .1
929 .try_wait()
930 .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
931
932 if let Some(exit_status) = status {
933 let (winner_name, winner_child) = children.remove(i);
934
935 for (name, mut child) in children {
937 eprintln!(" cancelling step '{name}' (race lost)");
938 let _ = child.kill();
939 let _ = child.wait();
940 }
941
942 let elapsed = race_started.elapsed().as_millis() as u64;
943 if !exit_status.success() {
944 let stderr = winner_child
945 .stderr
946 .map(|mut s| {
947 let mut buf = String::new();
948 std::io::Read::read_to_string(&mut s, &mut buf).ok();
949 buf
950 })
951 .unwrap_or_default();
952 let err_msg = format!(
953 "race winner '{}' failed (exit {}): {}",
954 winner_name,
955 exit_status,
956 stderr.trim()
957 );
958 if let Some(w) = session {
959 let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
960 }
961 return Err(ZigError::Execution(err_msg));
962 }
963
964 let stdout = winner_child
965 .stdout
966 .map(|mut s| {
967 let mut buf = String::new();
968 std::io::Read::read_to_string(&mut s, &mut buf).ok();
969 buf
970 })
971 .unwrap_or_default();
972
973 eprintln!(" race won by '{winner_name}'");
974 if let Some(w) = session {
975 let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
976 }
977 return Ok((winner_name, stdout));
978 }
979 }
980
981 std::thread::sleep(Duration::from_millis(100));
982 }
983}
984
985#[allow(clippy::too_many_arguments)]
987fn execute_sequential_step(
988 step: &Step,
989 vars: &mut HashMap<String, String>,
990 user_prompt: Option<&str>,
991 step_outputs: &mut HashMap<String, String>,
992 workflow_name: &str,
993 pending_next: &mut Option<String>,
994 tier_index: usize,
995 session: Option<&Arc<SessionWriter>>,
996 roles: &HashMap<String, Role>,
997 resources: &ResourceCollector,
998 memory: &MemoryCollector,
999 workflow_dir: &Path,
1000 workflow_provider: Option<&str>,
1001 workflow_model: Option<&str>,
1002) -> Result<(), ZigError> {
1003 if let Some(condition) = &step.condition {
1004 if !evaluate_condition(condition, vars)? {
1005 eprintln!(
1006 " skipping '{}' (condition not met: {condition})",
1007 step.name
1008 );
1009 if let Some(w) = session {
1010 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1011 }
1012 return Ok(());
1013 }
1014 }
1015
1016 eprintln!(" running step '{}'...", step.name);
1017
1018 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1019 let rendered_sp = resolve_role_system_prompt(
1020 step,
1021 roles,
1022 resources,
1023 memory,
1024 vars,
1025 workflow_dir,
1026 workflow_name,
1027 )?;
1028 if let Some(w) = session {
1029 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1030 let _ = w.step_started(
1031 &step.name,
1032 tier_index,
1033 &zag_session_id,
1034 zag_command_name(&step.command),
1035 step.model.as_deref(),
1036 &prompt_preview(&prompt),
1037 );
1038 }
1039 let started = Instant::now();
1040 let result = run_step_attempts(
1041 step,
1042 &prompt,
1043 workflow_name,
1044 None,
1045 session,
1046 rendered_sp.as_deref(),
1047 workflow_provider,
1048 workflow_model,
1049 );
1050
1051 match result {
1052 Ok(output) => {
1053 let mut saved_keys: Vec<String> = Vec::new();
1054 if !step.saves.is_empty() {
1055 let saved = extract_saves(&output, &step.saves)?;
1056 for (k, v) in &saved {
1057 eprintln!(" saved {k} = {v}");
1058 saved_keys.push(k.clone());
1059 }
1060 vars.extend(saved);
1061 }
1062
1063 step_outputs.insert(step.name.clone(), output);
1064 eprintln!(" completed '{}'", step.name);
1065 if let Some(w) = session {
1066 let _ = w.step_completed(
1067 &step.name,
1068 0,
1069 started.elapsed().as_millis() as u64,
1070 saved_keys,
1071 );
1072 }
1073
1074 if step.next.is_some() {
1075 *pending_next = step.next.clone();
1076 }
1077 }
1078 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1079 FailurePolicy::Fail => return Err(e),
1080 FailurePolicy::Continue => {
1081 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1082 }
1083 FailurePolicy::Retry => {
1084 return Err(e);
1085 }
1086 },
1087 }
1088
1089 Ok(())
1090}
1091
1092#[allow(clippy::too_many_arguments)]
1101fn execute_parallel_tier(
1102 steps: &[&Step],
1103 vars: &mut HashMap<String, String>,
1104 user_prompt: Option<&str>,
1105 step_outputs: &mut HashMap<String, String>,
1106 workflow_name: &str,
1107 pending_next: &mut Option<String>,
1108 tier_index: usize,
1109 session: Option<&Arc<SessionWriter>>,
1110 roles: &HashMap<String, Role>,
1111 resources: &ResourceCollector,
1112 memory: &MemoryCollector,
1113 workflow_dir: &Path,
1114 workflow_provider: Option<&str>,
1115 workflow_model: Option<&str>,
1116) -> Result<(), ZigError> {
1117 let mut active: Vec<&Step> = Vec::new();
1120 let mut prompts: HashMap<String, String> = HashMap::new();
1121 let mut rendered_sps: HashMap<String, String> = HashMap::new();
1122 for step in steps {
1123 if let Some(condition) = &step.condition {
1124 if !evaluate_condition(condition, vars)? {
1125 eprintln!(
1126 " skipping '{}' (condition not met: {condition})",
1127 step.name
1128 );
1129 if let Some(w) = session {
1130 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1131 }
1132 continue;
1133 }
1134 }
1135 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1136 prompts.insert(step.name.clone(), prompt);
1137 if let Some(sp) = resolve_role_system_prompt(
1138 step,
1139 roles,
1140 resources,
1141 memory,
1142 vars,
1143 workflow_dir,
1144 workflow_name,
1145 )? {
1146 rendered_sps.insert(step.name.clone(), sp);
1147 }
1148 active.push(*step);
1149 }
1150
1151 if active.is_empty() {
1152 return Ok(());
1153 }
1154
1155 eprintln!(" running {} steps in parallel...", active.len());
1156
1157 let mut start_times: HashMap<String, Instant> = HashMap::new();
1158 let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1159 for step in &active {
1160 let step_clone: Step = (*step).clone();
1161 let prompt = prompts.remove(&step.name).unwrap_or_default();
1162 let rendered_sp = rendered_sps.remove(&step.name);
1163 let workflow_name_owned = workflow_name.to_string();
1164 let name = step.name.clone();
1165 eprintln!(" starting '{name}'...");
1166 if let Some(w) = session {
1167 let zag_session_id = format!("zig-{workflow_name}-{name}");
1168 let _ = w.step_started(
1169 &name,
1170 tier_index,
1171 &zag_session_id,
1172 zag_command_name(&step.command),
1173 step.model.as_deref(),
1174 &prompt_preview(&prompt),
1175 );
1176 }
1177 start_times.insert(name.clone(), Instant::now());
1178 let session_clone = session.cloned();
1179 let wf_provider = workflow_provider.map(String::from);
1180 let wf_model = workflow_model.map(String::from);
1181 let handle = thread::spawn(move || {
1182 let res = run_step_attempts(
1183 &step_clone,
1184 &prompt,
1185 &workflow_name_owned,
1186 Some(&name),
1187 session_clone.as_ref(),
1188 rendered_sp.as_deref(),
1189 wf_provider.as_deref(),
1190 wf_model.as_deref(),
1191 );
1192 (name, res)
1193 });
1194 handles.push(handle);
1195 }
1196
1197 let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1198 for handle in handles {
1199 match handle.join() {
1200 Ok((name, res)) => {
1201 results.insert(name, res);
1202 }
1203 Err(_) => {
1204 return Err(ZigError::Execution("parallel step thread panicked".into()));
1205 }
1206 }
1207 }
1208
1209 let mut errors: Vec<String> = Vec::new();
1211 for step in &active {
1212 let Some(res) = results.remove(&step.name) else {
1213 continue;
1214 };
1215 let elapsed = start_times
1216 .remove(&step.name)
1217 .map(|t| t.elapsed().as_millis() as u64)
1218 .unwrap_or(0);
1219 match res {
1220 Ok(output) => {
1221 let mut saved_keys: Vec<String> = Vec::new();
1222 if !step.saves.is_empty() {
1223 let saved = extract_saves(&output, &step.saves)?;
1224 for (k, v) in &saved {
1225 eprintln!(" saved {k} = {v}");
1226 saved_keys.push(k.clone());
1227 }
1228 vars.extend(saved);
1229 }
1230 step_outputs.insert(step.name.clone(), output);
1231 eprintln!(" completed '{}'", step.name);
1232 if let Some(w) = session {
1233 let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1234 }
1235 if step.next.is_some() && pending_next.is_none() {
1236 *pending_next = step.next.clone();
1237 }
1238 }
1239 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1240 FailurePolicy::Continue => {
1241 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1242 }
1243 FailurePolicy::Fail | FailurePolicy::Retry => {
1244 errors.push(format!("'{}': {e}", step.name));
1245 }
1246 },
1247 }
1248 }
1249
1250 if !errors.is_empty() {
1251 return Err(ZigError::Execution(format!(
1252 "parallel step(s) failed: {}",
1253 errors.join("; ")
1254 )));
1255 }
1256
1257 Ok(())
1258}
1259
1260fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1263 let mut vars = HashMap::new();
1264 for (name, var) in &workflow.vars {
1265 let value = match &var.default {
1266 Some(toml::Value::String(s)) => s.clone(),
1267 Some(toml::Value::Integer(n)) => n.to_string(),
1268 Some(toml::Value::Float(f)) => f.to_string(),
1269 Some(toml::Value::Boolean(b)) => b.to_string(),
1270 Some(other) => other.to_string(),
1271 None => String::new(),
1272 };
1273 vars.insert(name.clone(), value);
1274 }
1275 vars
1276}
1277
1278fn execute(
1280 workflow: &Workflow,
1281 workflow_path: &std::path::Path,
1282 user_prompt: Option<&str>,
1283 workflow_dir: &Path,
1284 disable_resources: bool,
1285 disable_memory: bool,
1286) -> Result<(), ZigError> {
1287 let mut vars = init_vars(workflow);
1288
1289 let resource_collector = ResourceCollector::from_env(
1290 &workflow.workflow.name,
1291 &workflow.workflow.resources,
1292 workflow_dir,
1293 disable_resources,
1294 );
1295
1296 let config = ZigConfig::load();
1297 let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1298 let memory_collector = MemoryCollector::from_env(
1299 &workflow.workflow.name,
1300 workflow_memory_mode,
1301 &config,
1302 disable_memory,
1303 );
1304
1305 load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1307
1308 let prompt_var = workflow
1310 .vars
1311 .iter()
1312 .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1313 .map(|(name, _)| name.clone());
1314
1315 if let Some(ref var_name) = prompt_var {
1316 if let Some(prompt) = user_prompt {
1317 vars.insert(var_name.clone(), prompt.to_string());
1318 }
1319 }
1320
1321 if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1323 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1324 return Err(ZigError::Validation(msgs.join("; ")));
1325 }
1326
1327 let effective_user_prompt = if prompt_var.is_some() {
1329 None
1330 } else {
1331 user_prompt
1332 };
1333
1334 let mut step_outputs: HashMap<String, String> = HashMap::new();
1335
1336 let wf_provider = workflow.workflow.provider.as_deref();
1337 let wf_model = workflow.workflow.model.as_deref();
1338
1339 let tiers = topological_sort(&workflow.steps)?;
1340
1341 eprintln!(
1342 "running workflow '{}' ({} steps in {} tiers)",
1343 workflow.workflow.name,
1344 workflow.steps.len(),
1345 tiers.len()
1346 );
1347
1348 let coordinator = match SessionWriter::create(
1353 &workflow.workflow.name,
1354 &workflow_path.to_string_lossy(),
1355 user_prompt,
1356 tiers.len(),
1357 ) {
1358 Ok(writer) => {
1359 eprintln!("zig session: {}", writer.session_id());
1360 Some(SessionCoordinator::start(writer))
1361 }
1362 Err(e) => {
1363 eprintln!("warning: failed to open zig session log: {e}");
1364 None
1365 }
1366 };
1367 let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1368 let session_ref = session_writer.as_ref();
1369
1370 let mut iteration = 0;
1371 let mut pending_next: Option<String> = None;
1372
1373 loop {
1374 let tiers_to_run = if let Some(ref next_step) = pending_next {
1375 let remaining: Vec<Vec<&Step>> = tiers
1377 .iter()
1378 .map(|tier| {
1379 tier.iter()
1380 .filter(|s| s.name == *next_step)
1381 .copied()
1382 .collect::<Vec<_>>()
1383 })
1384 .filter(|tier| !tier.is_empty())
1385 .collect();
1386 pending_next = None;
1387 remaining
1388 } else if iteration == 0 {
1389 tiers.clone()
1390 } else {
1391 break;
1392 };
1393
1394 for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1395 let (non_race, race_groups) = partition_tier(tier);
1396
1397 if let Some(w) = session_ref {
1398 let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1399 let _ = w.tier_started(tier_index, names);
1400 }
1401
1402 if non_race.len() <= 1 {
1406 for step in &non_race {
1407 execute_sequential_step(
1408 step,
1409 &mut vars,
1410 effective_user_prompt,
1411 &mut step_outputs,
1412 &workflow.workflow.name,
1413 &mut pending_next,
1414 tier_index,
1415 session_ref,
1416 &workflow.roles,
1417 &resource_collector,
1418 &memory_collector,
1419 workflow_dir,
1420 wf_provider,
1421 wf_model,
1422 )?;
1423 }
1424 } else {
1425 execute_parallel_tier(
1426 &non_race,
1427 &mut vars,
1428 effective_user_prompt,
1429 &mut step_outputs,
1430 &workflow.workflow.name,
1431 &mut pending_next,
1432 tier_index,
1433 session_ref,
1434 &workflow.roles,
1435 &resource_collector,
1436 &memory_collector,
1437 workflow_dir,
1438 wf_provider,
1439 wf_model,
1440 )?;
1441 }
1442
1443 for (group_name, race_steps) in &race_groups {
1445 eprintln!(" starting race group '{group_name}'...");
1446
1447 let mut prompts = HashMap::new();
1449 let mut race_sps: HashMap<String, String> = HashMap::new();
1450 let mut active_steps: Vec<&Step> = Vec::new();
1451 for step in race_steps {
1452 if let Some(condition) = &step.condition {
1453 if !evaluate_condition(condition, &vars)? {
1454 eprintln!(
1455 " skipping '{}' (condition not met: {condition})",
1456 step.name
1457 );
1458 continue;
1459 }
1460 }
1461 let prompt =
1462 render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1463 prompts.insert(step.name.clone(), prompt);
1464 if let Some(sp) = resolve_role_system_prompt(
1465 step,
1466 &workflow.roles,
1467 &resource_collector,
1468 &memory_collector,
1469 &vars,
1470 workflow_dir,
1471 &workflow.workflow.name,
1472 )? {
1473 race_sps.insert(step.name.clone(), sp);
1474 }
1475 active_steps.push(step);
1476 }
1477
1478 if active_steps.is_empty() {
1479 continue;
1480 }
1481
1482 match execute_race_group(
1483 &active_steps,
1484 &prompts,
1485 &race_sps,
1486 &workflow.workflow.name,
1487 tier_index,
1488 session_ref,
1489 wf_provider,
1490 wf_model,
1491 ) {
1492 Ok((winner_name, output)) => {
1493 if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1495 if !winner.saves.is_empty() {
1496 let saved = extract_saves(&output, &winner.saves)?;
1497 for (k, v) in &saved {
1498 eprintln!(" saved {k} = {v}");
1499 }
1500 vars.extend(saved);
1501 }
1502 if winner.next.is_some() {
1503 pending_next = winner.next.clone();
1504 }
1505 }
1506 step_outputs.insert(winner_name.clone(), output);
1507 eprintln!(
1508 " completed race group '{group_name}' (winner: '{winner_name}')"
1509 );
1510 }
1511 Err(e) => return Err(e),
1512 }
1513 }
1514 }
1515
1516 iteration += 1;
1517 if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1518 if iteration >= MAX_LOOP_ITERATIONS {
1519 eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1520 }
1521 break;
1522 }
1523 }
1524
1525 eprintln!("workflow '{}' completed", workflow.workflow.name);
1526 if let Some(c) = coordinator {
1527 let _ = c.finish(SessionStatus::Success);
1528 }
1529 Ok(())
1530}
1531
1532fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1535 match cmd {
1536 None => "run",
1537 Some(StepCommand::Review) => "review",
1538 Some(StepCommand::Plan) => "plan",
1539 Some(StepCommand::Pipe) => "pipe",
1540 Some(StepCommand::Collect) => "collect",
1541 Some(StepCommand::Summary) => "summary",
1542 }
1543}
1544
/// Produces a single-line preview of `prompt`.
///
/// Each `'\n'` is replaced by a space. When the result is longer than 200
/// characters (counted as `char`s, not bytes) it is cut to the first 200 and
/// an ellipsis (`…`) is appended; otherwise it is returned in full.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let flattened = prompt.replace('\n', " ");
    // A character at position MAX exists only when the text exceeds MAX
    // characters; its byte offset is where the preview has to be cut.
    match flattened.char_indices().nth(MAX) {
        None => flattened,
        Some((cut, _)) => {
            let mut preview = String::with_capacity(cut + '…'.len_utf8());
            preview.push_str(&flattened[..cut]);
            preview.push('…');
            preview
        }
    }
}
1559
1560#[cfg(test)]
1561#[path = "run_tests.rs"]
1562mod tests;