1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::config::ZigConfig;
12use crate::error::ZigError;
13use crate::memory::{MemoryCollector, render_memory_block};
14use crate::paths::expand_path;
15use crate::resources::{ResourceCollector, render_system_block};
16use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
17use crate::storage::{FilesystemBackend, StorageManager};
18use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
19use crate::workflow::{parser, validate};
20
/// Iteration cap (100) for a looping construct.
///
/// NOTE(review): the code that reads this constant is outside the visible part
/// of the file — presumably it bounds `next`-driven step loops; confirm at the
/// use site.
const MAX_LOOP_ITERATIONS: usize = 100;
23
24pub fn run_workflow(
40 workflow_path: &str,
41 user_prompt: Option<&str>,
42 disable_resources: bool,
43 disable_memory: bool,
44 disable_storage: bool,
45) -> Result<(), ZigError> {
46 check_zag()?;
47
48 let path = resolve_workflow_path(workflow_path)?;
49 let (workflow, source) = parser::parse_workflow(&path)?;
50
51 if let Err(errors) = validate::validate(&workflow) {
52 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
53 return Err(ZigError::Validation(msgs.join("; ")));
54 }
55
56 execute(
57 &workflow,
58 &path,
59 user_prompt,
60 source.dir(),
61 disable_resources,
62 disable_memory,
63 disable_storage,
64 )
65}
66
67pub(crate) fn check_zag() -> Result<(), ZigError> {
69 let zag_available = Command::new("zag")
70 .arg("--version")
71 .output()
72 .is_ok_and(|o| o.status.success());
73
74 if !zag_available {
75 return Err(ZigError::Zag(
76 "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
77 ));
78 }
79 Ok(())
80}
81
82pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
95 let mut candidates = vec![
96 PathBuf::from(workflow),
97 PathBuf::from(format!("{workflow}.zwf")),
98 PathBuf::from(format!("{workflow}.zwfz")),
99 ];
100
101 if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
102 candidates.push(local_dir.join(workflow));
103 candidates.push(local_dir.join(format!("{workflow}.zwf")));
104 candidates.push(local_dir.join(format!("{workflow}.zwfz")));
105 }
106
107 if let Some(global_dir) = crate::paths::global_workflows_dir() {
108 candidates.push(global_dir.join(workflow));
109 candidates.push(global_dir.join(format!("{workflow}.zwf")));
110 candidates.push(global_dir.join(format!("{workflow}.zwfz")));
111 }
112
113 for candidate in &candidates {
114 if candidate.exists() {
115 return Ok(candidate.clone());
116 }
117 }
118
119 Err(ZigError::Io(format!(
120 "workflow not found: '{workflow}' (tried: {})",
121 candidates
122 .iter()
123 .map(|p| p.display().to_string())
124 .collect::<Vec<_>>()
125 .join(", ")
126 )))
127}
128
129fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
135 let step_index: HashMap<&str, usize> = steps
136 .iter()
137 .enumerate()
138 .map(|(i, s)| (s.name.as_str(), i))
139 .collect();
140
141 let mut in_degree = vec![0usize; steps.len()];
142 for (i, step) in steps.iter().enumerate() {
143 for dep in &step.depends_on {
144 if step_index.contains_key(dep.as_str()) {
145 in_degree[i] += 1;
146 }
147 }
148 }
149
150 let mut tiers = Vec::new();
151 let mut remaining = in_degree.clone();
152 let mut completed: Vec<bool> = vec![false; steps.len()];
153
154 loop {
155 let tier: Vec<usize> = (0..steps.len())
156 .filter(|&i| !completed[i] && remaining[i] == 0)
157 .collect();
158
159 if tier.is_empty() {
160 break;
161 }
162
163 for &i in &tier {
164 completed[i] = true;
165 }
166
167 for &i in &tier {
169 for (j, step) in steps.iter().enumerate() {
170 if !completed[j] && step.depends_on.contains(&steps[i].name) {
171 remaining[j] -= 1;
172 }
173 }
174 }
175
176 tiers.push(tier.iter().map(|&i| &steps[i]).collect());
177 }
178
179 let completed_count: usize = completed.iter().filter(|&&c| c).count();
180 if completed_count != steps.len() {
181 return Err(ZigError::Execution(
182 "could not resolve all steps — possible undetected cycle".into(),
183 ));
184 }
185
186 Ok(tiers)
187}
188
189fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
195 let mut result = String::with_capacity(template.len());
196 let mut rest = template;
197
198 while let Some(start) = rest.find("${") {
199 result.push_str(&rest[..start]);
200 let after_start = &rest[start + 2..];
201
202 if let Some(end) = after_start.find('}') {
203 let var_expr = &after_start[..end];
204 let mut parts = var_expr.splitn(2, '.');
205 let root = parts.next().unwrap_or(var_expr);
206
207 if let Some(value) = vars.get(root) {
208 if let Some(path) = parts.next() {
209 if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
211 let resolved = json_path_lookup(&json, path);
212 result.push_str(&resolved);
213 } else {
214 result.push_str(value);
215 }
216 } else {
217 result.push_str(value);
218 }
219 } else {
220 result.push_str(&rest[start..start + 2 + end + 1]);
222 }
223
224 rest = &after_start[end + 1..];
225 } else {
226 result.push_str(&rest[start..]);
227 rest = "";
228 }
229 }
230
231 result.push_str(rest);
232 result
233}
234
235fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
237 let mut current = value;
238 for key in path.split('.') {
239 match current.get(key) {
240 Some(v) => current = v,
241 None => return format!("${{?.{path}}}"),
242 }
243 }
244 match current {
245 serde_json::Value::String(s) => s.clone(),
246 other => other.to_string(),
247 }
248}
249
250#[allow(clippy::too_many_arguments)]
267fn resolve_role_system_prompt(
268 step: &Step,
269 roles: &HashMap<String, Role>,
270 resources: &ResourceCollector,
271 memory: &MemoryCollector,
272 storage: &StorageManager,
273 vars: &HashMap<String, String>,
274 workflow_dir: &Path,
275 workflow_name: &str,
276) -> Result<Option<String>, ZigError> {
277 let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
279 Some(substitute_vars(sp, vars))
280 } else if let Some(ref role_ref) = step.role {
281 let resolved_name = substitute_vars(role_ref, vars);
282 let role = roles.get(&resolved_name).ok_or_else(|| {
283 ZigError::Execution(format!(
284 "step '{}' references role '{}' which does not exist",
285 step.name, resolved_name
286 ))
287 })?;
288
289 let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
290 let full_path = workflow_dir.join(expand_path(file_path));
291 Some(std::fs::read_to_string(&full_path).map_err(|e| {
292 ZigError::Execution(format!(
293 "failed to read system_prompt_file '{}' for role '{}': {e}",
294 full_path.display(),
295 resolved_name
296 ))
297 })?)
298 } else {
299 role.system_prompt.clone()
300 };
301
302 raw_prompt.map(|p| substitute_vars(&p, vars))
303 } else {
304 None
305 };
306
307 let set = resources.collect_for_step(&step.resources)?;
309 let resource_block = render_system_block(&set);
310
311 let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
313 let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
314
315 let storage_block = match storage.render_block(step.storage.as_deref())? {
317 Some(mut s) => {
318 s.push('\n');
319 s
320 }
321 None => String::new(),
322 };
323
324 let prefix = format!("{resource_block}{memory_block}{storage_block}");
325
326 match (prefix.is_empty(), base_prompt) {
327 (true, None) => Ok(None),
328 (true, Some(p)) => Ok(Some(p)),
329 (false, None) => Ok(Some(prefix.trim_end().to_string())),
330 (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
331 }
332}
333
334fn load_file_defaults(
339 vars: &mut HashMap<String, String>,
340 declarations: &HashMap<String, crate::workflow::model::Variable>,
341 workflow_dir: &Path,
342) -> Result<(), ZigError> {
343 for (name, decl) in declarations {
344 if decl.default.is_none() {
345 if let Some(ref file_path) = decl.default_file {
346 let full_path = workflow_dir.join(expand_path(file_path));
347 let content = std::fs::read_to_string(&full_path).map_err(|e| {
348 ZigError::Execution(format!(
349 "failed to read default_file '{}' for variable '{name}': {e}",
350 full_path.display()
351 ))
352 })?;
353 vars.insert(name.clone(), content);
354 }
355 }
356 }
357 Ok(())
358}
359
360fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
367 let condition = condition.trim();
368
369 let operators = ["<=", ">=", "!=", "==", "<", ">"];
371 for op in &operators {
372 if let Some(pos) = condition.find(op) {
373 let lhs = resolve_operand(condition[..pos].trim(), vars);
374 let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
375 return Ok(compare(&lhs, &rhs, op));
376 }
377 }
378
379 let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
381 Ok(is_truthy(value))
382}
383
/// Resolves one side of a comparison to its string value.
///
/// * A token wrapped in a matching pair of double or single quotes yields the
///   text between the quotes.
/// * Otherwise, a token that names a variable in `vars` yields that value.
/// * Anything else is returned unchanged as a literal.
///
/// A token consisting of a single quote character is not a quoted literal
/// (there is no closing quote) and is treated like any other bare token.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    for quote in ['"', '\''] {
        // `strip_prefix` then `strip_suffix` needs two distinct quote
        // characters, so a lone `"` can never produce an invalid slice.
        let inner = token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote));
        if let Some(inner) = inner {
            return inner.to_string();
        }
    }

    match vars.get(token) {
        Some(value) => value.clone(),
        None => token.to_string(),
    }
}
402
/// Compares two operands with the operator `op`.
///
/// When both operands parse as `f64` and neither is NaN they are compared
/// numerically; `==` / `!=` treat values as equal when they are identical
/// (which covers equal infinities) or differ by less than `f64::EPSILON`.
/// In every other case — including text such as `"NaN"` that merely happens
/// to parse as a float — the operands are compared as strings.
///
/// Unknown operators evaluate to `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    if let (Ok(l), Ok(r)) = (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        // NaN never compares equal to anything numerically, which would make
        // both `x == x` and `x != x` false; use string comparison for it.
        if !l.is_nan() && !r.is_nan() {
            // `l == r` handles infinities, where `l - r` would be NaN.
            let equal = l == r || (l - r).abs() < f64::EPSILON;
            return match op {
                "==" => equal,
                "!=" => !equal,
                "<" => l < r,
                ">" => l > r,
                "<=" => l <= r,
                ">=" => l >= r,
                _ => false,
            };
        }
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
427
/// Returns `true` unless `value` is empty, `"false"`, or `"0"`.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
432
433fn render_step_prompt(
436 step: &Step,
437 vars: &HashMap<String, String>,
438 user_prompt: Option<&str>,
439 dependency_outputs: &HashMap<String, String>,
440) -> String {
441 let mut prompt = String::new();
442
443 if let Some(ctx) = user_prompt {
445 prompt.push_str(&format!("User context: {ctx}\n\n"));
446 }
447
448 if step.inject_context {
450 for dep in &step.depends_on {
451 if let Some(output) = dependency_outputs.get(dep) {
452 prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
453 }
454 }
455 }
456
457 prompt.push_str(&substitute_vars(&step.prompt, vars));
459
460 prompt
461}
462
463#[allow(clippy::too_many_arguments)]
477fn build_zag_args(
478 step: &Step,
479 prompt: &str,
480 workflow_name: &str,
481 model_override: Option<&str>,
482 rendered_system_prompt: Option<&str>,
483 workflow_provider: Option<&str>,
484 workflow_model: Option<&str>,
485 extra_add_dirs: &[std::path::PathBuf],
486) -> Vec<String> {
487 let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
488
489 let (mut args, accepts_agent_args) = match &step.command {
491 None => (vec!["run".to_string(), prompt.to_string()], true),
492 Some(StepCommand::Review) => {
493 let mut a = vec!["review".to_string()];
494 if !prompt.is_empty() {
495 a.push(prompt.to_string());
496 }
497 if step.uncommitted {
498 a.push("--uncommitted".into());
499 }
500 if let Some(base) = &step.base {
501 a.extend(["--base".into(), base.clone()]);
502 }
503 if let Some(commit) = &step.commit {
504 a.extend(["--commit".into(), commit.clone()]);
505 }
506 if let Some(title) = &step.title {
507 a.extend(["--title".into(), title.clone()]);
508 }
509 (a, true)
510 }
511 Some(StepCommand::Plan) => {
512 let mut a = vec!["plan".to_string(), prompt.to_string()];
513 if let Some(output) = &step.plan_output {
514 a.extend(["-o".into(), expand_path(output)]);
515 }
516 if let Some(instructions) = &step.instructions {
517 a.extend(["--instructions".into(), instructions.clone()]);
518 }
519 (a, true)
520 }
521 Some(StepCommand::Pipe) => {
522 let mut a = vec!["pipe".to_string()];
523 for dep in &step.depends_on {
524 a.push(session_name(dep));
525 }
526 a.push("--".into());
527 a.push(prompt.to_string());
528 (a, true)
529 }
530 Some(StepCommand::Collect) => {
531 let mut a = vec!["collect".to_string()];
532 for dep in &step.depends_on {
533 a.push(session_name(dep));
534 }
535 (a, false)
536 }
537 Some(StepCommand::Summary) => {
538 let mut a = vec!["summary".to_string()];
539 for dep in &step.depends_on {
540 a.push(session_name(dep));
541 }
542 (a, false)
543 }
544 };
545
546 if accepts_agent_args {
549 let effective_provider = step.provider.as_deref().or(workflow_provider);
550 if let Some(provider) = effective_provider {
551 args.extend(["--provider".into(), provider.to_string()]);
552 }
553
554 let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
555 if let Some(model) = effective_model {
556 args.extend(["--model".into(), model.to_string()]);
557 }
558
559 if let Some(sp) = rendered_system_prompt {
560 args.extend(["--system-prompt".into(), sp.to_string()]);
561 }
562 if let Some(max_turns) = step.max_turns {
563 args.extend(["--max-turns".into(), max_turns.to_string()]);
564 }
565
566 if let Some(output) = &step.output {
568 args.extend(["-o".into(), output.clone()]);
569 } else if step.json {
570 args.push("--json".into());
571 }
572 if let Some(schema) = &step.json_schema {
573 args.extend(["--json-schema".into(), schema.clone()]);
574 }
575
576 if let Some(mcp_config) = &step.mcp_config {
577 args.extend(["--mcp-config".into(), expand_path(mcp_config)]);
578 }
579
580 if step.auto_approve {
582 args.push("--auto-approve".into());
583 }
584 if let Some(root) = &step.root {
585 args.extend(["--root".into(), expand_path(root)]);
586 }
587 for dir in &step.add_dirs {
588 args.extend(["--add-dir".into(), expand_path(dir)]);
589 }
590 for dir in extra_add_dirs {
591 args.extend(["--add-dir".into(), dir.display().to_string()]);
592 }
593 for (key, value) in &step.env {
594 args.extend(["--env".into(), format!("{key}={value}")]);
595 }
596 for file in &step.files {
597 args.extend(["--file".into(), expand_path(file)]);
598 }
599
600 for ctx in &step.context {
602 args.extend(["--context".into(), ctx.clone()]);
603 }
604 if let Some(plan) = &step.plan {
605 args.extend(["--plan".into(), expand_path(plan)]);
606 }
607
608 if step.worktree {
610 args.push("--worktree".into());
611 }
612 if let Some(sandbox) = &step.sandbox {
613 args.extend(["--sandbox".into(), sandbox.clone()]);
614 }
615 }
616
617 let name = session_name(&step.name);
619 args.extend(["--name".into(), name]);
620
621 if !step.description.is_empty() {
622 args.extend(["--description".into(), step.description.clone()]);
623 }
624
625 args.extend(["--tag".into(), "zig-workflow".into()]);
626 for tag in &step.tags {
627 args.extend(["--tag".into(), tag.clone()]);
628 }
629
630 if let Some(timeout) = &step.timeout {
631 args.extend(["--timeout".into(), timeout.clone()]);
632 }
633
634 args
635}
636
637fn run_zag_streaming(
643 args: &[String],
644 step_name: &str,
645 prefix: Option<&str>,
646 session: Option<&Arc<SessionWriter>>,
647) -> Result<(std::process::ExitStatus, String), ZigError> {
648 let mut cmd = Command::new("zag");
649 cmd.args(args)
650 .stdout(std::process::Stdio::piped())
651 .stderr(std::process::Stdio::piped());
652
653 let mut child = cmd
654 .spawn()
655 .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
656
657 let stdout = child.stdout.take().expect("stdout was piped");
658 let stderr = child.stderr.take().expect("stderr was piped");
659
660 let buffer = Arc::new(Mutex::new(String::new()));
661 let buffer_clone = Arc::clone(&buffer);
662 let prefix_stdout = prefix.map(String::from);
663 let prefix_stderr = prefix.map(String::from);
664 let session_stdout = session.cloned();
665 let session_stderr = session.cloned();
666 let step_name_stdout = step_name.to_string();
667 let step_name_stderr = step_name.to_string();
668
669 let stdout_thread = thread::spawn(move || {
670 let reader = BufReader::new(stdout);
671 let stderr_handle = std::io::stderr();
672 for line in reader.lines().map_while(Result::ok) {
673 if let Ok(mut buf) = buffer_clone.lock() {
674 buf.push_str(&line);
675 buf.push('\n');
676 }
677 if let Some(w) = &session_stdout {
678 let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
679 }
680 let mut h = stderr_handle.lock();
681 let _ = match &prefix_stdout {
682 Some(p) => writeln!(h, "[{p}] {line}"),
683 None => writeln!(h, "{line}"),
684 };
685 }
686 });
687
688 let stderr_thread = thread::spawn(move || {
689 let reader = BufReader::new(stderr);
690 let stderr_handle = std::io::stderr();
691 for line in reader.lines().map_while(Result::ok) {
692 if let Some(w) = &session_stderr {
693 let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
694 }
695 let mut h = stderr_handle.lock();
696 let _ = match &prefix_stderr {
697 Some(p) => writeln!(h, "[{p}] {line}"),
698 None => writeln!(h, "{line}"),
699 };
700 }
701 });
702
703 let status = child
704 .wait()
705 .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
706
707 let _ = stdout_thread.join();
708 let _ = stderr_thread.join();
709
710 let captured = Arc::try_unwrap(buffer)
711 .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
712 .into_inner()
713 .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
714
715 Ok((status, captured))
716}
717
718#[allow(clippy::too_many_arguments)]
724fn execute_step(
725 step: &Step,
726 prompt: &str,
727 workflow_name: &str,
728 model_override: Option<&str>,
729 prefix: Option<&str>,
730 session: Option<&Arc<SessionWriter>>,
731 rendered_system_prompt: Option<&str>,
732 workflow_provider: Option<&str>,
733 workflow_model: Option<&str>,
734 extra_add_dirs: &[std::path::PathBuf],
735) -> Result<String, ZigError> {
736 let args = build_zag_args(
737 step,
738 prompt,
739 workflow_name,
740 model_override,
741 rendered_system_prompt,
742 workflow_provider,
743 workflow_model,
744 extra_add_dirs,
745 );
746 let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
747
748 if !status.success() {
749 return Err(ZigError::Execution(format!(
750 "step '{}' failed (exit {})",
751 step.name, status,
752 )));
753 }
754
755 Ok(stdout)
756}
757
758#[allow(clippy::too_many_arguments)]
763fn run_step_attempts(
764 step: &Step,
765 prompt: &str,
766 workflow_name: &str,
767 prefix: Option<&str>,
768 session: Option<&Arc<SessionWriter>>,
769 rendered_system_prompt: Option<&str>,
770 workflow_provider: Option<&str>,
771 workflow_model: Option<&str>,
772 extra_add_dirs: &[std::path::PathBuf],
773) -> Result<String, ZigError> {
774 let mut attempts = 0;
775 let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
776 step.max_retries.unwrap_or(1) + 1
777 } else {
778 1
779 };
780
781 loop {
782 attempts += 1;
783 let model_override = if attempts > 1 {
784 step.retry_model.as_deref()
785 } else {
786 None
787 };
788 match execute_step(
789 step,
790 prompt,
791 workflow_name,
792 model_override,
793 prefix,
794 session,
795 rendered_system_prompt,
796 workflow_provider,
797 workflow_model,
798 extra_add_dirs,
799 ) {
800 Ok(output) => return Ok(output),
801 Err(e) => {
802 if let Some(w) = session {
803 let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
804 }
805 if attempts < max_attempts {
806 eprintln!(
807 " retry {}/{} for step '{}'",
808 attempts,
809 max_attempts - 1,
810 step.name
811 );
812 continue;
813 }
814 return Err(e);
815 }
816 }
817 }
818}
819
820fn extract_saves(
827 output: &str,
828 saves: &HashMap<String, String>,
829) -> Result<HashMap<String, String>, ZigError> {
830 let mut extracted = HashMap::new();
831
832 for (var_name, selector) in saves {
833 let value = if selector == "$" {
834 output.trim().to_string()
835 } else if let Some(path) = selector.strip_prefix("$.") {
836 let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
837 ZigError::Execution(format!(
838 "saves selector '{selector}' requires JSON output, but got parse error: {e}"
839 ))
840 })?;
841 json_path_lookup(&json, path)
842 } else {
843 output.trim().to_string()
844 };
845
846 extracted.insert(var_name.clone(), value);
847 }
848
849 Ok(extracted)
850}
851
852fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
857 let mut sequential = Vec::new();
858 let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
859
860 for step in tier {
861 if let Some(group) = &step.race_group {
862 race_groups.entry(group.clone()).or_default().push(step);
863 } else {
864 sequential.push(*step);
865 }
866 }
867
868 (sequential, race_groups)
869}
870
871fn spawn_step(
873 step: &Step,
874 prompt: &str,
875 workflow_name: &str,
876 rendered_system_prompt: Option<&str>,
877 workflow_provider: Option<&str>,
878 workflow_model: Option<&str>,
879 extra_add_dirs: &[std::path::PathBuf],
880) -> Result<std::process::Child, ZigError> {
881 let args = build_zag_args(
882 step,
883 prompt,
884 workflow_name,
885 None,
886 rendered_system_prompt,
887 workflow_provider,
888 workflow_model,
889 extra_add_dirs,
890 );
891 let mut cmd = Command::new("zag");
892 cmd.args(&args)
893 .stdout(std::process::Stdio::piped())
894 .stderr(std::process::Stdio::piped());
895
896 cmd.spawn()
897 .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
898}
899
900#[allow(clippy::too_many_arguments)]
905fn execute_race_group(
906 steps: &[&Step],
907 prompts: &HashMap<String, String>,
908 system_prompts: &HashMap<String, String>,
909 workflow_name: &str,
910 tier_index: usize,
911 session: Option<&Arc<SessionWriter>>,
912 workflow_provider: Option<&str>,
913 workflow_model: Option<&str>,
914 storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
915) -> Result<(String, String), ZigError> {
916 if let Some(w) = session {
917 for step in steps {
918 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
919 let preview = prompts
920 .get(&step.name)
921 .map(|p| prompt_preview(p))
922 .unwrap_or_default();
923 let _ = w.step_started(
924 &step.name,
925 tier_index,
926 &zag_session_id,
927 zag_command_name(&step.command),
928 step.model.as_deref(),
929 &preview,
930 );
931 }
932 }
933 let race_started = Instant::now();
934 let mut children: Vec<(String, std::process::Child)> = Vec::new();
935
936 for step in steps {
937 let prompt = prompts.get(&step.name).ok_or_else(|| {
938 ZigError::Execution(format!("missing prompt for step '{}'", step.name))
939 })?;
940 eprintln!(" racing step '{}'...", step.name);
941 let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
942 let empty: Vec<std::path::PathBuf> = Vec::new();
943 let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty);
944 let child = spawn_step(
945 step,
946 prompt,
947 workflow_name,
948 rendered_sp,
949 workflow_provider,
950 workflow_model,
951 extra_dirs,
952 )?;
953 children.push((step.name.clone(), child));
954 }
955
956 loop {
958 for i in 0..children.len() {
959 let status = children[i]
960 .1
961 .try_wait()
962 .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
963
964 if let Some(exit_status) = status {
965 let (winner_name, winner_child) = children.remove(i);
966
967 for (name, mut child) in children {
969 eprintln!(" cancelling step '{name}' (race lost)");
970 let _ = child.kill();
971 let _ = child.wait();
972 }
973
974 let elapsed = race_started.elapsed().as_millis() as u64;
975 if !exit_status.success() {
976 let stderr = winner_child
977 .stderr
978 .map(|mut s| {
979 let mut buf = String::new();
980 std::io::Read::read_to_string(&mut s, &mut buf).ok();
981 buf
982 })
983 .unwrap_or_default();
984 let err_msg = format!(
985 "race winner '{}' failed (exit {}): {}",
986 winner_name,
987 exit_status,
988 stderr.trim()
989 );
990 if let Some(w) = session {
991 let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
992 }
993 return Err(ZigError::Execution(err_msg));
994 }
995
996 let stdout = winner_child
997 .stdout
998 .map(|mut s| {
999 let mut buf = String::new();
1000 std::io::Read::read_to_string(&mut s, &mut buf).ok();
1001 buf
1002 })
1003 .unwrap_or_default();
1004
1005 eprintln!(" race won by '{winner_name}'");
1006 if let Some(w) = session {
1007 let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1008 }
1009 return Ok((winner_name, stdout));
1010 }
1011 }
1012
1013 std::thread::sleep(Duration::from_millis(100));
1014 }
1015}
1016
1017#[allow(clippy::too_many_arguments)]
1019fn execute_sequential_step(
1020 step: &Step,
1021 vars: &mut HashMap<String, String>,
1022 user_prompt: Option<&str>,
1023 step_outputs: &mut HashMap<String, String>,
1024 workflow_name: &str,
1025 pending_next: &mut Option<String>,
1026 tier_index: usize,
1027 session: Option<&Arc<SessionWriter>>,
1028 roles: &HashMap<String, Role>,
1029 resources: &ResourceCollector,
1030 memory: &MemoryCollector,
1031 storage: &StorageManager,
1032 workflow_dir: &Path,
1033 workflow_provider: Option<&str>,
1034 workflow_model: Option<&str>,
1035) -> Result<(), ZigError> {
1036 if let Some(condition) = &step.condition {
1037 if !evaluate_condition(condition, vars)? {
1038 eprintln!(
1039 " skipping '{}' (condition not met: {condition})",
1040 step.name
1041 );
1042 if let Some(w) = session {
1043 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1044 }
1045 return Ok(());
1046 }
1047 }
1048
1049 eprintln!(" running step '{}'...", step.name);
1050
1051 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1052 let rendered_sp = resolve_role_system_prompt(
1053 step,
1054 roles,
1055 resources,
1056 memory,
1057 storage,
1058 vars,
1059 workflow_dir,
1060 workflow_name,
1061 )?;
1062 let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1063 if let Some(w) = session {
1064 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1065 let _ = w.step_started(
1066 &step.name,
1067 tier_index,
1068 &zag_session_id,
1069 zag_command_name(&step.command),
1070 step.model.as_deref(),
1071 &prompt_preview(&prompt),
1072 );
1073 }
1074 let started = Instant::now();
1075 let result = run_step_attempts(
1076 step,
1077 &prompt,
1078 workflow_name,
1079 None,
1080 session,
1081 rendered_sp.as_deref(),
1082 workflow_provider,
1083 workflow_model,
1084 &storage_dirs,
1085 );
1086
1087 match result {
1088 Ok(output) => {
1089 let mut saved_keys: Vec<String> = Vec::new();
1090 if !step.saves.is_empty() {
1091 let saved = extract_saves(&output, &step.saves)?;
1092 for (k, v) in &saved {
1093 eprintln!(" saved {k} = {v}");
1094 saved_keys.push(k.clone());
1095 }
1096 vars.extend(saved);
1097 }
1098
1099 step_outputs.insert(step.name.clone(), output);
1100 eprintln!(" completed '{}'", step.name);
1101 if let Some(w) = session {
1102 let _ = w.step_completed(
1103 &step.name,
1104 0,
1105 started.elapsed().as_millis() as u64,
1106 saved_keys,
1107 );
1108 }
1109
1110 if step.next.is_some() {
1111 *pending_next = step.next.clone();
1112 }
1113 }
1114 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1115 FailurePolicy::Fail => return Err(e),
1116 FailurePolicy::Continue => {
1117 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1118 }
1119 FailurePolicy::Retry => {
1120 return Err(e);
1121 }
1122 },
1123 }
1124
1125 Ok(())
1126}
1127
1128#[allow(clippy::too_many_arguments)]
1137fn execute_parallel_tier(
1138 steps: &[&Step],
1139 vars: &mut HashMap<String, String>,
1140 user_prompt: Option<&str>,
1141 step_outputs: &mut HashMap<String, String>,
1142 workflow_name: &str,
1143 pending_next: &mut Option<String>,
1144 tier_index: usize,
1145 session: Option<&Arc<SessionWriter>>,
1146 roles: &HashMap<String, Role>,
1147 resources: &ResourceCollector,
1148 memory: &MemoryCollector,
1149 storage: &StorageManager,
1150 workflow_dir: &Path,
1151 workflow_provider: Option<&str>,
1152 workflow_model: Option<&str>,
1153) -> Result<(), ZigError> {
1154 let mut active: Vec<&Step> = Vec::new();
1157 let mut prompts: HashMap<String, String> = HashMap::new();
1158 let mut rendered_sps: HashMap<String, String> = HashMap::new();
1159 let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
1160 for step in steps {
1161 if let Some(condition) = &step.condition {
1162 if !evaluate_condition(condition, vars)? {
1163 eprintln!(
1164 " skipping '{}' (condition not met: {condition})",
1165 step.name
1166 );
1167 if let Some(w) = session {
1168 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1169 }
1170 continue;
1171 }
1172 }
1173 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1174 prompts.insert(step.name.clone(), prompt);
1175 if let Some(sp) = resolve_role_system_prompt(
1176 step,
1177 roles,
1178 resources,
1179 memory,
1180 storage,
1181 vars,
1182 workflow_dir,
1183 workflow_name,
1184 )? {
1185 rendered_sps.insert(step.name.clone(), sp);
1186 }
1187 storage_dirs_map.insert(
1188 step.name.clone(),
1189 storage.add_dirs_for_step(step.storage.as_deref()),
1190 );
1191 active.push(*step);
1192 }
1193
1194 if active.is_empty() {
1195 return Ok(());
1196 }
1197
1198 eprintln!(" running {} steps in parallel...", active.len());
1199
1200 let mut start_times: HashMap<String, Instant> = HashMap::new();
1201 let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1202 for step in &active {
1203 let step_clone: Step = (*step).clone();
1204 let prompt = prompts.remove(&step.name).unwrap_or_default();
1205 let rendered_sp = rendered_sps.remove(&step.name);
1206 let workflow_name_owned = workflow_name.to_string();
1207 let name = step.name.clone();
1208 eprintln!(" starting '{name}'...");
1209 if let Some(w) = session {
1210 let zag_session_id = format!("zig-{workflow_name}-{name}");
1211 let _ = w.step_started(
1212 &name,
1213 tier_index,
1214 &zag_session_id,
1215 zag_command_name(&step.command),
1216 step.model.as_deref(),
1217 &prompt_preview(&prompt),
1218 );
1219 }
1220 start_times.insert(name.clone(), Instant::now());
1221 let session_clone = session.cloned();
1222 let wf_provider = workflow_provider.map(String::from);
1223 let wf_model = workflow_model.map(String::from);
1224 let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
1225 let handle = thread::spawn(move || {
1226 let res = run_step_attempts(
1227 &step_clone,
1228 &prompt,
1229 &workflow_name_owned,
1230 Some(&name),
1231 session_clone.as_ref(),
1232 rendered_sp.as_deref(),
1233 wf_provider.as_deref(),
1234 wf_model.as_deref(),
1235 &storage_dirs,
1236 );
1237 (name, res)
1238 });
1239 handles.push(handle);
1240 }
1241
1242 let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1243 for handle in handles {
1244 match handle.join() {
1245 Ok((name, res)) => {
1246 results.insert(name, res);
1247 }
1248 Err(_) => {
1249 return Err(ZigError::Execution("parallel step thread panicked".into()));
1250 }
1251 }
1252 }
1253
1254 let mut errors: Vec<String> = Vec::new();
1256 for step in &active {
1257 let Some(res) = results.remove(&step.name) else {
1258 continue;
1259 };
1260 let elapsed = start_times
1261 .remove(&step.name)
1262 .map(|t| t.elapsed().as_millis() as u64)
1263 .unwrap_or(0);
1264 match res {
1265 Ok(output) => {
1266 let mut saved_keys: Vec<String> = Vec::new();
1267 if !step.saves.is_empty() {
1268 let saved = extract_saves(&output, &step.saves)?;
1269 for (k, v) in &saved {
1270 eprintln!(" saved {k} = {v}");
1271 saved_keys.push(k.clone());
1272 }
1273 vars.extend(saved);
1274 }
1275 step_outputs.insert(step.name.clone(), output);
1276 eprintln!(" completed '{}'", step.name);
1277 if let Some(w) = session {
1278 let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1279 }
1280 if step.next.is_some() && pending_next.is_none() {
1281 *pending_next = step.next.clone();
1282 }
1283 }
1284 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1285 FailurePolicy::Continue => {
1286 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1287 }
1288 FailurePolicy::Fail | FailurePolicy::Retry => {
1289 errors.push(format!("'{}': {e}", step.name));
1290 }
1291 },
1292 }
1293 }
1294
1295 if !errors.is_empty() {
1296 return Err(ZigError::Execution(format!(
1297 "parallel step(s) failed: {}",
1298 errors.join("; ")
1299 )));
1300 }
1301
1302 Ok(())
1303}
1304
1305fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1308 let mut vars = HashMap::new();
1309 for (name, var) in &workflow.vars {
1310 let value = match &var.default {
1311 Some(toml::Value::String(s)) => s.clone(),
1312 Some(toml::Value::Integer(n)) => n.to_string(),
1313 Some(toml::Value::Float(f)) => f.to_string(),
1314 Some(toml::Value::Boolean(b)) => b.to_string(),
1315 Some(other) => other.to_string(),
1316 None => String::new(),
1317 };
1318 vars.insert(name.clone(), value);
1319 }
1320 vars
1321}
1322
1323fn execute(
1325 workflow: &Workflow,
1326 workflow_path: &std::path::Path,
1327 user_prompt: Option<&str>,
1328 workflow_dir: &Path,
1329 disable_resources: bool,
1330 disable_memory: bool,
1331 disable_storage: bool,
1332) -> Result<(), ZigError> {
1333 let mut vars = init_vars(workflow);
1334
1335 let resource_collector = ResourceCollector::from_env(
1336 &workflow.workflow.name,
1337 &workflow.workflow.resources,
1338 workflow_dir,
1339 disable_resources,
1340 );
1341
1342 let config = ZigConfig::load();
1343 let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1344 let memory_collector = MemoryCollector::from_env(
1345 &workflow.workflow.name,
1346 workflow_memory_mode,
1347 &config,
1348 disable_memory,
1349 );
1350
1351 let storage_manager = if disable_storage || workflow.storage.is_empty() {
1357 StorageManager::empty()
1358 } else {
1359 let backend = FilesystemBackend::from_cwd()?;
1360 StorageManager::build(&workflow.storage, backend)?
1361 };
1362
1363 load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1365
1366 let prompt_var = workflow
1368 .vars
1369 .iter()
1370 .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1371 .map(|(name, _)| name.clone());
1372
1373 if let Some(ref var_name) = prompt_var {
1374 if let Some(prompt) = user_prompt {
1375 vars.insert(var_name.clone(), prompt.to_string());
1376 }
1377 }
1378
1379 if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1381 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1382 return Err(ZigError::Validation(msgs.join("; ")));
1383 }
1384
1385 let effective_user_prompt = if prompt_var.is_some() {
1387 None
1388 } else {
1389 user_prompt
1390 };
1391
1392 let mut step_outputs: HashMap<String, String> = HashMap::new();
1393
1394 let wf_provider = workflow.workflow.provider.as_deref();
1395 let wf_model = workflow.workflow.model.as_deref();
1396
1397 let tiers = topological_sort(&workflow.steps)?;
1398
1399 eprintln!(
1400 "running workflow '{}' ({} steps in {} tiers)",
1401 workflow.workflow.name,
1402 workflow.steps.len(),
1403 tiers.len()
1404 );
1405
1406 let coordinator = match SessionWriter::create(
1411 &workflow.workflow.name,
1412 &workflow_path.to_string_lossy(),
1413 user_prompt,
1414 tiers.len(),
1415 ) {
1416 Ok(writer) => {
1417 eprintln!("zig session: {}", writer.session_id());
1418 Some(SessionCoordinator::start(writer))
1419 }
1420 Err(e) => {
1421 eprintln!("warning: failed to open zig session log: {e}");
1422 None
1423 }
1424 };
1425 let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1426 let session_ref = session_writer.as_ref();
1427
1428 let mut iteration = 0;
1429 let mut pending_next: Option<String> = None;
1430
1431 loop {
1432 let tiers_to_run = if let Some(ref next_step) = pending_next {
1433 let remaining: Vec<Vec<&Step>> = tiers
1435 .iter()
1436 .map(|tier| {
1437 tier.iter()
1438 .filter(|s| s.name == *next_step)
1439 .copied()
1440 .collect::<Vec<_>>()
1441 })
1442 .filter(|tier| !tier.is_empty())
1443 .collect();
1444 pending_next = None;
1445 remaining
1446 } else if iteration == 0 {
1447 tiers.clone()
1448 } else {
1449 break;
1450 };
1451
1452 for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1453 let (non_race, race_groups) = partition_tier(tier);
1454
1455 if let Some(w) = session_ref {
1456 let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1457 let _ = w.tier_started(tier_index, names);
1458 }
1459
1460 if non_race.len() <= 1 {
1464 for step in &non_race {
1465 execute_sequential_step(
1466 step,
1467 &mut vars,
1468 effective_user_prompt,
1469 &mut step_outputs,
1470 &workflow.workflow.name,
1471 &mut pending_next,
1472 tier_index,
1473 session_ref,
1474 &workflow.roles,
1475 &resource_collector,
1476 &memory_collector,
1477 &storage_manager,
1478 workflow_dir,
1479 wf_provider,
1480 wf_model,
1481 )?;
1482 }
1483 } else {
1484 execute_parallel_tier(
1485 &non_race,
1486 &mut vars,
1487 effective_user_prompt,
1488 &mut step_outputs,
1489 &workflow.workflow.name,
1490 &mut pending_next,
1491 tier_index,
1492 session_ref,
1493 &workflow.roles,
1494 &resource_collector,
1495 &memory_collector,
1496 &storage_manager,
1497 workflow_dir,
1498 wf_provider,
1499 wf_model,
1500 )?;
1501 }
1502
1503 for (group_name, race_steps) in &race_groups {
1505 eprintln!(" starting race group '{group_name}'...");
1506
1507 let mut prompts = HashMap::new();
1509 let mut race_sps: HashMap<String, String> = HashMap::new();
1510 let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
1511 HashMap::new();
1512 let mut active_steps: Vec<&Step> = Vec::new();
1513 for step in race_steps {
1514 if let Some(condition) = &step.condition {
1515 if !evaluate_condition(condition, &vars)? {
1516 eprintln!(
1517 " skipping '{}' (condition not met: {condition})",
1518 step.name
1519 );
1520 continue;
1521 }
1522 }
1523 let prompt =
1524 render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1525 prompts.insert(step.name.clone(), prompt);
1526 if let Some(sp) = resolve_role_system_prompt(
1527 step,
1528 &workflow.roles,
1529 &resource_collector,
1530 &memory_collector,
1531 &storage_manager,
1532 &vars,
1533 workflow_dir,
1534 &workflow.workflow.name,
1535 )? {
1536 race_sps.insert(step.name.clone(), sp);
1537 }
1538 race_storage_dirs.insert(
1539 step.name.clone(),
1540 storage_manager.add_dirs_for_step(step.storage.as_deref()),
1541 );
1542 active_steps.push(step);
1543 }
1544
1545 if active_steps.is_empty() {
1546 continue;
1547 }
1548
1549 match execute_race_group(
1550 &active_steps,
1551 &prompts,
1552 &race_sps,
1553 &workflow.workflow.name,
1554 tier_index,
1555 session_ref,
1556 wf_provider,
1557 wf_model,
1558 &race_storage_dirs,
1559 ) {
1560 Ok((winner_name, output)) => {
1561 if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1563 if !winner.saves.is_empty() {
1564 let saved = extract_saves(&output, &winner.saves)?;
1565 for (k, v) in &saved {
1566 eprintln!(" saved {k} = {v}");
1567 }
1568 vars.extend(saved);
1569 }
1570 if winner.next.is_some() {
1571 pending_next = winner.next.clone();
1572 }
1573 }
1574 step_outputs.insert(winner_name.clone(), output);
1575 eprintln!(
1576 " completed race group '{group_name}' (winner: '{winner_name}')"
1577 );
1578 }
1579 Err(e) => return Err(e),
1580 }
1581 }
1582 }
1583
1584 iteration += 1;
1585 if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1586 if iteration >= MAX_LOOP_ITERATIONS {
1587 eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1588 }
1589 break;
1590 }
1591 }
1592
1593 eprintln!("workflow '{}' completed", workflow.workflow.name);
1594 if let Some(c) = coordinator {
1595 let _ = c.finish(SessionStatus::Success);
1596 }
1597 Ok(())
1598}
1599
1600fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1603 match cmd {
1604 None => "run",
1605 Some(StepCommand::Review) => "review",
1606 Some(StepCommand::Plan) => "plan",
1607 Some(StepCommand::Pipe) => "pipe",
1608 Some(StepCommand::Collect) => "collect",
1609 Some(StepCommand::Summary) => "summary",
1610 }
1611}
1612
/// Produces a short single-line preview of a prompt.
///
/// Every `'\n'` is replaced by a space. If the result is longer than 200
/// characters (counted as `char`s, not bytes), it is cut to the first 200
/// and an ellipsis (`…`) is appended; otherwise it is returned in full.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let mut flattened = prompt.chars().map(|ch| match ch {
        '\n' => ' ',
        other => other,
    });
    // Pull at most MAX characters; whatever is left in the iterator tells us
    // whether the prompt was truncated.
    let mut preview: String = flattened.by_ref().take(MAX).collect();
    if flattened.next().is_some() {
        preview.push('…');
    }
    preview
}
1627
1628#[cfg(test)]
1629#[path = "run_tests.rs"]
1630mod tests;