1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::config::ZigConfig;
12use crate::error::ZigError;
13use crate::memory::{MemoryCollector, render_memory_block};
14use crate::paths::expand_path;
15use crate::resources::{ResourceCollector, render_system_block};
16use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
17use crate::storage::{FilesystemBackend, StorageManager};
18use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
19use crate::workflow::{parser, validate};
20
/// Iteration cap of 100. NOTE(review): presumably bounds workflow loop
/// re-entry further down this file — the use site is outside this view, confirm.
const MAX_LOOP_ITERATIONS: usize = 100;
23
24pub fn run_workflow(
37 workflow_path: &str,
38 user_prompt: Option<&str>,
39 disable_resources: bool,
40 disable_memory: bool,
41) -> Result<(), ZigError> {
42 check_zag()?;
43
44 let path = resolve_workflow_path(workflow_path)?;
45 let (workflow, source) = parser::parse_workflow(&path)?;
46
47 if let Err(errors) = validate::validate(&workflow) {
48 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
49 return Err(ZigError::Validation(msgs.join("; ")));
50 }
51
52 execute(
53 &workflow,
54 &path,
55 user_prompt,
56 source.dir(),
57 disable_resources,
58 disable_memory,
59 )
60}
61
62pub(crate) fn check_zag() -> Result<(), ZigError> {
64 let zag_available = Command::new("zag")
65 .arg("--version")
66 .output()
67 .is_ok_and(|o| o.status.success());
68
69 if !zag_available {
70 return Err(ZigError::Zag(
71 "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
72 ));
73 }
74 Ok(())
75}
76
77pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
90 let mut candidates = vec![
91 PathBuf::from(workflow),
92 PathBuf::from(format!("{workflow}.zwf")),
93 PathBuf::from(format!("{workflow}.zwfz")),
94 ];
95
96 if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
97 candidates.push(local_dir.join(workflow));
98 candidates.push(local_dir.join(format!("{workflow}.zwf")));
99 candidates.push(local_dir.join(format!("{workflow}.zwfz")));
100 }
101
102 if let Some(global_dir) = crate::paths::global_workflows_dir() {
103 candidates.push(global_dir.join(workflow));
104 candidates.push(global_dir.join(format!("{workflow}.zwf")));
105 candidates.push(global_dir.join(format!("{workflow}.zwfz")));
106 }
107
108 for candidate in &candidates {
109 if candidate.exists() {
110 return Ok(candidate.clone());
111 }
112 }
113
114 Err(ZigError::Io(format!(
115 "workflow not found: '{workflow}' (tried: {})",
116 candidates
117 .iter()
118 .map(|p| p.display().to_string())
119 .collect::<Vec<_>>()
120 .join(", ")
121 )))
122}
123
124fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
130 let step_index: HashMap<&str, usize> = steps
131 .iter()
132 .enumerate()
133 .map(|(i, s)| (s.name.as_str(), i))
134 .collect();
135
136 let mut in_degree = vec![0usize; steps.len()];
137 for (i, step) in steps.iter().enumerate() {
138 for dep in &step.depends_on {
139 if step_index.contains_key(dep.as_str()) {
140 in_degree[i] += 1;
141 }
142 }
143 }
144
145 let mut tiers = Vec::new();
146 let mut remaining = in_degree.clone();
147 let mut completed: Vec<bool> = vec![false; steps.len()];
148
149 loop {
150 let tier: Vec<usize> = (0..steps.len())
151 .filter(|&i| !completed[i] && remaining[i] == 0)
152 .collect();
153
154 if tier.is_empty() {
155 break;
156 }
157
158 for &i in &tier {
159 completed[i] = true;
160 }
161
162 for &i in &tier {
164 for (j, step) in steps.iter().enumerate() {
165 if !completed[j] && step.depends_on.contains(&steps[i].name) {
166 remaining[j] -= 1;
167 }
168 }
169 }
170
171 tiers.push(tier.iter().map(|&i| &steps[i]).collect());
172 }
173
174 let completed_count: usize = completed.iter().filter(|&&c| c).count();
175 if completed_count != steps.len() {
176 return Err(ZigError::Execution(
177 "could not resolve all steps — possible undetected cycle".into(),
178 ));
179 }
180
181 Ok(tiers)
182}
183
184fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
190 let mut result = String::with_capacity(template.len());
191 let mut rest = template;
192
193 while let Some(start) = rest.find("${") {
194 result.push_str(&rest[..start]);
195 let after_start = &rest[start + 2..];
196
197 if let Some(end) = after_start.find('}') {
198 let var_expr = &after_start[..end];
199 let mut parts = var_expr.splitn(2, '.');
200 let root = parts.next().unwrap_or(var_expr);
201
202 if let Some(value) = vars.get(root) {
203 if let Some(path) = parts.next() {
204 if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
206 let resolved = json_path_lookup(&json, path);
207 result.push_str(&resolved);
208 } else {
209 result.push_str(value);
210 }
211 } else {
212 result.push_str(value);
213 }
214 } else {
215 result.push_str(&rest[start..start + 2 + end + 1]);
217 }
218
219 rest = &after_start[end + 1..];
220 } else {
221 result.push_str(&rest[start..]);
222 rest = "";
223 }
224 }
225
226 result.push_str(rest);
227 result
228}
229
230fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
232 let mut current = value;
233 for key in path.split('.') {
234 match current.get(key) {
235 Some(v) => current = v,
236 None => return format!("${{?.{path}}}"),
237 }
238 }
239 match current {
240 serde_json::Value::String(s) => s.clone(),
241 other => other.to_string(),
242 }
243}
244
245#[allow(clippy::too_many_arguments)]
262fn resolve_role_system_prompt(
263 step: &Step,
264 roles: &HashMap<String, Role>,
265 resources: &ResourceCollector,
266 memory: &MemoryCollector,
267 storage: &StorageManager,
268 vars: &HashMap<String, String>,
269 workflow_dir: &Path,
270 workflow_name: &str,
271) -> Result<Option<String>, ZigError> {
272 let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
274 Some(substitute_vars(sp, vars))
275 } else if let Some(ref role_ref) = step.role {
276 let resolved_name = substitute_vars(role_ref, vars);
277 let role = roles.get(&resolved_name).ok_or_else(|| {
278 ZigError::Execution(format!(
279 "step '{}' references role '{}' which does not exist",
280 step.name, resolved_name
281 ))
282 })?;
283
284 let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
285 let full_path = workflow_dir.join(expand_path(file_path));
286 Some(std::fs::read_to_string(&full_path).map_err(|e| {
287 ZigError::Execution(format!(
288 "failed to read system_prompt_file '{}' for role '{}': {e}",
289 full_path.display(),
290 resolved_name
291 ))
292 })?)
293 } else {
294 role.system_prompt.clone()
295 };
296
297 raw_prompt.map(|p| substitute_vars(&p, vars))
298 } else {
299 None
300 };
301
302 let set = resources.collect_for_step(&step.resources)?;
304 let resource_block = render_system_block(&set);
305
306 let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
308 let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
309
310 let storage_block = match storage.render_block(step.storage.as_deref())? {
312 Some(mut s) => {
313 s.push('\n');
314 s
315 }
316 None => String::new(),
317 };
318
319 let prefix = format!("{resource_block}{memory_block}{storage_block}");
320
321 match (prefix.is_empty(), base_prompt) {
322 (true, None) => Ok(None),
323 (true, Some(p)) => Ok(Some(p)),
324 (false, None) => Ok(Some(prefix.trim_end().to_string())),
325 (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
326 }
327}
328
329fn load_file_defaults(
334 vars: &mut HashMap<String, String>,
335 declarations: &HashMap<String, crate::workflow::model::Variable>,
336 workflow_dir: &Path,
337) -> Result<(), ZigError> {
338 for (name, decl) in declarations {
339 if decl.default.is_none() {
340 if let Some(ref file_path) = decl.default_file {
341 let full_path = workflow_dir.join(expand_path(file_path));
342 let content = std::fs::read_to_string(&full_path).map_err(|e| {
343 ZigError::Execution(format!(
344 "failed to read default_file '{}' for variable '{name}': {e}",
345 full_path.display()
346 ))
347 })?;
348 vars.insert(name.clone(), content);
349 }
350 }
351 }
352 Ok(())
353}
354
355fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
362 let condition = condition.trim();
363
364 let operators = ["<=", ">=", "!=", "==", "<", ">"];
366 for op in &operators {
367 if let Some(pos) = condition.find(op) {
368 let lhs = resolve_operand(condition[..pos].trim(), vars);
369 let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
370 return Ok(compare(&lhs, &rhs, op));
371 }
372 }
373
374 let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
376 Ok(is_truthy(value))
377}
378
/// Turns one side of a condition into the string that will be compared.
///
/// * A token wrapped in matching double or single quotes yields the text
///   between the quotes.
/// * Otherwise a token that names a variable yields that variable's value.
/// * Anything else is returned unchanged as a bare literal.
///
/// A token consisting of a single quote character is not a quoted literal
/// (it has no closing quote) and is handled by the remaining rules; slicing
/// it as `token[1..0]` used to panic.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    for quote in ['"', '\''] {
        // `strip_prefix` then `strip_suffix` needs two distinct quote characters.
        let inner = token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote));
        if let Some(inner) = inner {
            return inner.to_string();
        }
    }

    match vars.get(token) {
        Some(value) => value.clone(),
        None => token.to_string(),
    }
}
397
/// Compares two operands with the operator `op`.
///
/// When both operands parse as `f64` and neither is NaN they are compared
/// numerically; equality tolerates a difference below `f64::EPSILON` and
/// `!=` is its exact negation. Identical infinities compare equal.
/// In every other case — including operands such as `"nan"` that parse to
/// NaN — the operands are compared as plain strings, so text that merely
/// looks like a float keyword still satisfies `x == x`.
///
/// Unknown operators evaluate to `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    if let (Ok(l), Ok(r)) = (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        if !l.is_nan() && !r.is_nan() {
            // `l == r` covers equal infinities, where `l - r` would be NaN.
            let equal = l == r || (l - r).abs() < f64::EPSILON;
            return match op {
                "==" => equal,
                "!=" => !equal,
                "<" => l < r,
                ">" => l > r,
                "<=" => l <= r,
                ">=" => l >= r,
                _ => false,
            };
        }
    }
    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
422
/// Reports whether `value` counts as true: everything except the empty
/// string, the exact text `false`, and the exact text `0`.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
427
428fn render_step_prompt(
431 step: &Step,
432 vars: &HashMap<String, String>,
433 user_prompt: Option<&str>,
434 dependency_outputs: &HashMap<String, String>,
435) -> String {
436 let mut prompt = String::new();
437
438 if let Some(ctx) = user_prompt {
440 prompt.push_str(&format!("User context: {ctx}\n\n"));
441 }
442
443 if step.inject_context {
445 for dep in &step.depends_on {
446 if let Some(output) = dependency_outputs.get(dep) {
447 prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
448 }
449 }
450 }
451
452 prompt.push_str(&substitute_vars(&step.prompt, vars));
454
455 prompt
456}
457
/// Builds the full `zag` argument vector for one step.
///
/// The first arguments are the subcommand and its positional/sub-command
/// specific flags, chosen from `step.command` (`run` when unset). Agent-level
/// flags follow for every subcommand except `collect` and `summary`, and the
/// session metadata flags (`--name`, `--description`, `--tag`, `--timeout`)
/// are appended last for all subcommands.
///
/// * `model_override` beats `step.model`, which beats `workflow_model`.
/// * `step.provider` beats `workflow_provider`.
/// * `extra_add_dirs` are emitted as additional `--add-dir` flags after the
///   step's own `add_dirs`.
///
/// Sessions are named `zig-<workflow_name>-<step name>`; `pipe`, `collect`
/// and `summary` reference their dependencies by that same naming scheme.
#[allow(clippy::too_many_arguments)]
fn build_zag_args(
    step: &Step,
    prompt: &str,
    workflow_name: &str,
    model_override: Option<&str>,
    rendered_system_prompt: Option<&str>,
    workflow_provider: Option<&str>,
    workflow_model: Option<&str>,
    extra_add_dirs: &[std::path::PathBuf],
) -> Vec<String> {
    // Session naming shared by this step and the dependency sessions it references.
    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");

    // Second tuple element: whether the subcommand takes the agent-level flags below.
    let (mut args, accepts_agent_args) = match &step.command {
        // No explicit command means a plain `zag run <prompt>`.
        None => (vec!["run".to_string(), prompt.to_string()], true),
        Some(StepCommand::Review) => {
            let mut a = vec!["review".to_string()];
            // The prompt is optional for `review`; an empty one is omitted.
            if !prompt.is_empty() {
                a.push(prompt.to_string());
            }
            if step.uncommitted {
                a.push("--uncommitted".into());
            }
            if let Some(base) = &step.base {
                a.extend(["--base".into(), base.clone()]);
            }
            if let Some(commit) = &step.commit {
                a.extend(["--commit".into(), commit.clone()]);
            }
            if let Some(title) = &step.title {
                a.extend(["--title".into(), title.clone()]);
            }
            (a, true)
        }
        Some(StepCommand::Plan) => {
            let mut a = vec!["plan".to_string(), prompt.to_string()];
            if let Some(output) = &step.plan_output {
                a.extend(["-o".into(), expand_path(output)]);
            }
            if let Some(instructions) = &step.instructions {
                a.extend(["--instructions".into(), instructions.clone()]);
            }
            (a, true)
        }
        Some(StepCommand::Pipe) => {
            let mut a = vec!["pipe".to_string()];
            // Dependency sessions first, then `--`, then the prompt.
            for dep in &step.depends_on {
                a.push(session_name(dep));
            }
            a.push("--".into());
            a.push(prompt.to_string());
            (a, true)
        }
        Some(StepCommand::Collect) => {
            let mut a = vec!["collect".to_string()];
            for dep in &step.depends_on {
                a.push(session_name(dep));
            }
            (a, false)
        }
        Some(StepCommand::Summary) => {
            let mut a = vec!["summary".to_string()];
            for dep in &step.depends_on {
                a.push(session_name(dep));
            }
            (a, false)
        }
    };

    if accepts_agent_args {
        // Step-level provider overrides the workflow-level default.
        let effective_provider = step.provider.as_deref().or(workflow_provider);
        if let Some(provider) = effective_provider {
            args.extend(["--provider".into(), provider.to_string()]);
        }

        // Precedence: explicit override, then step model, then workflow default.
        let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
        if let Some(model) = effective_model {
            args.extend(["--model".into(), model.to_string()]);
        }

        if let Some(sp) = rendered_system_prompt {
            args.extend(["--system-prompt".into(), sp.to_string()]);
        }
        if let Some(max_turns) = step.max_turns {
            args.extend(["--max-turns".into(), max_turns.to_string()]);
        }

        // An explicit output setting is passed via `-o`; `--json` is only used without it.
        if let Some(output) = &step.output {
            args.extend(["-o".into(), output.clone()]);
        } else if step.json {
            args.push("--json".into());
        }
        if let Some(schema) = &step.json_schema {
            args.extend(["--json-schema".into(), schema.clone()]);
        }

        if let Some(mcp_config) = &step.mcp_config {
            args.extend(["--mcp-config".into(), expand_path(mcp_config)]);
        }

        if step.auto_approve {
            args.push("--auto-approve".into());
        }
        if let Some(root) = &step.root {
            args.extend(["--root".into(), expand_path(root)]);
        }
        for dir in &step.add_dirs {
            args.extend(["--add-dir".into(), expand_path(dir)]);
        }
        // Caller-supplied directories are passed as-is, without `expand_path`.
        for dir in extra_add_dirs {
            args.extend(["--add-dir".into(), dir.display().to_string()]);
        }
        for (key, value) in &step.env {
            args.extend(["--env".into(), format!("{key}={value}")]);
        }
        for file in &step.files {
            args.extend(["--file".into(), expand_path(file)]);
        }

        for ctx in &step.context {
            args.extend(["--context".into(), ctx.clone()]);
        }
        if let Some(plan) = &step.plan {
            args.extend(["--plan".into(), expand_path(plan)]);
        }

        if step.worktree {
            args.push("--worktree".into());
        }
        if let Some(sandbox) = &step.sandbox {
            args.extend(["--sandbox".into(), sandbox.clone()]);
        }
    }

    // Session metadata is attached to every subcommand, including collect/summary.
    let name = session_name(&step.name);
    args.extend(["--name".into(), name]);

    if !step.description.is_empty() {
        args.extend(["--description".into(), step.description.clone()]);
    }

    // Every session carries the fixed `zig-workflow` tag ahead of the step's own tags.
    args.extend(["--tag".into(), "zig-workflow".into()]);
    for tag in &step.tags {
        args.extend(["--tag".into(), tag.clone()]);
    }

    if let Some(timeout) = &step.timeout {
        args.extend(["--timeout".into(), timeout.clone()]);
    }

    args
}
631
632fn run_zag_streaming(
638 args: &[String],
639 step_name: &str,
640 prefix: Option<&str>,
641 session: Option<&Arc<SessionWriter>>,
642) -> Result<(std::process::ExitStatus, String), ZigError> {
643 let mut cmd = Command::new("zag");
644 cmd.args(args)
645 .stdout(std::process::Stdio::piped())
646 .stderr(std::process::Stdio::piped());
647
648 let mut child = cmd
649 .spawn()
650 .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
651
652 let stdout = child.stdout.take().expect("stdout was piped");
653 let stderr = child.stderr.take().expect("stderr was piped");
654
655 let buffer = Arc::new(Mutex::new(String::new()));
656 let buffer_clone = Arc::clone(&buffer);
657 let prefix_stdout = prefix.map(String::from);
658 let prefix_stderr = prefix.map(String::from);
659 let session_stdout = session.cloned();
660 let session_stderr = session.cloned();
661 let step_name_stdout = step_name.to_string();
662 let step_name_stderr = step_name.to_string();
663
664 let stdout_thread = thread::spawn(move || {
665 let reader = BufReader::new(stdout);
666 let stderr_handle = std::io::stderr();
667 for line in reader.lines().map_while(Result::ok) {
668 if let Ok(mut buf) = buffer_clone.lock() {
669 buf.push_str(&line);
670 buf.push('\n');
671 }
672 if let Some(w) = &session_stdout {
673 let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
674 }
675 let mut h = stderr_handle.lock();
676 let _ = match &prefix_stdout {
677 Some(p) => writeln!(h, "[{p}] {line}"),
678 None => writeln!(h, "{line}"),
679 };
680 }
681 });
682
683 let stderr_thread = thread::spawn(move || {
684 let reader = BufReader::new(stderr);
685 let stderr_handle = std::io::stderr();
686 for line in reader.lines().map_while(Result::ok) {
687 if let Some(w) = &session_stderr {
688 let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
689 }
690 let mut h = stderr_handle.lock();
691 let _ = match &prefix_stderr {
692 Some(p) => writeln!(h, "[{p}] {line}"),
693 None => writeln!(h, "{line}"),
694 };
695 }
696 });
697
698 let status = child
699 .wait()
700 .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
701
702 let _ = stdout_thread.join();
703 let _ = stderr_thread.join();
704
705 let captured = Arc::try_unwrap(buffer)
706 .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
707 .into_inner()
708 .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
709
710 Ok((status, captured))
711}
712
713#[allow(clippy::too_many_arguments)]
719fn execute_step(
720 step: &Step,
721 prompt: &str,
722 workflow_name: &str,
723 model_override: Option<&str>,
724 prefix: Option<&str>,
725 session: Option<&Arc<SessionWriter>>,
726 rendered_system_prompt: Option<&str>,
727 workflow_provider: Option<&str>,
728 workflow_model: Option<&str>,
729 extra_add_dirs: &[std::path::PathBuf],
730) -> Result<String, ZigError> {
731 let args = build_zag_args(
732 step,
733 prompt,
734 workflow_name,
735 model_override,
736 rendered_system_prompt,
737 workflow_provider,
738 workflow_model,
739 extra_add_dirs,
740 );
741 let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
742
743 if !status.success() {
744 return Err(ZigError::Execution(format!(
745 "step '{}' failed (exit {})",
746 step.name, status,
747 )));
748 }
749
750 Ok(stdout)
751}
752
753#[allow(clippy::too_many_arguments)]
758fn run_step_attempts(
759 step: &Step,
760 prompt: &str,
761 workflow_name: &str,
762 prefix: Option<&str>,
763 session: Option<&Arc<SessionWriter>>,
764 rendered_system_prompt: Option<&str>,
765 workflow_provider: Option<&str>,
766 workflow_model: Option<&str>,
767 extra_add_dirs: &[std::path::PathBuf],
768) -> Result<String, ZigError> {
769 let mut attempts = 0;
770 let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
771 step.max_retries.unwrap_or(1) + 1
772 } else {
773 1
774 };
775
776 loop {
777 attempts += 1;
778 let model_override = if attempts > 1 {
779 step.retry_model.as_deref()
780 } else {
781 None
782 };
783 match execute_step(
784 step,
785 prompt,
786 workflow_name,
787 model_override,
788 prefix,
789 session,
790 rendered_system_prompt,
791 workflow_provider,
792 workflow_model,
793 extra_add_dirs,
794 ) {
795 Ok(output) => return Ok(output),
796 Err(e) => {
797 if let Some(w) = session {
798 let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
799 }
800 if attempts < max_attempts {
801 eprintln!(
802 " retry {}/{} for step '{}'",
803 attempts,
804 max_attempts - 1,
805 step.name
806 );
807 continue;
808 }
809 return Err(e);
810 }
811 }
812 }
813}
814
815fn extract_saves(
822 output: &str,
823 saves: &HashMap<String, String>,
824) -> Result<HashMap<String, String>, ZigError> {
825 let mut extracted = HashMap::new();
826
827 for (var_name, selector) in saves {
828 let value = if selector == "$" {
829 output.trim().to_string()
830 } else if let Some(path) = selector.strip_prefix("$.") {
831 let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
832 ZigError::Execution(format!(
833 "saves selector '{selector}' requires JSON output, but got parse error: {e}"
834 ))
835 })?;
836 json_path_lookup(&json, path)
837 } else {
838 output.trim().to_string()
839 };
840
841 extracted.insert(var_name.clone(), value);
842 }
843
844 Ok(extracted)
845}
846
847fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
852 let mut sequential = Vec::new();
853 let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
854
855 for step in tier {
856 if let Some(group) = &step.race_group {
857 race_groups.entry(group.clone()).or_default().push(step);
858 } else {
859 sequential.push(*step);
860 }
861 }
862
863 (sequential, race_groups)
864}
865
866fn spawn_step(
868 step: &Step,
869 prompt: &str,
870 workflow_name: &str,
871 rendered_system_prompt: Option<&str>,
872 workflow_provider: Option<&str>,
873 workflow_model: Option<&str>,
874 extra_add_dirs: &[std::path::PathBuf],
875) -> Result<std::process::Child, ZigError> {
876 let args = build_zag_args(
877 step,
878 prompt,
879 workflow_name,
880 None,
881 rendered_system_prompt,
882 workflow_provider,
883 workflow_model,
884 extra_add_dirs,
885 );
886 let mut cmd = Command::new("zag");
887 cmd.args(&args)
888 .stdout(std::process::Stdio::piped())
889 .stderr(std::process::Stdio::piped());
890
891 cmd.spawn()
892 .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
893}
894
895#[allow(clippy::too_many_arguments)]
900fn execute_race_group(
901 steps: &[&Step],
902 prompts: &HashMap<String, String>,
903 system_prompts: &HashMap<String, String>,
904 workflow_name: &str,
905 tier_index: usize,
906 session: Option<&Arc<SessionWriter>>,
907 workflow_provider: Option<&str>,
908 workflow_model: Option<&str>,
909 storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
910) -> Result<(String, String), ZigError> {
911 if let Some(w) = session {
912 for step in steps {
913 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
914 let preview = prompts
915 .get(&step.name)
916 .map(|p| prompt_preview(p))
917 .unwrap_or_default();
918 let _ = w.step_started(
919 &step.name,
920 tier_index,
921 &zag_session_id,
922 zag_command_name(&step.command),
923 step.model.as_deref(),
924 &preview,
925 );
926 }
927 }
928 let race_started = Instant::now();
929 let mut children: Vec<(String, std::process::Child)> = Vec::new();
930
931 for step in steps {
932 let prompt = prompts.get(&step.name).ok_or_else(|| {
933 ZigError::Execution(format!("missing prompt for step '{}'", step.name))
934 })?;
935 eprintln!(" racing step '{}'...", step.name);
936 let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
937 let empty: Vec<std::path::PathBuf> = Vec::new();
938 let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty);
939 let child = spawn_step(
940 step,
941 prompt,
942 workflow_name,
943 rendered_sp,
944 workflow_provider,
945 workflow_model,
946 extra_dirs,
947 )?;
948 children.push((step.name.clone(), child));
949 }
950
951 loop {
953 for i in 0..children.len() {
954 let status = children[i]
955 .1
956 .try_wait()
957 .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
958
959 if let Some(exit_status) = status {
960 let (winner_name, winner_child) = children.remove(i);
961
962 for (name, mut child) in children {
964 eprintln!(" cancelling step '{name}' (race lost)");
965 let _ = child.kill();
966 let _ = child.wait();
967 }
968
969 let elapsed = race_started.elapsed().as_millis() as u64;
970 if !exit_status.success() {
971 let stderr = winner_child
972 .stderr
973 .map(|mut s| {
974 let mut buf = String::new();
975 std::io::Read::read_to_string(&mut s, &mut buf).ok();
976 buf
977 })
978 .unwrap_or_default();
979 let err_msg = format!(
980 "race winner '{}' failed (exit {}): {}",
981 winner_name,
982 exit_status,
983 stderr.trim()
984 );
985 if let Some(w) = session {
986 let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
987 }
988 return Err(ZigError::Execution(err_msg));
989 }
990
991 let stdout = winner_child
992 .stdout
993 .map(|mut s| {
994 let mut buf = String::new();
995 std::io::Read::read_to_string(&mut s, &mut buf).ok();
996 buf
997 })
998 .unwrap_or_default();
999
1000 eprintln!(" race won by '{winner_name}'");
1001 if let Some(w) = session {
1002 let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1003 }
1004 return Ok((winner_name, stdout));
1005 }
1006 }
1007
1008 std::thread::sleep(Duration::from_millis(100));
1009 }
1010}
1011
1012#[allow(clippy::too_many_arguments)]
1014fn execute_sequential_step(
1015 step: &Step,
1016 vars: &mut HashMap<String, String>,
1017 user_prompt: Option<&str>,
1018 step_outputs: &mut HashMap<String, String>,
1019 workflow_name: &str,
1020 pending_next: &mut Option<String>,
1021 tier_index: usize,
1022 session: Option<&Arc<SessionWriter>>,
1023 roles: &HashMap<String, Role>,
1024 resources: &ResourceCollector,
1025 memory: &MemoryCollector,
1026 storage: &StorageManager,
1027 workflow_dir: &Path,
1028 workflow_provider: Option<&str>,
1029 workflow_model: Option<&str>,
1030) -> Result<(), ZigError> {
1031 if let Some(condition) = &step.condition {
1032 if !evaluate_condition(condition, vars)? {
1033 eprintln!(
1034 " skipping '{}' (condition not met: {condition})",
1035 step.name
1036 );
1037 if let Some(w) = session {
1038 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1039 }
1040 return Ok(());
1041 }
1042 }
1043
1044 eprintln!(" running step '{}'...", step.name);
1045
1046 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1047 let rendered_sp = resolve_role_system_prompt(
1048 step,
1049 roles,
1050 resources,
1051 memory,
1052 storage,
1053 vars,
1054 workflow_dir,
1055 workflow_name,
1056 )?;
1057 let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1058 if let Some(w) = session {
1059 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1060 let _ = w.step_started(
1061 &step.name,
1062 tier_index,
1063 &zag_session_id,
1064 zag_command_name(&step.command),
1065 step.model.as_deref(),
1066 &prompt_preview(&prompt),
1067 );
1068 }
1069 let started = Instant::now();
1070 let result = run_step_attempts(
1071 step,
1072 &prompt,
1073 workflow_name,
1074 None,
1075 session,
1076 rendered_sp.as_deref(),
1077 workflow_provider,
1078 workflow_model,
1079 &storage_dirs,
1080 );
1081
1082 match result {
1083 Ok(output) => {
1084 let mut saved_keys: Vec<String> = Vec::new();
1085 if !step.saves.is_empty() {
1086 let saved = extract_saves(&output, &step.saves)?;
1087 for (k, v) in &saved {
1088 eprintln!(" saved {k} = {v}");
1089 saved_keys.push(k.clone());
1090 }
1091 vars.extend(saved);
1092 }
1093
1094 step_outputs.insert(step.name.clone(), output);
1095 eprintln!(" completed '{}'", step.name);
1096 if let Some(w) = session {
1097 let _ = w.step_completed(
1098 &step.name,
1099 0,
1100 started.elapsed().as_millis() as u64,
1101 saved_keys,
1102 );
1103 }
1104
1105 if step.next.is_some() {
1106 *pending_next = step.next.clone();
1107 }
1108 }
1109 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1110 FailurePolicy::Fail => return Err(e),
1111 FailurePolicy::Continue => {
1112 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1113 }
1114 FailurePolicy::Retry => {
1115 return Err(e);
1116 }
1117 },
1118 }
1119
1120 Ok(())
1121}
1122
/// Runs all `steps` of a tier concurrently, one thread per step.
///
/// Phase 1 (calling thread): conditions are evaluated and prompts, system
/// prompts and storage directories are rendered for every step that is not
/// skipped. Phase 2: each remaining step runs `run_step_attempts` on its own
/// thread. Phase 3 (calling thread, in tier order): saves are merged into
/// `vars`, outputs stored in `step_outputs`, and failures handled per the
/// step's failure policy.
///
/// `pending_next` is only set when it is still `None`, so the first
/// successful step (in tier order) that names a `next` target wins.
///
/// # Errors
/// Propagates condition / prompt-resolution / save-extraction errors,
/// returns `ZigError::Execution` when a worker thread panics, and returns a
/// combined `ZigError::Execution` listing every step that failed with a
/// policy other than `Continue`.
#[allow(clippy::too_many_arguments)]
fn execute_parallel_tier(
    steps: &[&Step],
    vars: &mut HashMap<String, String>,
    user_prompt: Option<&str>,
    step_outputs: &mut HashMap<String, String>,
    workflow_name: &str,
    pending_next: &mut Option<String>,
    tier_index: usize,
    session: Option<&Arc<SessionWriter>>,
    roles: &HashMap<String, Role>,
    resources: &ResourceCollector,
    memory: &MemoryCollector,
    storage: &StorageManager,
    workflow_dir: &Path,
    workflow_provider: Option<&str>,
    workflow_model: Option<&str>,
) -> Result<(), ZigError> {
    // Steps that passed their condition, plus per-step inputs keyed by step name.
    let mut active: Vec<&Step> = Vec::new();
    let mut prompts: HashMap<String, String> = HashMap::new();
    let mut rendered_sps: HashMap<String, String> = HashMap::new();
    let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
    for step in steps {
        if let Some(condition) = &step.condition {
            if !evaluate_condition(condition, vars)? {
                eprintln!(
                    " skipping '{}' (condition not met: {condition})",
                    step.name
                );
                if let Some(w) = session {
                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
                }
                continue;
            }
        }
        // Everything is rendered up front, against the variables as they are before the tier runs.
        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
        prompts.insert(step.name.clone(), prompt);
        if let Some(sp) = resolve_role_system_prompt(
            step,
            roles,
            resources,
            memory,
            storage,
            vars,
            workflow_dir,
            workflow_name,
        )? {
            rendered_sps.insert(step.name.clone(), sp);
        }
        storage_dirs_map.insert(
            step.name.clone(),
            storage.add_dirs_for_step(step.storage.as_deref()),
        );
        active.push(*step);
    }

    if active.is_empty() {
        return Ok(());
    }

    eprintln!(" running {} steps in parallel...", active.len());

    let mut start_times: HashMap<String, Instant> = HashMap::new();
    let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
    for step in &active {
        // Worker threads need owned data, so everything they use is cloned or moved out of the maps.
        let step_clone: Step = (*step).clone();
        let prompt = prompts.remove(&step.name).unwrap_or_default();
        let rendered_sp = rendered_sps.remove(&step.name);
        let workflow_name_owned = workflow_name.to_string();
        let name = step.name.clone();
        eprintln!(" starting '{name}'...");
        if let Some(w) = session {
            let zag_session_id = format!("zig-{workflow_name}-{name}");
            let _ = w.step_started(
                &name,
                tier_index,
                &zag_session_id,
                zag_command_name(&step.command),
                step.model.as_deref(),
                &prompt_preview(&prompt),
            );
        }
        start_times.insert(name.clone(), Instant::now());
        let session_clone = session.cloned();
        let wf_provider = workflow_provider.map(String::from);
        let wf_model = workflow_model.map(String::from);
        let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
        let handle = thread::spawn(move || {
            // The step name doubles as the line prefix for this step's streamed output.
            let res = run_step_attempts(
                &step_clone,
                &prompt,
                &workflow_name_owned,
                Some(&name),
                session_clone.as_ref(),
                rendered_sp.as_deref(),
                wf_provider.as_deref(),
                wf_model.as_deref(),
                &storage_dirs,
            );
            (name, res)
        });
        handles.push(handle);
    }

    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
    for handle in handles {
        match handle.join() {
            Ok((name, res)) => {
                results.insert(name, res);
            }
            Err(_) => {
                // NOTE(review): returning here leaves the remaining handles unjoined.
                return Err(ZigError::Execution("parallel step thread panicked".into()));
            }
        }
    }

    let mut errors: Vec<String> = Vec::new();
    // Results are applied in tier order, not completion order.
    for step in &active {
        let Some(res) = results.remove(&step.name) else {
            continue;
        };
        // Measured from thread start until this post-processing point, after all threads were joined.
        let elapsed = start_times
            .remove(&step.name)
            .map(|t| t.elapsed().as_millis() as u64)
            .unwrap_or(0);
        match res {
            Ok(output) => {
                let mut saved_keys: Vec<String> = Vec::new();
                if !step.saves.is_empty() {
                    let saved = extract_saves(&output, &step.saves)?;
                    for (k, v) in &saved {
                        eprintln!(" saved {k} = {v}");
                        saved_keys.push(k.clone());
                    }
                    vars.extend(saved);
                }
                step_outputs.insert(step.name.clone(), output);
                eprintln!(" completed '{}'", step.name);
                if let Some(w) = session {
                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
                }
                // First `next` target in tier order wins; later ones are ignored.
                if step.next.is_some() && pending_next.is_none() {
                    *pending_next = step.next.clone();
                }
            }
            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
                FailurePolicy::Continue => {
                    eprintln!(" step '{}' failed (continuing): {e}", step.name);
                }
                FailurePolicy::Fail | FailurePolicy::Retry => {
                    // Collected so that every failing step is reported, not just the first.
                    errors.push(format!("'{}': {e}", step.name));
                }
            },
        }
    }

    if !errors.is_empty() {
        return Err(ZigError::Execution(format!(
            "parallel step(s) failed: {}",
            errors.join("; ")
        )));
    }

    Ok(())
}
1299
1300fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1303 let mut vars = HashMap::new();
1304 for (name, var) in &workflow.vars {
1305 let value = match &var.default {
1306 Some(toml::Value::String(s)) => s.clone(),
1307 Some(toml::Value::Integer(n)) => n.to_string(),
1308 Some(toml::Value::Float(f)) => f.to_string(),
1309 Some(toml::Value::Boolean(b)) => b.to_string(),
1310 Some(other) => other.to_string(),
1311 None => String::new(),
1312 };
1313 vars.insert(name.clone(), value);
1314 }
1315 vars
1316}
1317
/// Runs an already-parsed workflow: prepares variables, collectors, storage and
/// the session log, then executes the steps tier by tier.
///
/// # Parameters
/// - `workflow`: the parsed workflow definition to run.
/// - `workflow_path`: path of the workflow file; only used (lossily converted
///   to a string) when creating the session log.
/// - `user_prompt`: optional prompt supplied by the caller. If a variable whose
///   `from` field is `"prompt"` exists, the prompt is stored in that variable
///   and is *not* passed to the steps separately; otherwise it is handed to the
///   steps as-is.
/// - `workflow_dir`: directory passed to the resource collector, to
///   `load_file_defaults` and to the step executors.
/// - `disable_resources` / `disable_memory`: forwarded to
///   `ResourceCollector::from_env` and `MemoryCollector::from_env`.
///
/// # Control flow
/// The first loop iteration runs every tier produced by `topological_sort`.
/// Whenever a step records a follow-up step in `pending_next`, another
/// iteration runs that single step. The loop ends when no follow-up is pending
/// or after `MAX_LOOP_ITERATIONS` iterations (a warning is printed in the
/// latter case, but the run is still reported as completed).
///
/// # Errors
/// Returns `ZigError::Validation` when `validate::validate_var_values` rejects
/// the variable values, and propagates any error from storage setup,
/// `load_file_defaults`, `topological_sort`, condition evaluation, system
/// prompt resolution, save extraction, or step / race-group execution.
///
/// NOTE(review): on every error path the session coordinator is dropped
/// without `finish` being called — confirm the session log is still finalized
/// in that case.
fn execute(
    workflow: &Workflow,
    workflow_path: &std::path::Path,
    user_prompt: Option<&str>,
    workflow_dir: &Path,
    disable_resources: bool,
    disable_memory: bool,
) -> Result<(), ZigError> {
    // Start from the declared defaults of every workflow variable.
    let mut vars = init_vars(workflow);

    let resource_collector = ResourceCollector::from_env(
        &workflow.workflow.name,
        &workflow.workflow.resources,
        workflow_dir,
        disable_resources,
    );

    let config = ZigConfig::load();
    let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
    let memory_collector = MemoryCollector::from_env(
        &workflow.workflow.name,
        workflow_memory_mode,
        &config,
        disable_memory,
    );

    // Only build a filesystem-backed storage manager when the workflow
    // actually declares storage; otherwise use the empty manager.
    let storage_manager = if workflow.storage.is_empty() {
        StorageManager::empty()
    } else {
        let backend = FilesystemBackend::from_cwd()?;
        StorageManager::build(&workflow.storage, backend)?
    };

    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;

    // Name of the first variable (if any) whose `from` field is "prompt".
    let prompt_var = workflow
        .vars
        .iter()
        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
        .map(|(name, _)| name.clone());

    // Bind the user prompt to that variable, overriding its default.
    if let Some(ref var_name) = prompt_var {
        if let Some(prompt) = user_prompt {
            vars.insert(var_name.clone(), prompt.to_string());
        }
    }

    // Validate the final variable values before any step runs.
    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
        return Err(ZigError::Validation(msgs.join("; ")));
    }

    // When a prompt variable exists, the prompt travels through `vars` only,
    // so the steps receive no separate user prompt.
    let effective_user_prompt = if prompt_var.is_some() {
        None
    } else {
        user_prompt
    };

    // Output of every completed step, keyed by step name.
    let mut step_outputs: HashMap<String, String> = HashMap::new();

    // Workflow-level provider/model, passed down to every step executor.
    let wf_provider = workflow.workflow.provider.as_deref();
    let wf_model = workflow.workflow.model.as_deref();

    let tiers = topological_sort(&workflow.steps)?;

    eprintln!(
        "running workflow '{}' ({} steps in {} tiers)",
        workflow.workflow.name,
        workflow.steps.len(),
        tiers.len()
    );

    // Session logging is optional: if the log cannot be created, a warning is
    // printed and the workflow runs without one.
    let coordinator = match SessionWriter::create(
        &workflow.workflow.name,
        &workflow_path.to_string_lossy(),
        user_prompt,
        tiers.len(),
    ) {
        Ok(writer) => {
            eprintln!("zig session: {}", writer.session_id());
            Some(SessionCoordinator::start(writer))
        }
        Err(e) => {
            eprintln!("warning: failed to open zig session log: {e}");
            None
        }
    };
    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
    let session_ref = session_writer.as_ref();

    let mut iteration = 0;
    // Name of the step to jump to on the next loop iteration, if any.
    let mut pending_next: Option<String> = None;

    loop {
        let tiers_to_run = if let Some(ref next_step) = pending_next {
            // Follow-up iteration: keep only the step named by `pending_next`,
            // dropping tiers that become empty.
            let remaining: Vec<Vec<&Step>> = tiers
                .iter()
                .map(|tier| {
                    tier.iter()
                        .filter(|s| s.name == *next_step)
                        .copied()
                        .collect::<Vec<_>>()
                })
                .filter(|tier| !tier.is_empty())
                .collect();
            // Consume the jump so it only repeats if a step requests it again.
            pending_next = None;
            remaining
        } else if iteration == 0 {
            // First iteration: run the whole workflow.
            tiers.clone()
        } else {
            break;
        };

        // NOTE(review): `tier_index` is the position within `tiers_to_run`, so
        // on follow-up iterations it restarts at 0 instead of reflecting the
        // step's original tier — confirm this is what the session log expects.
        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
            let (non_race, race_groups) = partition_tier(tier);

            if let Some(w) = session_ref {
                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
                // Session logging failures are ignored.
                let _ = w.tier_started(tier_index, names);
            }

            // Zero or one non-race step runs inline; two or more go through
            // the parallel tier executor.
            if non_race.len() <= 1 {
                for step in &non_race {
                    execute_sequential_step(
                        step,
                        &mut vars,
                        effective_user_prompt,
                        &mut step_outputs,
                        &workflow.workflow.name,
                        &mut pending_next,
                        tier_index,
                        session_ref,
                        &workflow.roles,
                        &resource_collector,
                        &memory_collector,
                        &storage_manager,
                        workflow_dir,
                        wf_provider,
                        wf_model,
                    )?;
                }
            } else {
                execute_parallel_tier(
                    &non_race,
                    &mut vars,
                    effective_user_prompt,
                    &mut step_outputs,
                    &workflow.workflow.name,
                    &mut pending_next,
                    tier_index,
                    session_ref,
                    &workflow.roles,
                    &resource_collector,
                    &memory_collector,
                    &storage_manager,
                    workflow_dir,
                    wf_provider,
                    wf_model,
                )?;
            }

            // Race groups of this tier run after its non-race steps.
            for (group_name, race_steps) in &race_groups {
                eprintln!(" starting race group '{group_name}'...");

                // Per-step inputs for the race, all keyed by step name.
                let mut prompts = HashMap::new();
                let mut race_sps: HashMap<String, String> = HashMap::new();
                let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
                    HashMap::new();
                let mut active_steps: Vec<&Step> = Vec::new();
                for step in race_steps {
                    // Steps whose condition is not met do not take part in the race.
                    if let Some(condition) = &step.condition {
                        if !evaluate_condition(condition, &vars)? {
                            eprintln!(
                                " skipping '{}' (condition not met: {condition})",
                                step.name
                            );
                            continue;
                        }
                    }
                    let prompt =
                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
                    prompts.insert(step.name.clone(), prompt);
                    // A system prompt is recorded only when one is resolved for the step.
                    if let Some(sp) = resolve_role_system_prompt(
                        step,
                        &workflow.roles,
                        &resource_collector,
                        &memory_collector,
                        &storage_manager,
                        &vars,
                        workflow_dir,
                        &workflow.workflow.name,
                    )? {
                        race_sps.insert(step.name.clone(), sp);
                    }
                    race_storage_dirs.insert(
                        step.name.clone(),
                        storage_manager.add_dirs_for_step(step.storage.as_deref()),
                    );
                    active_steps.push(step);
                }

                // Every step of the group was skipped: nothing to race.
                if active_steps.is_empty() {
                    continue;
                }

                match execute_race_group(
                    &active_steps,
                    &prompts,
                    &race_sps,
                    &workflow.workflow.name,
                    tier_index,
                    session_ref,
                    wf_provider,
                    wf_model,
                    &race_storage_dirs,
                ) {
                    Ok((winner_name, output)) => {
                        // Only the winner's saves and `next` are applied.
                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
                            if !winner.saves.is_empty() {
                                let saved = extract_saves(&output, &winner.saves)?;
                                for (k, v) in &saved {
                                    eprintln!(" saved {k} = {v}");
                                }
                                vars.extend(saved);
                            }
                            // Overwrites any jump already recorded earlier in this iteration.
                            if winner.next.is_some() {
                                pending_next = winner.next.clone();
                            }
                        }
                        step_outputs.insert(winner_name.clone(), output);
                        eprintln!(
                            " completed race group '{group_name}' (winner: '{winner_name}')"
                        );
                    }
                    Err(e) => return Err(e),
                }
            }
        }

        iteration += 1;
        // Stop when no follow-up step is pending, or when the iteration cap is
        // reached (guards against `next` cycles that never terminate).
        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
            if iteration >= MAX_LOOP_ITERATIONS {
                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
            }
            break;
        }
    }

    eprintln!("workflow '{}' completed", workflow.workflow.name);
    // Close the session log as successful; a failure to do so is ignored.
    if let Some(c) = coordinator {
        let _ = c.finish(SessionStatus::Success);
    }
    Ok(())
}
1591
1592fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1595 match cmd {
1596 None => "run",
1597 Some(StepCommand::Review) => "review",
1598 Some(StepCommand::Plan) => "plan",
1599 Some(StepCommand::Pipe) => "pipe",
1600 Some(StepCommand::Collect) => "collect",
1601 Some(StepCommand::Summary) => "summary",
1602 }
1603}
1604
/// Produces a short single-line preview of `prompt`.
///
/// Every `'\n'` is replaced by a space. Prompts of at most 200 characters are
/// returned in full; longer ones are cut to their first 200 characters and
/// suffixed with `'…'`. Lengths are counted in `char`s, not bytes.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let mut flattened = prompt.chars().map(|ch| match ch {
        '\n' => ' ',
        other => other,
    });
    // Pull at most MAX characters; `by_ref` keeps the iterator usable afterwards.
    let mut preview: String = flattened.by_ref().take(MAX).collect();
    // Anything left over means the prompt was longer than MAX characters.
    if flattened.next().is_some() {
        preview.push('…');
    }
    preview
}
1619
// Unit tests for this module are kept in `run_tests.rs` and are only compiled
// for test builds.
#[cfg(test)]
#[path = "run_tests.rs"]
mod tests;