1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::config::ZigConfig;
12use crate::error::ZigError;
13use crate::memory::{MemoryCollector, render_memory_block};
14use crate::resources::{ResourceCollector, render_system_block};
15use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
16use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
17use crate::workflow::{parser, validate};
18
/// Upper bound on workflow loop iterations.
// NOTE(review): presumably caps how many `next`-driven jumps `execute` may perform before
// aborting (it tracks an `iteration` counter alongside `pending_next`) — confirm against the
// loop in `execute`, which is outside this view.
const MAX_LOOP_ITERATIONS: usize = 100;
21
22pub fn run_workflow(
35 workflow_path: &str,
36 user_prompt: Option<&str>,
37 disable_resources: bool,
38 disable_memory: bool,
39) -> Result<(), ZigError> {
40 check_zag()?;
41
42 let path = resolve_workflow_path(workflow_path)?;
43 let (workflow, source) = parser::parse_workflow(&path)?;
44
45 if let Err(errors) = validate::validate(&workflow) {
46 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
47 return Err(ZigError::Validation(msgs.join("; ")));
48 }
49
50 execute(
51 &workflow,
52 &path,
53 user_prompt,
54 source.dir(),
55 disable_resources,
56 disable_memory,
57 )
58}
59
60pub(crate) fn check_zag() -> Result<(), ZigError> {
62 let zag_available = Command::new("zag")
63 .arg("--version")
64 .output()
65 .is_ok_and(|o| o.status.success());
66
67 if !zag_available {
68 return Err(ZigError::Zag(
69 "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
70 ));
71 }
72 Ok(())
73}
74
75pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
88 let mut candidates = vec![
89 PathBuf::from(workflow),
90 PathBuf::from(format!("{workflow}.zwf")),
91 PathBuf::from(format!("{workflow}.zwfz")),
92 ];
93
94 if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
95 candidates.push(local_dir.join(workflow));
96 candidates.push(local_dir.join(format!("{workflow}.zwf")));
97 candidates.push(local_dir.join(format!("{workflow}.zwfz")));
98 }
99
100 if let Some(global_dir) = crate::paths::global_workflows_dir() {
101 candidates.push(global_dir.join(workflow));
102 candidates.push(global_dir.join(format!("{workflow}.zwf")));
103 candidates.push(global_dir.join(format!("{workflow}.zwfz")));
104 }
105
106 for candidate in &candidates {
107 if candidate.exists() {
108 return Ok(candidate.clone());
109 }
110 }
111
112 Err(ZigError::Io(format!(
113 "workflow not found: '{workflow}' (tried: {})",
114 candidates
115 .iter()
116 .map(|p| p.display().to_string())
117 .collect::<Vec<_>>()
118 .join(", ")
119 )))
120}
121
122fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
128 let step_index: HashMap<&str, usize> = steps
129 .iter()
130 .enumerate()
131 .map(|(i, s)| (s.name.as_str(), i))
132 .collect();
133
134 let mut in_degree = vec![0usize; steps.len()];
135 for (i, step) in steps.iter().enumerate() {
136 for dep in &step.depends_on {
137 if step_index.contains_key(dep.as_str()) {
138 in_degree[i] += 1;
139 }
140 }
141 }
142
143 let mut tiers = Vec::new();
144 let mut remaining = in_degree.clone();
145 let mut completed: Vec<bool> = vec![false; steps.len()];
146
147 loop {
148 let tier: Vec<usize> = (0..steps.len())
149 .filter(|&i| !completed[i] && remaining[i] == 0)
150 .collect();
151
152 if tier.is_empty() {
153 break;
154 }
155
156 for &i in &tier {
157 completed[i] = true;
158 }
159
160 for &i in &tier {
162 for (j, step) in steps.iter().enumerate() {
163 if !completed[j] && step.depends_on.contains(&steps[i].name) {
164 remaining[j] -= 1;
165 }
166 }
167 }
168
169 tiers.push(tier.iter().map(|&i| &steps[i]).collect());
170 }
171
172 let completed_count: usize = completed.iter().filter(|&&c| c).count();
173 if completed_count != steps.len() {
174 return Err(ZigError::Execution(
175 "could not resolve all steps — possible undetected cycle".into(),
176 ));
177 }
178
179 Ok(tiers)
180}
181
182fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
188 let mut result = String::with_capacity(template.len());
189 let mut rest = template;
190
191 while let Some(start) = rest.find("${") {
192 result.push_str(&rest[..start]);
193 let after_start = &rest[start + 2..];
194
195 if let Some(end) = after_start.find('}') {
196 let var_expr = &after_start[..end];
197 let mut parts = var_expr.splitn(2, '.');
198 let root = parts.next().unwrap_or(var_expr);
199
200 if let Some(value) = vars.get(root) {
201 if let Some(path) = parts.next() {
202 if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
204 let resolved = json_path_lookup(&json, path);
205 result.push_str(&resolved);
206 } else {
207 result.push_str(value);
208 }
209 } else {
210 result.push_str(value);
211 }
212 } else {
213 result.push_str(&rest[start..start + 2 + end + 1]);
215 }
216
217 rest = &after_start[end + 1..];
218 } else {
219 result.push_str(&rest[start..]);
220 rest = "";
221 }
222 }
223
224 result.push_str(rest);
225 result
226}
227
228fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
230 let mut current = value;
231 for key in path.split('.') {
232 match current.get(key) {
233 Some(v) => current = v,
234 None => return format!("${{?.{path}}}"),
235 }
236 }
237 match current {
238 serde_json::Value::String(s) => s.clone(),
239 other => other.to_string(),
240 }
241}
242
243fn resolve_role_system_prompt(
260 step: &Step,
261 roles: &HashMap<String, Role>,
262 resources: &ResourceCollector,
263 memory: &MemoryCollector,
264 vars: &HashMap<String, String>,
265 workflow_dir: &Path,
266 workflow_name: &str,
267) -> Result<Option<String>, ZigError> {
268 let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
270 Some(substitute_vars(sp, vars))
271 } else if let Some(ref role_ref) = step.role {
272 let resolved_name = substitute_vars(role_ref, vars);
273 let role = roles.get(&resolved_name).ok_or_else(|| {
274 ZigError::Execution(format!(
275 "step '{}' references role '{}' which does not exist",
276 step.name, resolved_name
277 ))
278 })?;
279
280 let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
281 let full_path = workflow_dir.join(file_path);
282 Some(std::fs::read_to_string(&full_path).map_err(|e| {
283 ZigError::Execution(format!(
284 "failed to read system_prompt_file '{}' for role '{}': {e}",
285 full_path.display(),
286 resolved_name
287 ))
288 })?)
289 } else {
290 role.system_prompt.clone()
291 };
292
293 raw_prompt.map(|p| substitute_vars(&p, vars))
294 } else {
295 None
296 };
297
298 let set = resources.collect_for_step(&step.resources)?;
300 let resource_block = render_system_block(&set);
301
302 let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
304 let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
305
306 let prefix = format!("{resource_block}{memory_block}");
307
308 match (prefix.is_empty(), base_prompt) {
309 (true, None) => Ok(None),
310 (true, Some(p)) => Ok(Some(p)),
311 (false, None) => Ok(Some(prefix.trim_end().to_string())),
312 (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
313 }
314}
315
316fn load_file_defaults(
321 vars: &mut HashMap<String, String>,
322 declarations: &HashMap<String, crate::workflow::model::Variable>,
323 workflow_dir: &Path,
324) -> Result<(), ZigError> {
325 for (name, decl) in declarations {
326 if decl.default.is_none() {
327 if let Some(ref file_path) = decl.default_file {
328 let full_path = workflow_dir.join(file_path);
329 let content = std::fs::read_to_string(&full_path).map_err(|e| {
330 ZigError::Execution(format!(
331 "failed to read default_file '{}' for variable '{name}': {e}",
332 full_path.display()
333 ))
334 })?;
335 vars.insert(name.clone(), content);
336 }
337 }
338 }
339 Ok(())
340}
341
342fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
349 let condition = condition.trim();
350
351 let operators = ["<=", ">=", "!=", "==", "<", ">"];
353 for op in &operators {
354 if let Some(pos) = condition.find(op) {
355 let lhs = resolve_operand(condition[..pos].trim(), vars);
356 let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
357 return Ok(compare(&lhs, &rhs, op));
358 }
359 }
360
361 let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
363 Ok(is_truthy(value))
364}
365
/// Resolves one side of a condition into a comparable string.
///
/// A token wrapped in matching double or single quotes yields its contents. Otherwise the
/// token's variable value is returned if it exists, or else the token itself.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    // Requiring both a prefix and a suffix quote means a lone `"` or `'` (length 1) is no
    // longer treated as quoted; the old `token[1..len - 1]` slice panicked on it.
    for quote in ['"', '\''] {
        if let Some(inner) = token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
        {
            return inner.to_string();
        }
    }
    vars.get(token)
        .cloned()
        .unwrap_or_else(|| token.to_string())
}
384
/// Compares two resolved operands with the condition operator `op`.
///
/// When both sides parse as *finite* `f64` values, the comparison is numeric, and `==`/`!=`
/// use an absolute `f64::EPSILON` tolerance. Otherwise the strings are compared
/// lexicographically. An unknown operator yields `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    // `f64::from_str` also accepts "inf", "infinity" and "nan" in any case. Treating those as
    // numbers made `"infinity" == "infinity"` false (inf - inf is NaN) and made both `==` and
    // `!=` false for NaN. Only finite values take the numeric path.
    let numeric = match (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        (Ok(l), Ok(r)) if l.is_finite() && r.is_finite() => Some((l, r)),
        _ => None,
    };

    if let Some((l, r)) = numeric {
        let equal = (l - r).abs() < f64::EPSILON;
        return match op {
            "==" => equal,
            "!=" => !equal,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
409
/// Returns whether a variable value counts as "true" in a bare condition.
///
/// Exactly `""`, `"false"` and `"0"` are falsy (case-sensitive); every other string is truthy.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
414
415fn render_step_prompt(
418 step: &Step,
419 vars: &HashMap<String, String>,
420 user_prompt: Option<&str>,
421 dependency_outputs: &HashMap<String, String>,
422) -> String {
423 let mut prompt = String::new();
424
425 if let Some(ctx) = user_prompt {
427 prompt.push_str(&format!("User context: {ctx}\n\n"));
428 }
429
430 if step.inject_context {
432 for dep in &step.depends_on {
433 if let Some(output) = dependency_outputs.get(dep) {
434 prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
435 }
436 }
437 }
438
439 prompt.push_str(&substitute_vars(&step.prompt, vars));
441
442 prompt
443}
444
/// Builds the argument vector for one `zag` invocation of `step`.
///
/// The subcommand comes from `step.command`, defaulting to `run` when unset. For `run`,
/// `review`, `plan` and `pipe`, agent options (provider, model, system prompt, output,
/// sandbox, …) are appended. `collect` and `summary` skip them.
///
/// Every command also gets:
/// * `--name` (`zig-<workflow>-<step>`),
/// * an optional `--description`,
/// * the `zig-workflow` tag plus each entry of `step.tags`,
/// * an optional `--timeout`.
///
/// Precedence rules:
/// * Model: `model_override` (used by retries), then `step.model`, then `workflow_model`.
/// * Provider: `step.provider`, then `workflow_provider`.
fn build_zag_args(
    step: &Step,
    prompt: &str,
    workflow_name: &str,
    model_override: Option<&str>,
    rendered_system_prompt: Option<&str>,
    workflow_provider: Option<&str>,
    workflow_model: Option<&str>,
) -> Vec<String> {
    // Each step's session is named `zig-<workflow>-<step>`. `pipe`, `collect` and `summary`
    // use the same scheme to refer to their dependencies' sessions.
    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");

    // `accepts_agent_args` is false for commands that only aggregate existing sessions.
    let (mut args, accepts_agent_args) = match &step.command {
        None => (vec!["run".to_string(), prompt.to_string()], true),
        Some(StepCommand::Review) => {
            let mut a = vec!["review".to_string()];
            // The review prompt is optional; omit the positional argument when empty.
            if !prompt.is_empty() {
                a.push(prompt.to_string());
            }
            if step.uncommitted {
                a.push("--uncommitted".into());
            }
            if let Some(base) = &step.base {
                a.extend(["--base".into(), base.clone()]);
            }
            if let Some(commit) = &step.commit {
                a.extend(["--commit".into(), commit.clone()]);
            }
            if let Some(title) = &step.title {
                a.extend(["--title".into(), title.clone()]);
            }
            (a, true)
        }
        Some(StepCommand::Plan) => {
            let mut a = vec!["plan".to_string(), prompt.to_string()];
            if let Some(output) = &step.plan_output {
                a.extend(["-o".into(), output.clone()]);
            }
            if let Some(instructions) = &step.instructions {
                a.extend(["--instructions".into(), instructions.clone()]);
            }
            (a, true)
        }
        Some(StepCommand::Pipe) => {
            // Dependency sessions come first; `--` separates them from the prompt.
            let mut a = vec!["pipe".to_string()];
            for dep in &step.depends_on {
                a.push(session_name(dep));
            }
            a.push("--".into());
            a.push(prompt.to_string());
            (a, true)
        }
        Some(StepCommand::Collect) => {
            let mut a = vec!["collect".to_string()];
            for dep in &step.depends_on {
                a.push(session_name(dep));
            }
            (a, false)
        }
        Some(StepCommand::Summary) => {
            let mut a = vec!["summary".to_string()];
            for dep in &step.depends_on {
                a.push(session_name(dep));
            }
            (a, false)
        }
    };

    if accepts_agent_args {
        // Step-level provider wins over the workflow default.
        let effective_provider = step.provider.as_deref().or(workflow_provider);
        if let Some(provider) = effective_provider {
            args.extend(["--provider".into(), provider.to_string()]);
        }

        // Retry override, then step model, then workflow default.
        let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
        if let Some(model) = effective_model {
            args.extend(["--model".into(), model.to_string()]);
        }

        if let Some(sp) = rendered_system_prompt {
            args.extend(["--system-prompt".into(), sp.to_string()]);
        }
        if let Some(max_turns) = step.max_turns {
            args.extend(["--max-turns".into(), max_turns.to_string()]);
        }

        // An explicit `-o` output takes precedence over `--json`; `--json-schema` is independent.
        if let Some(output) = &step.output {
            args.extend(["-o".into(), output.clone()]);
        } else if step.json {
            args.push("--json".into());
        }
        if let Some(schema) = &step.json_schema {
            args.extend(["--json-schema".into(), schema.clone()]);
        }

        if let Some(mcp_config) = &step.mcp_config {
            args.extend(["--mcp-config".into(), mcp_config.clone()]);
        }

        if step.auto_approve {
            args.push("--auto-approve".into());
        }
        if let Some(root) = &step.root {
            args.extend(["--root".into(), root.clone()]);
        }
        for dir in &step.add_dirs {
            args.extend(["--add-dir".into(), dir.clone()]);
        }
        // NOTE(review): if `step.env` is a HashMap, the `--env` order is unspecified.
        for (key, value) in &step.env {
            args.extend(["--env".into(), format!("{key}={value}")]);
        }
        for file in &step.files {
            args.extend(["--file".into(), file.clone()]);
        }

        for ctx in &step.context {
            args.extend(["--context".into(), ctx.clone()]);
        }
        if let Some(plan) = &step.plan {
            args.extend(["--plan".into(), plan.clone()]);
        }

        if step.worktree {
            args.push("--worktree".into());
        }
        if let Some(sandbox) = &step.sandbox {
            args.extend(["--sandbox".into(), sandbox.clone()]);
        }
    }

    // The options below are appended for every command, including `collect`/`summary`.
    let name = session_name(&step.name);
    args.extend(["--name".into(), name]);

    if !step.description.is_empty() {
        args.extend(["--description".into(), step.description.clone()]);
    }

    args.extend(["--tag".into(), "zig-workflow".into()]);
    for tag in &step.tags {
        args.extend(["--tag".into(), tag.clone()]);
    }

    if let Some(timeout) = &step.timeout {
        args.extend(["--timeout".into(), timeout.clone()]);
    }

    args
}
613
614fn run_zag_streaming(
620 args: &[String],
621 step_name: &str,
622 prefix: Option<&str>,
623 session: Option<&Arc<SessionWriter>>,
624) -> Result<(std::process::ExitStatus, String), ZigError> {
625 let mut cmd = Command::new("zag");
626 cmd.args(args)
627 .stdout(std::process::Stdio::piped())
628 .stderr(std::process::Stdio::piped());
629
630 let mut child = cmd
631 .spawn()
632 .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
633
634 let stdout = child.stdout.take().expect("stdout was piped");
635 let stderr = child.stderr.take().expect("stderr was piped");
636
637 let buffer = Arc::new(Mutex::new(String::new()));
638 let buffer_clone = Arc::clone(&buffer);
639 let prefix_stdout = prefix.map(String::from);
640 let prefix_stderr = prefix.map(String::from);
641 let session_stdout = session.cloned();
642 let session_stderr = session.cloned();
643 let step_name_stdout = step_name.to_string();
644 let step_name_stderr = step_name.to_string();
645
646 let stdout_thread = thread::spawn(move || {
647 let reader = BufReader::new(stdout);
648 let stderr_handle = std::io::stderr();
649 for line in reader.lines().map_while(Result::ok) {
650 if let Ok(mut buf) = buffer_clone.lock() {
651 buf.push_str(&line);
652 buf.push('\n');
653 }
654 if let Some(w) = &session_stdout {
655 let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
656 }
657 let mut h = stderr_handle.lock();
658 let _ = match &prefix_stdout {
659 Some(p) => writeln!(h, "[{p}] {line}"),
660 None => writeln!(h, "{line}"),
661 };
662 }
663 });
664
665 let stderr_thread = thread::spawn(move || {
666 let reader = BufReader::new(stderr);
667 let stderr_handle = std::io::stderr();
668 for line in reader.lines().map_while(Result::ok) {
669 if let Some(w) = &session_stderr {
670 let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
671 }
672 let mut h = stderr_handle.lock();
673 let _ = match &prefix_stderr {
674 Some(p) => writeln!(h, "[{p}] {line}"),
675 None => writeln!(h, "{line}"),
676 };
677 }
678 });
679
680 let status = child
681 .wait()
682 .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
683
684 let _ = stdout_thread.join();
685 let _ = stderr_thread.join();
686
687 let captured = Arc::try_unwrap(buffer)
688 .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
689 .into_inner()
690 .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
691
692 Ok((status, captured))
693}
694
695#[allow(clippy::too_many_arguments)]
701fn execute_step(
702 step: &Step,
703 prompt: &str,
704 workflow_name: &str,
705 model_override: Option<&str>,
706 prefix: Option<&str>,
707 session: Option<&Arc<SessionWriter>>,
708 rendered_system_prompt: Option<&str>,
709 workflow_provider: Option<&str>,
710 workflow_model: Option<&str>,
711) -> Result<String, ZigError> {
712 let args = build_zag_args(
713 step,
714 prompt,
715 workflow_name,
716 model_override,
717 rendered_system_prompt,
718 workflow_provider,
719 workflow_model,
720 );
721 let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
722
723 if !status.success() {
724 return Err(ZigError::Execution(format!(
725 "step '{}' failed (exit {})",
726 step.name, status,
727 )));
728 }
729
730 Ok(stdout)
731}
732
733#[allow(clippy::too_many_arguments)]
738fn run_step_attempts(
739 step: &Step,
740 prompt: &str,
741 workflow_name: &str,
742 prefix: Option<&str>,
743 session: Option<&Arc<SessionWriter>>,
744 rendered_system_prompt: Option<&str>,
745 workflow_provider: Option<&str>,
746 workflow_model: Option<&str>,
747) -> Result<String, ZigError> {
748 let mut attempts = 0;
749 let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
750 step.max_retries.unwrap_or(1) + 1
751 } else {
752 1
753 };
754
755 loop {
756 attempts += 1;
757 let model_override = if attempts > 1 {
758 step.retry_model.as_deref()
759 } else {
760 None
761 };
762 match execute_step(
763 step,
764 prompt,
765 workflow_name,
766 model_override,
767 prefix,
768 session,
769 rendered_system_prompt,
770 workflow_provider,
771 workflow_model,
772 ) {
773 Ok(output) => return Ok(output),
774 Err(e) => {
775 if let Some(w) = session {
776 let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
777 }
778 if attempts < max_attempts {
779 eprintln!(
780 " retry {}/{} for step '{}'",
781 attempts,
782 max_attempts - 1,
783 step.name
784 );
785 continue;
786 }
787 return Err(e);
788 }
789 }
790 }
791}
792
793fn extract_saves(
800 output: &str,
801 saves: &HashMap<String, String>,
802) -> Result<HashMap<String, String>, ZigError> {
803 let mut extracted = HashMap::new();
804
805 for (var_name, selector) in saves {
806 let value = if selector == "$" {
807 output.trim().to_string()
808 } else if let Some(path) = selector.strip_prefix("$.") {
809 let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
810 ZigError::Execution(format!(
811 "saves selector '{selector}' requires JSON output, but got parse error: {e}"
812 ))
813 })?;
814 json_path_lookup(&json, path)
815 } else {
816 output.trim().to_string()
817 };
818
819 extracted.insert(var_name.clone(), value);
820 }
821
822 Ok(extracted)
823}
824
825fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
830 let mut sequential = Vec::new();
831 let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
832
833 for step in tier {
834 if let Some(group) = &step.race_group {
835 race_groups.entry(group.clone()).or_default().push(step);
836 } else {
837 sequential.push(*step);
838 }
839 }
840
841 (sequential, race_groups)
842}
843
844fn spawn_step(
846 step: &Step,
847 prompt: &str,
848 workflow_name: &str,
849 rendered_system_prompt: Option<&str>,
850 workflow_provider: Option<&str>,
851 workflow_model: Option<&str>,
852) -> Result<std::process::Child, ZigError> {
853 let args = build_zag_args(
854 step,
855 prompt,
856 workflow_name,
857 None,
858 rendered_system_prompt,
859 workflow_provider,
860 workflow_model,
861 );
862 let mut cmd = Command::new("zag");
863 cmd.args(&args)
864 .stdout(std::process::Stdio::piped())
865 .stderr(std::process::Stdio::piped());
866
867 cmd.spawn()
868 .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
869}
870
871#[allow(clippy::too_many_arguments)]
876fn execute_race_group(
877 steps: &[&Step],
878 prompts: &HashMap<String, String>,
879 system_prompts: &HashMap<String, String>,
880 workflow_name: &str,
881 tier_index: usize,
882 session: Option<&Arc<SessionWriter>>,
883 workflow_provider: Option<&str>,
884 workflow_model: Option<&str>,
885) -> Result<(String, String), ZigError> {
886 if let Some(w) = session {
887 for step in steps {
888 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
889 let preview = prompts
890 .get(&step.name)
891 .map(|p| prompt_preview(p))
892 .unwrap_or_default();
893 let _ = w.step_started(
894 &step.name,
895 tier_index,
896 &zag_session_id,
897 zag_command_name(&step.command),
898 step.model.as_deref(),
899 &preview,
900 );
901 }
902 }
903 let race_started = Instant::now();
904 let mut children: Vec<(String, std::process::Child)> = Vec::new();
905
906 for step in steps {
907 let prompt = prompts.get(&step.name).ok_or_else(|| {
908 ZigError::Execution(format!("missing prompt for step '{}'", step.name))
909 })?;
910 eprintln!(" racing step '{}'...", step.name);
911 let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
912 let child = spawn_step(
913 step,
914 prompt,
915 workflow_name,
916 rendered_sp,
917 workflow_provider,
918 workflow_model,
919 )?;
920 children.push((step.name.clone(), child));
921 }
922
923 loop {
925 for i in 0..children.len() {
926 let status = children[i]
927 .1
928 .try_wait()
929 .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
930
931 if let Some(exit_status) = status {
932 let (winner_name, winner_child) = children.remove(i);
933
934 for (name, mut child) in children {
936 eprintln!(" cancelling step '{name}' (race lost)");
937 let _ = child.kill();
938 let _ = child.wait();
939 }
940
941 let elapsed = race_started.elapsed().as_millis() as u64;
942 if !exit_status.success() {
943 let stderr = winner_child
944 .stderr
945 .map(|mut s| {
946 let mut buf = String::new();
947 std::io::Read::read_to_string(&mut s, &mut buf).ok();
948 buf
949 })
950 .unwrap_or_default();
951 let err_msg = format!(
952 "race winner '{}' failed (exit {}): {}",
953 winner_name,
954 exit_status,
955 stderr.trim()
956 );
957 if let Some(w) = session {
958 let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
959 }
960 return Err(ZigError::Execution(err_msg));
961 }
962
963 let stdout = winner_child
964 .stdout
965 .map(|mut s| {
966 let mut buf = String::new();
967 std::io::Read::read_to_string(&mut s, &mut buf).ok();
968 buf
969 })
970 .unwrap_or_default();
971
972 eprintln!(" race won by '{winner_name}'");
973 if let Some(w) = session {
974 let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
975 }
976 return Ok((winner_name, stdout));
977 }
978 }
979
980 std::thread::sleep(Duration::from_millis(100));
981 }
982}
983
984#[allow(clippy::too_many_arguments)]
986fn execute_sequential_step(
987 step: &Step,
988 vars: &mut HashMap<String, String>,
989 user_prompt: Option<&str>,
990 step_outputs: &mut HashMap<String, String>,
991 workflow_name: &str,
992 pending_next: &mut Option<String>,
993 tier_index: usize,
994 session: Option<&Arc<SessionWriter>>,
995 roles: &HashMap<String, Role>,
996 resources: &ResourceCollector,
997 memory: &MemoryCollector,
998 workflow_dir: &Path,
999 workflow_provider: Option<&str>,
1000 workflow_model: Option<&str>,
1001) -> Result<(), ZigError> {
1002 if let Some(condition) = &step.condition {
1003 if !evaluate_condition(condition, vars)? {
1004 eprintln!(
1005 " skipping '{}' (condition not met: {condition})",
1006 step.name
1007 );
1008 if let Some(w) = session {
1009 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1010 }
1011 return Ok(());
1012 }
1013 }
1014
1015 eprintln!(" running step '{}'...", step.name);
1016
1017 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1018 let rendered_sp = resolve_role_system_prompt(
1019 step,
1020 roles,
1021 resources,
1022 memory,
1023 vars,
1024 workflow_dir,
1025 workflow_name,
1026 )?;
1027 if let Some(w) = session {
1028 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1029 let _ = w.step_started(
1030 &step.name,
1031 tier_index,
1032 &zag_session_id,
1033 zag_command_name(&step.command),
1034 step.model.as_deref(),
1035 &prompt_preview(&prompt),
1036 );
1037 }
1038 let started = Instant::now();
1039 let result = run_step_attempts(
1040 step,
1041 &prompt,
1042 workflow_name,
1043 None,
1044 session,
1045 rendered_sp.as_deref(),
1046 workflow_provider,
1047 workflow_model,
1048 );
1049
1050 match result {
1051 Ok(output) => {
1052 let mut saved_keys: Vec<String> = Vec::new();
1053 if !step.saves.is_empty() {
1054 let saved = extract_saves(&output, &step.saves)?;
1055 for (k, v) in &saved {
1056 eprintln!(" saved {k} = {v}");
1057 saved_keys.push(k.clone());
1058 }
1059 vars.extend(saved);
1060 }
1061
1062 step_outputs.insert(step.name.clone(), output);
1063 eprintln!(" completed '{}'", step.name);
1064 if let Some(w) = session {
1065 let _ = w.step_completed(
1066 &step.name,
1067 0,
1068 started.elapsed().as_millis() as u64,
1069 saved_keys,
1070 );
1071 }
1072
1073 if step.next.is_some() {
1074 *pending_next = step.next.clone();
1075 }
1076 }
1077 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1078 FailurePolicy::Fail => return Err(e),
1079 FailurePolicy::Continue => {
1080 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1081 }
1082 FailurePolicy::Retry => {
1083 return Err(e);
1084 }
1085 },
1086 }
1087
1088 Ok(())
1089}
1090
1091#[allow(clippy::too_many_arguments)]
1100fn execute_parallel_tier(
1101 steps: &[&Step],
1102 vars: &mut HashMap<String, String>,
1103 user_prompt: Option<&str>,
1104 step_outputs: &mut HashMap<String, String>,
1105 workflow_name: &str,
1106 pending_next: &mut Option<String>,
1107 tier_index: usize,
1108 session: Option<&Arc<SessionWriter>>,
1109 roles: &HashMap<String, Role>,
1110 resources: &ResourceCollector,
1111 memory: &MemoryCollector,
1112 workflow_dir: &Path,
1113 workflow_provider: Option<&str>,
1114 workflow_model: Option<&str>,
1115) -> Result<(), ZigError> {
1116 let mut active: Vec<&Step> = Vec::new();
1119 let mut prompts: HashMap<String, String> = HashMap::new();
1120 let mut rendered_sps: HashMap<String, String> = HashMap::new();
1121 for step in steps {
1122 if let Some(condition) = &step.condition {
1123 if !evaluate_condition(condition, vars)? {
1124 eprintln!(
1125 " skipping '{}' (condition not met: {condition})",
1126 step.name
1127 );
1128 if let Some(w) = session {
1129 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1130 }
1131 continue;
1132 }
1133 }
1134 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1135 prompts.insert(step.name.clone(), prompt);
1136 if let Some(sp) = resolve_role_system_prompt(
1137 step,
1138 roles,
1139 resources,
1140 memory,
1141 vars,
1142 workflow_dir,
1143 workflow_name,
1144 )? {
1145 rendered_sps.insert(step.name.clone(), sp);
1146 }
1147 active.push(*step);
1148 }
1149
1150 if active.is_empty() {
1151 return Ok(());
1152 }
1153
1154 eprintln!(" running {} steps in parallel...", active.len());
1155
1156 let mut start_times: HashMap<String, Instant> = HashMap::new();
1157 let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1158 for step in &active {
1159 let step_clone: Step = (*step).clone();
1160 let prompt = prompts.remove(&step.name).unwrap_or_default();
1161 let rendered_sp = rendered_sps.remove(&step.name);
1162 let workflow_name_owned = workflow_name.to_string();
1163 let name = step.name.clone();
1164 eprintln!(" starting '{name}'...");
1165 if let Some(w) = session {
1166 let zag_session_id = format!("zig-{workflow_name}-{name}");
1167 let _ = w.step_started(
1168 &name,
1169 tier_index,
1170 &zag_session_id,
1171 zag_command_name(&step.command),
1172 step.model.as_deref(),
1173 &prompt_preview(&prompt),
1174 );
1175 }
1176 start_times.insert(name.clone(), Instant::now());
1177 let session_clone = session.cloned();
1178 let wf_provider = workflow_provider.map(String::from);
1179 let wf_model = workflow_model.map(String::from);
1180 let handle = thread::spawn(move || {
1181 let res = run_step_attempts(
1182 &step_clone,
1183 &prompt,
1184 &workflow_name_owned,
1185 Some(&name),
1186 session_clone.as_ref(),
1187 rendered_sp.as_deref(),
1188 wf_provider.as_deref(),
1189 wf_model.as_deref(),
1190 );
1191 (name, res)
1192 });
1193 handles.push(handle);
1194 }
1195
1196 let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1197 for handle in handles {
1198 match handle.join() {
1199 Ok((name, res)) => {
1200 results.insert(name, res);
1201 }
1202 Err(_) => {
1203 return Err(ZigError::Execution("parallel step thread panicked".into()));
1204 }
1205 }
1206 }
1207
1208 let mut errors: Vec<String> = Vec::new();
1210 for step in &active {
1211 let Some(res) = results.remove(&step.name) else {
1212 continue;
1213 };
1214 let elapsed = start_times
1215 .remove(&step.name)
1216 .map(|t| t.elapsed().as_millis() as u64)
1217 .unwrap_or(0);
1218 match res {
1219 Ok(output) => {
1220 let mut saved_keys: Vec<String> = Vec::new();
1221 if !step.saves.is_empty() {
1222 let saved = extract_saves(&output, &step.saves)?;
1223 for (k, v) in &saved {
1224 eprintln!(" saved {k} = {v}");
1225 saved_keys.push(k.clone());
1226 }
1227 vars.extend(saved);
1228 }
1229 step_outputs.insert(step.name.clone(), output);
1230 eprintln!(" completed '{}'", step.name);
1231 if let Some(w) = session {
1232 let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1233 }
1234 if step.next.is_some() && pending_next.is_none() {
1235 *pending_next = step.next.clone();
1236 }
1237 }
1238 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1239 FailurePolicy::Continue => {
1240 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1241 }
1242 FailurePolicy::Fail | FailurePolicy::Retry => {
1243 errors.push(format!("'{}': {e}", step.name));
1244 }
1245 },
1246 }
1247 }
1248
1249 if !errors.is_empty() {
1250 return Err(ZigError::Execution(format!(
1251 "parallel step(s) failed: {}",
1252 errors.join("; ")
1253 )));
1254 }
1255
1256 Ok(())
1257}
1258
1259fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1262 let mut vars = HashMap::new();
1263 for (name, var) in &workflow.vars {
1264 let value = match &var.default {
1265 Some(toml::Value::String(s)) => s.clone(),
1266 Some(toml::Value::Integer(n)) => n.to_string(),
1267 Some(toml::Value::Float(f)) => f.to_string(),
1268 Some(toml::Value::Boolean(b)) => b.to_string(),
1269 Some(other) => other.to_string(),
1270 None => String::new(),
1271 };
1272 vars.insert(name.clone(), value);
1273 }
1274 vars
1275}
1276
1277fn execute(
1279 workflow: &Workflow,
1280 workflow_path: &std::path::Path,
1281 user_prompt: Option<&str>,
1282 workflow_dir: &Path,
1283 disable_resources: bool,
1284 disable_memory: bool,
1285) -> Result<(), ZigError> {
1286 let mut vars = init_vars(workflow);
1287
1288 let resource_collector = ResourceCollector::from_env(
1289 &workflow.workflow.name,
1290 &workflow.workflow.resources,
1291 workflow_dir,
1292 disable_resources,
1293 );
1294
1295 let config = ZigConfig::load();
1296 let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1297 let memory_collector = MemoryCollector::from_env(
1298 &workflow.workflow.name,
1299 workflow_memory_mode,
1300 &config,
1301 disable_memory,
1302 );
1303
1304 load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1306
1307 let prompt_var = workflow
1309 .vars
1310 .iter()
1311 .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1312 .map(|(name, _)| name.clone());
1313
1314 if let Some(ref var_name) = prompt_var {
1315 if let Some(prompt) = user_prompt {
1316 vars.insert(var_name.clone(), prompt.to_string());
1317 }
1318 }
1319
1320 if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1322 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1323 return Err(ZigError::Validation(msgs.join("; ")));
1324 }
1325
1326 let effective_user_prompt = if prompt_var.is_some() {
1328 None
1329 } else {
1330 user_prompt
1331 };
1332
1333 let mut step_outputs: HashMap<String, String> = HashMap::new();
1334
1335 let wf_provider = workflow.workflow.provider.as_deref();
1336 let wf_model = workflow.workflow.model.as_deref();
1337
1338 let tiers = topological_sort(&workflow.steps)?;
1339
1340 eprintln!(
1341 "running workflow '{}' ({} steps in {} tiers)",
1342 workflow.workflow.name,
1343 workflow.steps.len(),
1344 tiers.len()
1345 );
1346
1347 let coordinator = match SessionWriter::create(
1352 &workflow.workflow.name,
1353 &workflow_path.to_string_lossy(),
1354 user_prompt,
1355 tiers.len(),
1356 ) {
1357 Ok(writer) => {
1358 eprintln!("zig session: {}", writer.session_id());
1359 Some(SessionCoordinator::start(writer))
1360 }
1361 Err(e) => {
1362 eprintln!("warning: failed to open zig session log: {e}");
1363 None
1364 }
1365 };
1366 let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1367 let session_ref = session_writer.as_ref();
1368
1369 let mut iteration = 0;
1370 let mut pending_next: Option<String> = None;
1371
1372 loop {
1373 let tiers_to_run = if let Some(ref next_step) = pending_next {
1374 let remaining: Vec<Vec<&Step>> = tiers
1376 .iter()
1377 .map(|tier| {
1378 tier.iter()
1379 .filter(|s| s.name == *next_step)
1380 .copied()
1381 .collect::<Vec<_>>()
1382 })
1383 .filter(|tier| !tier.is_empty())
1384 .collect();
1385 pending_next = None;
1386 remaining
1387 } else if iteration == 0 {
1388 tiers.clone()
1389 } else {
1390 break;
1391 };
1392
1393 for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1394 let (non_race, race_groups) = partition_tier(tier);
1395
1396 if let Some(w) = session_ref {
1397 let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1398 let _ = w.tier_started(tier_index, names);
1399 }
1400
1401 if non_race.len() <= 1 {
1405 for step in &non_race {
1406 execute_sequential_step(
1407 step,
1408 &mut vars,
1409 effective_user_prompt,
1410 &mut step_outputs,
1411 &workflow.workflow.name,
1412 &mut pending_next,
1413 tier_index,
1414 session_ref,
1415 &workflow.roles,
1416 &resource_collector,
1417 &memory_collector,
1418 workflow_dir,
1419 wf_provider,
1420 wf_model,
1421 )?;
1422 }
1423 } else {
1424 execute_parallel_tier(
1425 &non_race,
1426 &mut vars,
1427 effective_user_prompt,
1428 &mut step_outputs,
1429 &workflow.workflow.name,
1430 &mut pending_next,
1431 tier_index,
1432 session_ref,
1433 &workflow.roles,
1434 &resource_collector,
1435 &memory_collector,
1436 workflow_dir,
1437 wf_provider,
1438 wf_model,
1439 )?;
1440 }
1441
1442 for (group_name, race_steps) in &race_groups {
1444 eprintln!(" starting race group '{group_name}'...");
1445
1446 let mut prompts = HashMap::new();
1448 let mut race_sps: HashMap<String, String> = HashMap::new();
1449 let mut active_steps: Vec<&Step> = Vec::new();
1450 for step in race_steps {
1451 if let Some(condition) = &step.condition {
1452 if !evaluate_condition(condition, &vars)? {
1453 eprintln!(
1454 " skipping '{}' (condition not met: {condition})",
1455 step.name
1456 );
1457 continue;
1458 }
1459 }
1460 let prompt =
1461 render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1462 prompts.insert(step.name.clone(), prompt);
1463 if let Some(sp) = resolve_role_system_prompt(
1464 step,
1465 &workflow.roles,
1466 &resource_collector,
1467 &memory_collector,
1468 &vars,
1469 workflow_dir,
1470 &workflow.workflow.name,
1471 )? {
1472 race_sps.insert(step.name.clone(), sp);
1473 }
1474 active_steps.push(step);
1475 }
1476
1477 if active_steps.is_empty() {
1478 continue;
1479 }
1480
1481 match execute_race_group(
1482 &active_steps,
1483 &prompts,
1484 &race_sps,
1485 &workflow.workflow.name,
1486 tier_index,
1487 session_ref,
1488 wf_provider,
1489 wf_model,
1490 ) {
1491 Ok((winner_name, output)) => {
1492 if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1494 if !winner.saves.is_empty() {
1495 let saved = extract_saves(&output, &winner.saves)?;
1496 for (k, v) in &saved {
1497 eprintln!(" saved {k} = {v}");
1498 }
1499 vars.extend(saved);
1500 }
1501 if winner.next.is_some() {
1502 pending_next = winner.next.clone();
1503 }
1504 }
1505 step_outputs.insert(winner_name.clone(), output);
1506 eprintln!(
1507 " completed race group '{group_name}' (winner: '{winner_name}')"
1508 );
1509 }
1510 Err(e) => return Err(e),
1511 }
1512 }
1513 }
1514
1515 iteration += 1;
1516 if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1517 if iteration >= MAX_LOOP_ITERATIONS {
1518 eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1519 }
1520 break;
1521 }
1522 }
1523
1524 eprintln!("workflow '{}' completed", workflow.workflow.name);
1525 if let Some(c) = coordinator {
1526 let _ = c.finish(SessionStatus::Success);
1527 }
1528 Ok(())
1529}
1530
1531fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1534 match cmd {
1535 None => "run",
1536 Some(StepCommand::Review) => "review",
1537 Some(StepCommand::Plan) => "plan",
1538 Some(StepCommand::Pipe) => "pipe",
1539 Some(StepCommand::Collect) => "collect",
1540 Some(StepCommand::Summary) => "summary",
1541 }
1542}
1543
/// Builds a single-line preview of `prompt` for log output.
///
/// Each line break is replaced with a single space so the preview stays on one line.
/// This covers `\n`, `\r\n`, and a lone `\r`. If the prompt is longer than 200 characters
/// (counted as `char`s), the first 200 are kept and `…` is appended.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    // A stray `\r` moves a terminal cursor back to column 0 and overwrites the preview.
    // So CRLF pairs collapse to one space, and lone CRs are treated like newlines.
    let normalized = prompt.replace("\r\n", "\n");
    let mut chars = normalized
        .chars()
        .map(|c| if c == '\n' || c == '\r' { ' ' } else { c });
    let head: String = chars.by_ref().take(MAX).collect();
    // Any character left after the first MAX means the prompt was truncated.
    if chars.next().is_some() {
        format!("{head}…")
    } else {
        head
    }
}
1558
1559#[cfg(test)]
1560#[path = "run_tests.rs"]
1561mod tests;