1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde::Serialize;
7use tokio::task::JoinSet;
8use zag_agent::builder::AgentBuilder;
9use zag_agent::session_log::LogEventKind;
10use zag_agent::{plan as agent_plan, review as agent_review};
11use zag_orch::collect as orch_collect;
12use zag_orch::summary as orch_summary;
13
14use crate::config::ZigConfig;
15use crate::dry_run::{DryRunContext, DryRunFormat};
16use crate::error::ZigError;
17use crate::memory::{MemoryCollector, render_memory_block};
18use crate::paths::expand_path;
19use crate::resources::{ResourceCollector, render_system_block};
20use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
21use crate::storage::{FilesystemBackend, StorageManager};
22use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
23use crate::workflow::{parser, validate};
24
/// Upper bound on loop iterations.
///
/// NOTE(review): the use site is outside the visible part of this file —
/// presumably this caps how often a looping step may repeat; confirm there.
const MAX_LOOP_ITERATIONS: usize = 100;
27
28#[allow(clippy::too_many_arguments)]
50pub async fn run_workflow(
51 workflow_path: &str,
52 user_prompt: Option<&str>,
53 disable_resources: bool,
54 disable_memory: bool,
55 disable_storage: bool,
56 dry_run: bool,
57 dry_run_format: DryRunFormat,
58) -> Result<(), ZigError> {
59 let path = resolve_workflow_path(workflow_path)?;
60 let (workflow, source) = parser::parse_workflow(&path)?;
61
62 if let Err(errors) = validate::validate(&workflow) {
63 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
64 return Err(ZigError::Validation(msgs.join("; ")));
65 }
66
67 execute(
68 &workflow,
69 &path,
70 user_prompt,
71 source.dir(),
72 disable_resources,
73 disable_memory,
74 disable_storage,
75 dry_run,
76 dry_run_format,
77 )
78 .await
79}
80
81pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
94 let mut candidates = vec![
95 PathBuf::from(workflow),
96 PathBuf::from(format!("{workflow}.zwf")),
97 PathBuf::from(format!("{workflow}.zwfz")),
98 ];
99
100 if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
101 candidates.push(local_dir.join(workflow));
102 candidates.push(local_dir.join(format!("{workflow}.zwf")));
103 candidates.push(local_dir.join(format!("{workflow}.zwfz")));
104 }
105
106 if let Some(global_dir) = crate::paths::global_workflows_dir() {
107 candidates.push(global_dir.join(workflow));
108 candidates.push(global_dir.join(format!("{workflow}.zwf")));
109 candidates.push(global_dir.join(format!("{workflow}.zwfz")));
110 }
111
112 for candidate in &candidates {
113 if candidate.exists() {
114 return Ok(candidate.clone());
115 }
116 }
117
118 Err(ZigError::Io(format!(
119 "workflow not found: '{workflow}' (tried: {})",
120 candidates
121 .iter()
122 .map(|p| p.display().to_string())
123 .collect::<Vec<_>>()
124 .join(", ")
125 )))
126}
127
128pub(crate) fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
134 let step_index: HashMap<&str, usize> = steps
135 .iter()
136 .enumerate()
137 .map(|(i, s)| (s.name.as_str(), i))
138 .collect();
139
140 let mut in_degree = vec![0usize; steps.len()];
141 for (i, step) in steps.iter().enumerate() {
142 for dep in &step.depends_on {
143 if step_index.contains_key(dep.as_str()) {
144 in_degree[i] += 1;
145 }
146 }
147 }
148
149 let mut tiers = Vec::new();
150 let mut remaining = in_degree.clone();
151 let mut completed: Vec<bool> = vec![false; steps.len()];
152
153 loop {
154 let tier: Vec<usize> = (0..steps.len())
155 .filter(|&i| !completed[i] && remaining[i] == 0)
156 .collect();
157
158 if tier.is_empty() {
159 break;
160 }
161
162 for &i in &tier {
163 completed[i] = true;
164 }
165
166 for &i in &tier {
168 for (j, step) in steps.iter().enumerate() {
169 if !completed[j] && step.depends_on.contains(&steps[i].name) {
170 remaining[j] -= 1;
171 }
172 }
173 }
174
175 tiers.push(tier.iter().map(|&i| &steps[i]).collect());
176 }
177
178 let completed_count: usize = completed.iter().filter(|&&c| c).count();
179 if completed_count != steps.len() {
180 return Err(ZigError::Execution(
181 "could not resolve all steps — possible undetected cycle".into(),
182 ));
183 }
184
185 Ok(tiers)
186}
187
188pub(crate) fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
194 let mut result = String::with_capacity(template.len());
195 let mut rest = template;
196
197 while let Some(start) = rest.find("${") {
198 result.push_str(&rest[..start]);
199 let after_start = &rest[start + 2..];
200
201 if let Some(end) = after_start.find('}') {
202 let var_expr = &after_start[..end];
203 let mut parts = var_expr.splitn(2, '.');
204 let root = parts.next().unwrap_or(var_expr);
205
206 if let Some(value) = vars.get(root) {
207 if let Some(path) = parts.next() {
208 if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
210 let resolved = json_path_lookup(&json, path);
211 result.push_str(&resolved);
212 } else {
213 result.push_str(value);
214 }
215 } else {
216 result.push_str(value);
217 }
218 } else {
219 result.push_str(&rest[start..start + 2 + end + 1]);
221 }
222
223 rest = &after_start[end + 1..];
224 } else {
225 result.push_str(&rest[start..]);
226 rest = "";
227 }
228 }
229
230 result.push_str(rest);
231 result
232}
233
234fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
236 let mut current = value;
237 for key in path.split('.') {
238 match current.get(key) {
239 Some(v) => current = v,
240 None => return format!("${{?.{path}}}"),
241 }
242 }
243 match current {
244 serde_json::Value::String(s) => s.clone(),
245 other => other.to_string(),
246 }
247}
248
249#[allow(clippy::too_many_arguments)]
266pub(crate) fn resolve_role_system_prompt(
267 step: &Step,
268 roles: &HashMap<String, Role>,
269 resources: &ResourceCollector<'_>,
270 memory: &MemoryCollector,
271 storage: &StorageManager,
272 vars: &HashMap<String, String>,
273 workflow_dir: &Path,
274 workflow_name: &str,
275) -> Result<Option<String>, ZigError> {
276 let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
278 Some(substitute_vars(sp, vars))
279 } else if let Some(ref role_ref) = step.role {
280 let resolved_name = substitute_vars(role_ref, vars);
281 let role = roles.get(&resolved_name).ok_or_else(|| {
282 ZigError::Execution(format!(
283 "step '{}' references role '{}' which does not exist",
284 step.name, resolved_name
285 ))
286 })?;
287
288 let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
289 let full_path = workflow_dir.join(expand_path(file_path));
290 Some(std::fs::read_to_string(&full_path).map_err(|e| {
291 ZigError::Execution(format!(
292 "failed to read system_prompt_file '{}' for role '{}': {e}",
293 full_path.display(),
294 resolved_name
295 ))
296 })?)
297 } else {
298 role.system_prompt.clone()
299 };
300
301 raw_prompt.map(|p| substitute_vars(&p, vars))
302 } else {
303 None
304 };
305
306 let set = resources.collect_for_step(&step.resources)?;
308 let resource_block = render_system_block(&set);
309
310 let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
312 let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
313
314 let storage_block = match storage.render_block(step.storage.as_deref())? {
316 Some(mut s) => {
317 s.push('\n');
318 s
319 }
320 None => String::new(),
321 };
322
323 let prefix = format!("{resource_block}{memory_block}{storage_block}");
324
325 match (prefix.is_empty(), base_prompt) {
326 (true, None) => Ok(None),
327 (true, Some(p)) => Ok(Some(p)),
328 (false, None) => Ok(Some(prefix.trim_end().to_string())),
329 (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
330 }
331}
332
333fn load_file_defaults(
338 vars: &mut HashMap<String, String>,
339 declarations: &HashMap<String, crate::workflow::model::Variable>,
340 workflow_dir: &Path,
341) -> Result<(), ZigError> {
342 for (name, decl) in declarations {
343 if decl.default.is_none() {
344 if let Some(ref file_path) = decl.default_file {
345 let full_path = workflow_dir.join(expand_path(file_path));
346 let content = std::fs::read_to_string(&full_path).map_err(|e| {
347 ZigError::Execution(format!(
348 "failed to read default_file '{}' for variable '{name}': {e}",
349 full_path.display()
350 ))
351 })?;
352 vars.insert(name.clone(), content);
353 }
354 }
355 }
356 Ok(())
357}
358
359pub(crate) fn evaluate_condition(
366 condition: &str,
367 vars: &HashMap<String, String>,
368) -> Result<bool, ZigError> {
369 let condition = condition.trim();
370
371 let operators = ["<=", ">=", "!=", "==", "<", ">"];
373 for op in &operators {
374 if let Some(pos) = condition.find(op) {
375 let lhs = resolve_operand(condition[..pos].trim(), vars);
376 let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
377 return Ok(compare(&lhs, &rhs, op));
378 }
379 }
380
381 let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
383 Ok(is_truthy(value))
384}
385
/// Resolve one side of a comparison to its string value.
///
/// * A token wrapped in matching double or single quotes yields its contents.
/// * Otherwise, a token that names a variable in `vars` yields that value.
/// * Otherwise the token itself is used as a literal.
///
/// A token consisting of a single quote character is not a quoted literal: it
/// needs both an opening and a closing quote. It therefore falls through to
/// the variable/literal handling instead of slicing out of bounds.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    for quote in ['"', '\''] {
        // strip_prefix then strip_suffix requires two distinct quote characters.
        let unquoted = token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote));
        if let Some(inner) = unquoted {
            return inner.to_string();
        }
    }

    match vars.get(token) {
        Some(value) => value.clone(),
        None => token.to_string(),
    }
}
404
/// Compare two operands with the operator `op`.
///
/// When both operands parse as *finite* numbers they are compared numerically
/// (`==` / `!=` use an `f64::EPSILON` tolerance). In every other case they are
/// compared as strings. Words such as `inf`, `infinity` and `nan` also parse as
/// `f64`, but are deliberately treated as strings: comparing them numerically
/// would make `"inf" == "inf"` false because `inf - inf` is NaN.
///
/// An unknown operator yields `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    let numeric = match (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        (Ok(l), Ok(r)) if l.is_finite() && r.is_finite() => Some((l, r)),
        _ => None,
    };

    if let Some((l, r)) = numeric {
        return match op {
            "==" => (l - r).abs() < f64::EPSILON,
            "!=" => (l - r).abs() >= f64::EPSILON,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
429
/// A value is truthy unless it is empty, `"false"` or `"0"` (exact matches).
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
434
435pub(crate) fn render_step_prompt(
438 step: &Step,
439 vars: &HashMap<String, String>,
440 user_prompt: Option<&str>,
441 dependency_outputs: &HashMap<String, String>,
442) -> String {
443 let mut prompt = String::new();
444
445 if let Some(ctx) = user_prompt {
447 prompt.push_str(&format!("User context: {ctx}\n\n"));
448 }
449
450 if step.inject_context {
452 for dep in &step.depends_on {
453 if let Some(output) = dependency_outputs.get(dep) {
454 prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
455 }
456 }
457 }
458
459 prompt.push_str(&substitute_vars(&step.prompt, vars));
461
462 prompt
463}
464
/// Fully resolved settings for one step invocation.
///
/// Built by `build_agent_config`, applied to an `AgentBuilder` by
/// `apply_agent_config`, and executed by `dispatch_agent`. The struct is
/// serializable; optional fields marked `skip_serializing_if` are omitted
/// from the serialized form when they are `None`.
#[derive(Debug, Clone, Serialize, Default)]
pub struct AgentConfig {
    /// Command label; `build_agent_config` produces one of `"run"`,
    /// `"review"`, `"plan"`, `"pipe"`, `"collect"` or `"summary"`.
    pub command: String,

    /// Provider name: the step's own value, else the workflow-level one.
    pub provider: Option<String>,
    /// Model name: retry override, else the step's, else the workflow's.
    pub model: Option<String>,
    /// Rendered system prompt; for interactive steps the self-terminate
    /// instruction is appended to it.
    pub system_prompt: Option<String>,
    /// Root directory for the agent (the step's `root`, path-expanded).
    pub root: Option<String>,
    /// Additional directories: the step's `add_dirs` (path-expanded) followed
    /// by the extra directories passed to `build_agent_config`.
    pub add_dirs: Vec<String>,
    /// Environment variables; serialized as a list of `KEY=VALUE` strings.
    #[serde(serialize_with = "serialize_env_pairs")]
    pub env: Vec<(String, String)>,
    /// Files passed to the agent (path-expanded).
    pub files: Vec<String>,
    /// Forwarded to `AgentBuilder::auto_approve` when `true`.
    pub auto_approve: bool,
    /// `Some(None)` when the step sets `worktree`; the inner value is what is
    /// forwarded to `AgentBuilder::worktree`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub worktree: Option<Option<String>>,
    /// Sandbox setting forwarded to `AgentBuilder::sandbox`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sandbox: Option<String>,

    /// Set when the step requests JSON and declares no explicit output format.
    pub json_mode: bool,
    /// JSON schema as a raw string; parsed when the config is applied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_schema: Option<String>,
    /// Explicit output format taken from the step's `output`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_format: Option<String>,

    /// Maximum number of turns, forwarded to `AgentBuilder::max_turns`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_turns: Option<u32>,
    /// Timeout as written in the workflow; parsed by `parse_timeout_string`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout: Option<String>,
    /// MCP configuration (the step's `mcp_config`, path-expanded).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mcp_config: Option<String>,

    /// Session name of the form `zig-<workflow name>-<step name>`.
    pub session_name: String,
    /// Step description; `None` when the step's description is empty.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Session tags; always starts with `"zig-workflow"`, followed by the step's tags.
    pub tags: Vec<String>,

    /// The rendered prompt for the step.
    pub prompt: String,

    /// `false` for `collect` and `summary`; in that case `build_agent_config`
    /// leaves all agent-specific settings at their defaults.
    pub accepts_agent_args: bool,

    /// Command-specific parameters, if the command has any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command_params: Option<CommandParams>,

    /// Whether the step runs as an interactive session.
    pub interactive: bool,
}
531
532fn serialize_env_pairs<S>(pairs: &[(String, String)], s: S) -> Result<S::Ok, S::Error>
533where
534 S: serde::Serializer,
535{
536 use serde::ser::SerializeSeq;
537 let mut seq = s.serialize_seq(Some(pairs.len()))?;
538 for (k, v) in pairs {
539 seq.serialize_element(&format!("{k}={v}"))?;
540 }
541 seq.end()
542}
543
/// Parameters specific to a non-default step command.
///
/// Serialized with an internal `kind` tag whose value is the variant name in
/// snake_case.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CommandParams {
    /// Parameters for the `review` command, copied from the step's
    /// `uncommitted`, `base`, `commit` and `title` fields.
    Review {
        uncommitted: bool,
        base: Option<String>,
        commit: Option<String>,
        title: Option<String>,
    },
    /// Parameters for the `plan` command: where to write the plan (the step's
    /// `plan_output`, path-expanded) and extra planning instructions.
    Plan {
        output: Option<String>,
        instructions: Option<String>,
    },
    /// Session names of the step's dependencies, used by the `pipe` command.
    Pipe {
        session_ids: Vec<String>,
    },
    /// Session names of the step's dependencies, used by the `collect` command.
    Collect {
        session_ids: Vec<String>,
    },
    /// Session names of the step's dependencies, used by the `summary` command.
    Summary {
        session_ids: Vec<String>,
    },
}
568
569#[allow(clippy::too_many_arguments)]
573pub(crate) fn build_agent_config(
574 step: &Step,
575 prompt: &str,
576 workflow_name: &str,
577 model_override: Option<&str>,
578 rendered_system_prompt: Option<&str>,
579 workflow_provider: Option<&str>,
580 workflow_model: Option<&str>,
581 extra_add_dirs: &[std::path::PathBuf],
582) -> AgentConfig {
583 let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
584
585 let (command_label, accepts_agent_args, command_params) = match &step.command {
586 None => ("run".to_string(), true, None),
587 Some(StepCommand::Review) => (
588 "review".to_string(),
589 true,
590 Some(CommandParams::Review {
591 uncommitted: step.uncommitted,
592 base: step.base.clone(),
593 commit: step.commit.clone(),
594 title: step.title.clone(),
595 }),
596 ),
597 Some(StepCommand::Plan) => (
598 "plan".to_string(),
599 true,
600 Some(CommandParams::Plan {
601 output: step.plan_output.as_deref().map(expand_path),
602 instructions: step.instructions.clone(),
603 }),
604 ),
605 Some(StepCommand::Pipe) => {
606 let session_ids: Vec<String> =
607 step.depends_on.iter().map(|d| session_name(d)).collect();
608 (
609 "pipe".to_string(),
610 true,
611 Some(CommandParams::Pipe { session_ids }),
612 )
613 }
614 Some(StepCommand::Collect) => {
615 let session_ids: Vec<String> =
616 step.depends_on.iter().map(|d| session_name(d)).collect();
617 (
618 "collect".to_string(),
619 false,
620 Some(CommandParams::Collect { session_ids }),
621 )
622 }
623 Some(StepCommand::Summary) => {
624 let session_ids: Vec<String> =
625 step.depends_on.iter().map(|d| session_name(d)).collect();
626 (
627 "summary".to_string(),
628 false,
629 Some(CommandParams::Summary { session_ids }),
630 )
631 }
632 };
633
634 let mut cfg = AgentConfig {
637 command: command_label,
638 session_name: session_name(&step.name),
639 description: if step.description.is_empty() {
640 None
641 } else {
642 Some(step.description.clone())
643 },
644 tags: {
645 let mut t = vec!["zig-workflow".to_string()];
646 t.extend(step.tags.iter().cloned());
647 t
648 },
649 timeout: step.timeout.clone(),
650 prompt: prompt.to_string(),
651 accepts_agent_args,
652 command_params,
653 interactive: step.interactive,
654 ..Default::default()
655 };
656
657 if !accepts_agent_args {
658 return cfg;
659 }
660
661 cfg.provider = step
662 .provider
663 .clone()
664 .or_else(|| workflow_provider.map(String::from));
665 cfg.model = model_override
666 .map(String::from)
667 .or_else(|| step.model.clone())
668 .or_else(|| workflow_model.map(String::from));
669 cfg.system_prompt = rendered_system_prompt.map(String::from);
670 cfg.max_turns = step.max_turns;
671
672 if let Some(output) = &step.output {
675 cfg.output_format = Some(output.clone());
676 } else if step.json {
677 cfg.json_mode = true;
678 }
679 cfg.json_schema = step.json_schema.clone();
680 cfg.mcp_config = step.mcp_config.as_deref().map(expand_path);
681
682 cfg.auto_approve = step.auto_approve;
683 cfg.root = step.root.as_deref().map(expand_path);
684 cfg.add_dirs = step
685 .add_dirs
686 .iter()
687 .map(|d| expand_path(d))
688 .chain(extra_add_dirs.iter().map(|p| p.display().to_string()))
689 .collect();
690 cfg.env = step
691 .env
692 .iter()
693 .map(|(k, v)| (k.clone(), v.clone()))
694 .collect();
695 cfg.files = step.files.iter().map(|f| expand_path(f)).collect();
696
697 if step.worktree {
699 cfg.worktree = Some(None);
700 }
701 cfg.sandbox = step.sandbox.clone();
702
703 if cfg.interactive {
707 let base = cfg.system_prompt.take().unwrap_or_default();
708 cfg.system_prompt = Some(format!(
709 "{base}{}",
710 crate::self_cmd::INTERACTIVE_SELF_TERMINATE_INSTRUCTION
711 ));
712 }
713
714 cfg
715}
716
717pub(crate) fn apply_agent_config(mut builder: AgentBuilder, cfg: &AgentConfig) -> AgentBuilder {
720 if let Some(ref p) = cfg.provider {
721 builder = builder.provider(p);
722 }
723 if let Some(ref m) = cfg.model {
724 builder = builder.model(m);
725 }
726 if let Some(ref sp) = cfg.system_prompt {
727 builder = builder.system_prompt(sp);
728 }
729 if let Some(ref r) = cfg.root {
730 builder = builder.root(r);
731 }
732 if cfg.auto_approve {
733 builder = builder.auto_approve(true);
734 }
735 for dir in &cfg.add_dirs {
736 builder = builder.add_dir(dir);
737 }
738 for (k, v) in &cfg.env {
739 builder = builder.env(k, v);
740 }
741 for f in &cfg.files {
742 builder = builder.file(f);
743 }
744 if let Some(ref wt) = cfg.worktree {
745 builder = builder.worktree(wt.as_deref());
746 }
747 if let Some(ref sb) = cfg.sandbox {
748 builder = builder.sandbox(Some(sb));
749 }
750 if let Some(ref fmt) = cfg.output_format {
751 builder = builder.output_format(fmt);
752 }
753 if cfg.json_mode {
754 if let Some(ref schema) = cfg.json_schema {
755 if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
756 builder = builder.json_schema(v);
757 } else {
758 builder = builder.json();
759 }
760 } else {
761 builder = builder.json();
762 }
763 } else if let Some(ref schema) = cfg.json_schema {
764 if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
765 builder = builder.json_schema(v);
766 }
767 }
768 if let Some(turns) = cfg.max_turns {
769 builder = builder.max_turns(turns);
770 }
771 if let Some(ref t) = cfg.timeout {
772 if let Some(dur) = parse_timeout_string(t) {
773 builder = builder.timeout(dur);
774 }
775 }
776 if let Some(ref mcp) = cfg.mcp_config {
777 builder = builder.mcp_config(mcp);
778 }
779 builder = builder.name(&cfg.session_name);
780 if let Some(ref d) = cfg.description {
781 builder = builder.description(d);
782 }
783 for tag in &cfg.tags {
784 builder = builder.tag(tag);
785 }
786 builder
787}
788
/// Parse a timeout such as `"90"`, `"30s"`, `"1h30m"`, `"1.5h"` or `"250ms"`.
///
/// * A plain unsigned integer is a number of seconds.
/// * Otherwise the string is a sequence of `<number><unit>` pieces that are
///   summed; units are `ms`, `s`, `m` and `h`, and numbers may be fractional.
///
/// Returns `None` for an empty string, an unknown unit, a malformed number, a
/// trailing number without a unit, or a value too large for a [`Duration`].
/// Fractional milliseconds are kept rather than truncated, and oversized
/// values are rejected instead of panicking.
fn parse_timeout_string(s: &str) -> Option<Duration> {
    let s = s.trim();
    if s.is_empty() {
        return None;
    }
    if let Ok(secs) = s.parse::<u64>() {
        return Some(Duration::from_secs(secs));
    }

    let mut total = Duration::ZERO;
    let mut number = String::new();
    let mut chars = s.chars().peekable();

    while let Some(c) = chars.next() {
        if c.is_ascii_digit() || c == '.' {
            number.push(c);
            continue;
        }

        // `c` starts a unit; "ms" is the only two-character unit.
        let is_millis = c == 'm' && chars.next_if_eq(&'s').is_some();

        let value: f64 = number.parse().ok()?;
        number.clear();

        let seconds = if is_millis {
            value / 1000.0
        } else {
            match c {
                's' => value,
                'm' => value * 60.0,
                'h' => value * 3600.0,
                _ => return None,
            }
        };

        // Checked conversions: overflow yields `None` instead of a panic.
        let piece = Duration::try_from_secs_f64(seconds).ok()?;
        total = total.checked_add(piece)?;
    }

    // Digits with no unit after them (e.g. "1m30") are rejected.
    if !number.is_empty() {
        return None;
    }
    Some(total)
}
830
831async fn dispatch_agent(
849 cfg: &AgentConfig,
850 step_name: &str,
851 session: Option<&Arc<SessionWriter>>,
852 prefix: Option<&str>,
853) -> Result<String, ZigError> {
854 match cfg.command.as_str() {
855 "run" => {
856 if cfg.interactive {
857 let builder = apply_agent_config(AgentBuilder::new(), cfg);
861 builder.run(Some(&cfg.prompt)).await.map_err(|e| {
862 ZigError::Zag(format!("agent run failed for step '{step_name}': {e}"))
863 })?;
864 Ok(String::new())
865 } else {
866 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
867 builder = install_live_streaming(builder, step_name, session, prefix);
868 let output = builder.exec(&cfg.prompt).await.map_err(|e| {
869 ZigError::Zag(format!("agent exec failed for step '{step_name}': {e}"))
870 })?;
871 Ok(output.result.unwrap_or_default())
872 }
873 }
874 "review" => {
875 let provider = cfg.provider.clone().unwrap_or_else(|| "claude".to_string());
876 let (uncommitted, base, commit, title) = match &cfg.command_params {
877 Some(CommandParams::Review {
878 uncommitted,
879 base,
880 commit,
881 title,
882 }) => (*uncommitted, base.clone(), commit.clone(), title.clone()),
883 _ => (false, None, None, None),
884 };
885
886 if provider == "codex" {
890 let params = agent_review::ReviewParams {
891 provider,
892 uncommitted,
893 base,
894 commit,
895 title,
896 prompt: if cfg.prompt.is_empty() {
897 None
898 } else {
899 Some(cfg.prompt.clone())
900 },
901 system_prompt: cfg.system_prompt.clone(),
902 model: cfg.model.clone(),
903 root: cfg.root.clone(),
904 auto_approve: cfg.auto_approve,
905 add_dirs: cfg.add_dirs.clone(),
906 progress: Box::new(zag_agent::progress::SilentProgress),
907 };
908 let output = agent_review::run_review(params).await.map_err(|e| {
909 ZigError::Zag(format!("review failed for step '{step_name}': {e}"))
910 })?;
911 return Ok(output.and_then(|o| o.result).unwrap_or_default());
912 }
913
914 let diff = agent_review::gather_diff(
917 uncommitted,
918 base.as_deref(),
919 commit.as_deref(),
920 cfg.root.as_deref(),
921 )
922 .map_err(|e| {
923 ZigError::Zag(format!(
924 "review gather_diff failed for step '{step_name}': {e}"
925 ))
926 })?;
927 let user_prompt = if cfg.prompt.is_empty() {
928 None
929 } else {
930 Some(cfg.prompt.as_str())
931 };
932 let review_prompt =
933 agent_review::build_review_prompt(&diff, title.as_deref(), user_prompt);
934
935 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
936 builder = install_live_streaming(builder, step_name, session, prefix);
937 let output = builder.exec(&review_prompt).await.map_err(|e| {
938 ZigError::Zag(format!("review exec failed for step '{step_name}': {e}"))
939 })?;
940 Ok(output.result.unwrap_or_default())
941 }
942 "plan" => {
943 let (plan_output_path, instructions) = match &cfg.command_params {
944 Some(CommandParams::Plan {
945 output,
946 instructions,
947 }) => (output.clone(), instructions.clone()),
948 _ => (None, None),
949 };
950
951 let plan_prompt = agent_plan::build_plan_prompt(&cfg.prompt, instructions.as_deref());
952
953 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
954 builder = install_live_streaming(builder, step_name, session, prefix);
955 let output = builder.exec(&plan_prompt).await.map_err(|e| {
956 ZigError::Zag(format!("plan exec failed for step '{step_name}': {e}"))
957 })?;
958 let text = output.result.unwrap_or_default();
959
960 if let Some(path_str) = plan_output_path {
961 let target = resolve_plan_output_path(&path_str);
962 if let Some(parent) = target.parent()
963 && !parent.as_os_str().is_empty()
964 {
965 std::fs::create_dir_all(parent).map_err(|e| {
966 ZigError::Io(format!(
967 "failed to create plan output directory {}: {e}",
968 parent.display()
969 ))
970 })?;
971 }
972 std::fs::write(&target, &text).map_err(|e| {
973 ZigError::Io(format!(
974 "failed to write plan output to {}: {e}",
975 target.display()
976 ))
977 })?;
978 eprintln!("plan written to {}", target.display());
979 }
980
981 Ok(text)
982 }
983 "pipe" => {
984 let session_ids = match &cfg.command_params {
985 Some(CommandParams::Pipe { session_ids }) => session_ids.as_slice(),
986 _ => &[] as &[String],
987 };
988 let context = build_pipe_context(session_ids, cfg.root.as_deref())?;
989 let combined = format!(
990 "Here are results from previous agent sessions:\n\n{context}\n\n{}",
991 cfg.prompt
992 );
993
994 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
995 builder = install_live_streaming(builder, step_name, session, prefix);
996 let output = builder.exec(&combined).await.map_err(|e| {
997 ZigError::Zag(format!("pipe exec failed for step '{step_name}': {e}"))
998 })?;
999 Ok(output.result.unwrap_or_default())
1000 }
1001 "collect" => {
1002 let session_ids = match &cfg.command_params {
1003 Some(CommandParams::Collect { session_ids }) => session_ids.clone(),
1004 _ => Vec::new(),
1005 };
1006 let params = orch_collect::CollectParams {
1007 session_ids,
1008 tag: None,
1009 json: true,
1010 root: cfg.root.clone(),
1011 };
1012 let results = orch_collect::collect_results(¶ms).map_err(|e| {
1013 ZigError::Zag(format!("collect failed for step '{step_name}': {e}"))
1014 })?;
1015 let json = serde_json::to_string(&results)
1016 .map_err(|e| ZigError::Execution(format!("collect serialization failed: {e}")))?;
1017 emit_captured(&json, step_name, session, prefix);
1018 Ok(json)
1019 }
1020 "summary" => {
1021 let session_ids = match &cfg.command_params {
1022 Some(CommandParams::Summary { session_ids }) => session_ids.clone(),
1023 _ => Vec::new(),
1024 };
1025 let params = orch_summary::SummaryParams {
1026 session_ids,
1027 tag: None,
1028 stats: false,
1029 json: true,
1030 root: cfg.root.clone(),
1031 };
1032 let results = orch_summary::summarize_sessions(¶ms).map_err(|e| {
1033 ZigError::Zag(format!("summary failed for step '{step_name}': {e}"))
1034 })?;
1035 let json = serde_json::to_string(&results)
1036 .map_err(|e| ZigError::Execution(format!("summary serialization failed: {e}")))?;
1037 emit_captured(&json, step_name, session, prefix);
1038 Ok(json)
1039 }
1040 other => Err(ZigError::Execution(format!(
1041 "unknown command '{other}' for step '{step_name}'"
1042 ))),
1043 }
1044}
1045
1046fn install_live_streaming(
1053 builder: AgentBuilder,
1054 step_name: &str,
1055 session: Option<&Arc<SessionWriter>>,
1056 prefix: Option<&str>,
1057) -> AgentBuilder {
1058 let step_name_owned = step_name.to_string();
1059 let prefix_owned = prefix.map(String::from);
1060 let session_owned = session.cloned();
1061 let last_activity = Arc::new(std::sync::Mutex::new(Instant::now()));
1062 builder.on_log_event(move |evt| {
1063 if matches!(evt.kind, LogEventKind::Heartbeat { .. }) {
1064 let elapsed = last_activity.lock().unwrap().elapsed().as_secs();
1065 emit_heartbeat_line(elapsed, prefix_owned.as_deref());
1066 return;
1067 }
1068 *last_activity.lock().unwrap() = Instant::now();
1069 let Some(text) = zag_agent::listen::format_event_text(evt, false) else {
1070 return;
1071 };
1072 emit_live_line(
1073 &text,
1074 &step_name_owned,
1075 session_owned.as_ref(),
1076 prefix_owned.as_deref(),
1077 );
1078 })
1079}
1080
1081fn emit_live_line(
1084 text: &str,
1085 step_name: &str,
1086 session: Option<&Arc<SessionWriter>>,
1087 prefix: Option<&str>,
1088) {
1089 use std::io::Write;
1090 if text.is_empty() {
1091 return;
1092 }
1093 let stderr = std::io::stderr();
1094 for line in text.lines() {
1095 if let Some(w) = session {
1096 let _ = w.step_output(step_name, OutputStream::Stdout, line);
1097 }
1098 let mut h = stderr.lock();
1099 let _ = match prefix {
1100 Some(p) => writeln!(h, "[{p}] {line}"),
1101 None => writeln!(h, "{line}"),
1102 };
1103 }
1104}
1105
/// Print a "waiting" heartbeat line with the elapsed seconds to stderr,
/// prefixed with `[<prefix>] ` when a prefix is given. Write errors are ignored.
fn emit_heartbeat_line(elapsed_secs: u64, prefix: Option<&str>) {
    use std::io::Write;

    let line = format!(" \u{00b7} waiting ({elapsed_secs}s)");
    let tag = match prefix {
        Some(p) => format!("[{p}] "),
        None => String::new(),
    };

    let stderr = std::io::stderr();
    let mut handle = stderr.lock();
    let _ = writeln!(handle, "{tag}{line}");
}
1120
/// Emit already-captured output (as opposed to live-streamed events).
///
/// Currently a direct pass-through to `emit_live_line`, so the text is
/// recorded in the session log and echoed to stderr line by line.
fn emit_captured(
    text: &str,
    step_name: &str,
    session: Option<&Arc<SessionWriter>>,
    prefix: Option<&str>,
) {
    emit_live_line(text, step_name, session, prefix);
}
1132
1133fn build_pipe_context(session_ids: &[String], root: Option<&str>) -> Result<String, ZigError> {
1139 let mut parts = Vec::new();
1140 for (i, id) in session_ids.iter().enumerate() {
1141 let Some(text) = orch_collect::extract_last_assistant_message(id, root) else {
1142 eprintln!("warning: no result found for upstream session {id}");
1143 continue;
1144 };
1145 let short = &id[..id.len().min(8)];
1146 let block = if session_ids.len() == 1 {
1147 format!("<session-result session=\"{short}\">\n{text}\n</session-result>")
1148 } else {
1149 format!(
1150 "<session-result index=\"{}\" session=\"{short}\">\n{text}\n</session-result>",
1151 i + 1
1152 )
1153 };
1154 parts.push(block);
1155 }
1156
1157 if parts.is_empty() {
1158 return Err(ZigError::Execution(
1159 "pipe: no results available from the specified sessions".into(),
1160 ));
1161 }
1162 Ok(parts.join("\n\n"))
1163}
1164
1165fn resolve_plan_output_path(path_str: &str) -> std::path::PathBuf {
1170 let expanded = expand_path(path_str);
1171 let path = std::path::PathBuf::from(&expanded);
1172 if path.extension().is_some() {
1173 return path;
1174 }
1175 let stamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
1176 path.join(format!("plan-{stamp}.md"))
1177}
1178
/// Run a single step once: build its `AgentConfig` with `build_agent_config`
/// and hand it to `dispatch_agent`.
///
/// `model_override`, when set, takes precedence over the step and workflow
/// model (see `build_agent_config`). `prefix` and `session` are only used for
/// output streaming by `dispatch_agent`.
///
/// Returns whatever `dispatch_agent` returns for the step.
#[allow(clippy::too_many_arguments)]
async fn execute_step(
    step: &Step,
    prompt: &str,
    workflow_name: &str,
    model_override: Option<&str>,
    prefix: Option<&str>,
    session: Option<&Arc<SessionWriter>>,
    rendered_system_prompt: Option<&str>,
    workflow_provider: Option<&str>,
    workflow_model: Option<&str>,
    extra_add_dirs: &[std::path::PathBuf],
) -> Result<String, ZigError> {
    let cfg = build_agent_config(
        step,
        prompt,
        workflow_name,
        model_override,
        rendered_system_prompt,
        workflow_provider,
        workflow_model,
        extra_add_dirs,
    );
    dispatch_agent(&cfg, &step.name, session, prefix).await
}
1208
1209#[allow(clippy::too_many_arguments)]
1214async fn run_step_attempts(
1215 step: &Step,
1216 prompt: &str,
1217 workflow_name: &str,
1218 prefix: Option<&str>,
1219 session: Option<&Arc<SessionWriter>>,
1220 rendered_system_prompt: Option<&str>,
1221 workflow_provider: Option<&str>,
1222 workflow_model: Option<&str>,
1223 extra_add_dirs: &[std::path::PathBuf],
1224) -> Result<String, ZigError> {
1225 let mut attempts = 0;
1226 let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
1227 step.max_retries.unwrap_or(1) + 1
1228 } else {
1229 1
1230 };
1231
1232 loop {
1233 attempts += 1;
1234 let model_override = if attempts > 1 {
1235 step.retry_model.as_deref()
1236 } else {
1237 None
1238 };
1239 match execute_step(
1240 step,
1241 prompt,
1242 workflow_name,
1243 model_override,
1244 prefix,
1245 session,
1246 rendered_system_prompt,
1247 workflow_provider,
1248 workflow_model,
1249 extra_add_dirs,
1250 )
1251 .await
1252 {
1253 Ok(output) => return Ok(output),
1254 Err(e) => {
1255 if let Some(w) = session {
1256 let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
1257 }
1258 if attempts < max_attempts {
1259 eprintln!(
1260 " retry {}/{} for step '{}'",
1261 attempts,
1262 max_attempts - 1,
1263 step.name
1264 );
1265 continue;
1266 }
1267 return Err(e);
1268 }
1269 }
1270 }
1271}
1272
1273fn extract_saves(
1280 output: &str,
1281 saves: &HashMap<String, String>,
1282) -> Result<HashMap<String, String>, ZigError> {
1283 let mut extracted = HashMap::new();
1284
1285 for (var_name, selector) in saves {
1286 let value = if selector == "$" {
1287 output.trim().to_string()
1288 } else if let Some(path) = selector.strip_prefix("$.") {
1289 let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
1290 ZigError::Execution(format!(
1291 "saves selector '{selector}' requires JSON output, but got parse error: {e}"
1292 ))
1293 })?;
1294 json_path_lookup(&json, path)
1295 } else {
1296 output.trim().to_string()
1297 };
1298
1299 extracted.insert(var_name.clone(), value);
1300 }
1301
1302 Ok(extracted)
1303}
1304
1305fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
1310 let mut sequential = Vec::new();
1311 let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
1312
1313 for step in tier {
1314 if let Some(group) = &step.race_group {
1315 race_groups.entry(group.clone()).or_default().push(step);
1316 } else {
1317 sequential.push(*step);
1318 }
1319 }
1320
1321 (sequential, race_groups)
1322}
1323
1324#[allow(clippy::too_many_arguments)]
1330async fn execute_race_group(
1331 steps: &[&Step],
1332 prompts: &HashMap<String, String>,
1333 system_prompts: &HashMap<String, String>,
1334 workflow_name: &str,
1335 tier_index: usize,
1336 session: Option<&Arc<SessionWriter>>,
1337 workflow_provider: Option<&str>,
1338 workflow_model: Option<&str>,
1339 storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
1340) -> Result<(String, String), ZigError> {
1341 if let Some(w) = session {
1342 for step in steps {
1343 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1344 let preview = prompts
1345 .get(&step.name)
1346 .map(|p| prompt_preview(p))
1347 .unwrap_or_default();
1348 let _ = w.step_started(
1349 &step.name,
1350 tier_index,
1351 &zag_session_id,
1352 zag_command_name(&step.command),
1353 step.model.as_deref(),
1354 &preview,
1355 );
1356 }
1357 }
1358
1359 let race_started = Instant::now();
1360 let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1361
1362 for step in steps {
1363 let prompt = prompts
1364 .get(&step.name)
1365 .ok_or_else(|| ZigError::Execution(format!("missing prompt for step '{}'", step.name)))?
1366 .clone();
1367 eprintln!(" racing step '{}'...", step.name);
1368 let rendered_sp = system_prompts.get(&step.name).cloned();
1369 let empty: Vec<std::path::PathBuf> = Vec::new();
1370 let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty).clone();
1371 let step_clone: Step = (*step).clone();
1372 let wf_name = workflow_name.to_string();
1373 let wf_provider = workflow_provider.map(String::from);
1374 let wf_model = workflow_model.map(String::from);
1375 let session_clone = session.cloned();
1376 let name = step.name.clone();
1377 set.spawn(async move {
1378 let res = execute_step(
1379 &step_clone,
1380 &prompt,
1381 &wf_name,
1382 None,
1383 None,
1384 session_clone.as_ref(),
1385 rendered_sp.as_deref(),
1386 wf_provider.as_deref(),
1387 wf_model.as_deref(),
1388 &extra_dirs,
1389 )
1390 .await;
1391 (name, res)
1392 });
1393 }
1394
1395 while let Some(joined) = set.join_next().await {
1397 let (winner_name, result) = match joined {
1398 Ok(pair) => pair,
1399 Err(e) if e.is_cancelled() => continue,
1400 Err(e) => return Err(ZigError::Execution(format!("race task panicked: {e}"))),
1401 };
1402 match result {
1403 Ok(stdout) => {
1404 set.abort_all();
1406 while let Some(r) = set.join_next().await {
1407 if let Ok((name, _)) = r {
1408 eprintln!(" cancelling step '{name}' (race lost)");
1409 }
1410 }
1411 let elapsed = race_started.elapsed().as_millis() as u64;
1412 eprintln!(" race won by '{winner_name}'");
1413 if let Some(w) = session {
1414 let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1415 }
1416 return Ok((winner_name, stdout));
1417 }
1418 Err(e) => {
1419 if let Some(w) = session {
1420 let _ = w.step_failed(&winner_name, None, 1, &e.to_string());
1421 }
1422 continue;
1424 }
1425 }
1426 }
1427
1428 Err(ZigError::Execution(
1429 "all racers failed without a winner".into(),
1430 ))
1431}
1432
1433#[allow(clippy::too_many_arguments)]
1435async fn execute_sequential_step(
1436 step: &Step,
1437 vars: &mut HashMap<String, String>,
1438 user_prompt: Option<&str>,
1439 step_outputs: &mut HashMap<String, String>,
1440 workflow_name: &str,
1441 pending_next: &mut Option<String>,
1442 tier_index: usize,
1443 session: Option<&Arc<SessionWriter>>,
1444 roles: &HashMap<String, Role>,
1445 resources: &ResourceCollector<'_>,
1446 memory: &MemoryCollector,
1447 storage: &StorageManager,
1448 workflow_dir: &Path,
1449 workflow_provider: Option<&str>,
1450 workflow_model: Option<&str>,
1451) -> Result<(), ZigError> {
1452 if let Some(condition) = &step.condition {
1453 if !evaluate_condition(condition, vars)? {
1454 eprintln!(
1455 " skipping '{}' (condition not met: {condition})",
1456 step.name
1457 );
1458 if let Some(w) = session {
1459 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1460 }
1461 return Ok(());
1462 }
1463 }
1464
1465 eprintln!(" running step '{}'...", step.name);
1466
1467 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1468 let rendered_sp = resolve_role_system_prompt(
1469 step,
1470 roles,
1471 resources,
1472 memory,
1473 storage,
1474 vars,
1475 workflow_dir,
1476 workflow_name,
1477 )?;
1478 let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1479 if let Some(w) = session {
1480 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1481 let _ = w.step_started(
1482 &step.name,
1483 tier_index,
1484 &zag_session_id,
1485 zag_command_name(&step.command),
1486 step.model.as_deref(),
1487 &prompt_preview(&prompt),
1488 );
1489 }
1490 let started = Instant::now();
1491 let result = run_step_attempts(
1492 step,
1493 &prompt,
1494 workflow_name,
1495 None,
1496 session,
1497 rendered_sp.as_deref(),
1498 workflow_provider,
1499 workflow_model,
1500 &storage_dirs,
1501 )
1502 .await;
1503
1504 match result {
1505 Ok(output) => {
1506 let mut saved_keys: Vec<String> = Vec::new();
1507 if !step.saves.is_empty() {
1508 let saved = extract_saves(&output, &step.saves)?;
1509 for (k, v) in &saved {
1510 eprintln!(" saved {k} = {v}");
1511 saved_keys.push(k.clone());
1512 }
1513 vars.extend(saved);
1514 }
1515
1516 step_outputs.insert(step.name.clone(), output);
1517 eprintln!(" completed '{}'", step.name);
1518 if let Some(w) = session {
1519 let _ = w.step_completed(
1520 &step.name,
1521 0,
1522 started.elapsed().as_millis() as u64,
1523 saved_keys,
1524 );
1525 }
1526
1527 if step.next.is_some() {
1528 *pending_next = step.next.clone();
1529 }
1530 }
1531 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1532 FailurePolicy::Fail => return Err(e),
1533 FailurePolicy::Continue => {
1534 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1535 }
1536 FailurePolicy::Retry => {
1537 return Err(e);
1538 }
1539 },
1540 }
1541
1542 Ok(())
1543}
1544
1545#[allow(clippy::too_many_arguments)]
1554async fn execute_parallel_tier(
1555 steps: &[&Step],
1556 vars: &mut HashMap<String, String>,
1557 user_prompt: Option<&str>,
1558 step_outputs: &mut HashMap<String, String>,
1559 workflow_name: &str,
1560 pending_next: &mut Option<String>,
1561 tier_index: usize,
1562 session: Option<&Arc<SessionWriter>>,
1563 roles: &HashMap<String, Role>,
1564 resources: &ResourceCollector<'_>,
1565 memory: &MemoryCollector,
1566 storage: &StorageManager,
1567 workflow_dir: &Path,
1568 workflow_provider: Option<&str>,
1569 workflow_model: Option<&str>,
1570) -> Result<(), ZigError> {
1571 let mut active: Vec<&Step> = Vec::new();
1574 let mut prompts: HashMap<String, String> = HashMap::new();
1575 let mut rendered_sps: HashMap<String, String> = HashMap::new();
1576 let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
1577 for step in steps {
1578 if let Some(condition) = &step.condition {
1579 if !evaluate_condition(condition, vars)? {
1580 eprintln!(
1581 " skipping '{}' (condition not met: {condition})",
1582 step.name
1583 );
1584 if let Some(w) = session {
1585 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1586 }
1587 continue;
1588 }
1589 }
1590 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1591 prompts.insert(step.name.clone(), prompt);
1592 if let Some(sp) = resolve_role_system_prompt(
1593 step,
1594 roles,
1595 resources,
1596 memory,
1597 storage,
1598 vars,
1599 workflow_dir,
1600 workflow_name,
1601 )? {
1602 rendered_sps.insert(step.name.clone(), sp);
1603 }
1604 storage_dirs_map.insert(
1605 step.name.clone(),
1606 storage.add_dirs_for_step(step.storage.as_deref()),
1607 );
1608 active.push(*step);
1609 }
1610
1611 if active.is_empty() {
1612 return Ok(());
1613 }
1614
1615 eprintln!(" running {} steps in parallel...", active.len());
1616
1617 let mut start_times: HashMap<String, Instant> = HashMap::new();
1618 let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1619 for step in &active {
1620 let step_clone: Step = (*step).clone();
1621 let prompt = prompts.remove(&step.name).unwrap_or_default();
1622 let rendered_sp = rendered_sps.remove(&step.name);
1623 let workflow_name_owned = workflow_name.to_string();
1624 let name = step.name.clone();
1625 eprintln!(" starting '{name}'...");
1626 if let Some(w) = session {
1627 let zag_session_id = format!("zig-{workflow_name}-{name}");
1628 let _ = w.step_started(
1629 &name,
1630 tier_index,
1631 &zag_session_id,
1632 zag_command_name(&step.command),
1633 step.model.as_deref(),
1634 &prompt_preview(&prompt),
1635 );
1636 }
1637 start_times.insert(name.clone(), Instant::now());
1638 let session_clone = session.cloned();
1639 let wf_provider = workflow_provider.map(String::from);
1640 let wf_model = workflow_model.map(String::from);
1641 let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
1642 set.spawn(async move {
1643 let res = run_step_attempts(
1644 &step_clone,
1645 &prompt,
1646 &workflow_name_owned,
1647 Some(&name),
1648 session_clone.as_ref(),
1649 rendered_sp.as_deref(),
1650 wf_provider.as_deref(),
1651 wf_model.as_deref(),
1652 &storage_dirs,
1653 )
1654 .await;
1655 (name, res)
1656 });
1657 }
1658
1659 let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1660 while let Some(joined) = set.join_next().await {
1661 match joined {
1662 Ok((name, res)) => {
1663 results.insert(name, res);
1664 }
1665 Err(e) => {
1666 return Err(ZigError::Execution(format!(
1667 "parallel step task panicked: {e}"
1668 )));
1669 }
1670 }
1671 }
1672
1673 let mut errors: Vec<String> = Vec::new();
1675 for step in &active {
1676 let Some(res) = results.remove(&step.name) else {
1677 continue;
1678 };
1679 let elapsed = start_times
1680 .remove(&step.name)
1681 .map(|t| t.elapsed().as_millis() as u64)
1682 .unwrap_or(0);
1683 match res {
1684 Ok(output) => {
1685 let mut saved_keys: Vec<String> = Vec::new();
1686 if !step.saves.is_empty() {
1687 let saved = extract_saves(&output, &step.saves)?;
1688 for (k, v) in &saved {
1689 eprintln!(" saved {k} = {v}");
1690 saved_keys.push(k.clone());
1691 }
1692 vars.extend(saved);
1693 }
1694 step_outputs.insert(step.name.clone(), output);
1695 eprintln!(" completed '{}'", step.name);
1696 if let Some(w) = session {
1697 let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1698 }
1699 if step.next.is_some() && pending_next.is_none() {
1700 *pending_next = step.next.clone();
1701 }
1702 }
1703 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1704 FailurePolicy::Continue => {
1705 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1706 }
1707 FailurePolicy::Fail | FailurePolicy::Retry => {
1708 errors.push(format!("'{}': {e}", step.name));
1709 }
1710 },
1711 }
1712 }
1713
1714 if !errors.is_empty() {
1715 return Err(ZigError::Execution(format!(
1716 "parallel step(s) failed: {}",
1717 errors.join("; ")
1718 )));
1719 }
1720
1721 Ok(())
1722}
1723
1724fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1727 let mut vars = HashMap::new();
1728 for (name, var) in &workflow.vars {
1729 let value = match &var.default {
1730 Some(toml::Value::String(s)) => s.clone(),
1731 Some(toml::Value::Integer(n)) => n.to_string(),
1732 Some(toml::Value::Float(f)) => f.to_string(),
1733 Some(toml::Value::Boolean(b)) => b.to_string(),
1734 Some(other) => other.to_string(),
1735 None => String::new(),
1736 };
1737 vars.insert(name.clone(), value);
1738 }
1739 vars
1740}
1741
1742#[allow(clippy::too_many_arguments)]
1744async fn execute(
1745 workflow: &Workflow,
1746 workflow_path: &std::path::Path,
1747 user_prompt: Option<&str>,
1748 workflow_dir: &Path,
1749 disable_resources: bool,
1750 disable_memory: bool,
1751 disable_storage: bool,
1752 dry_run: bool,
1753 dry_run_format: DryRunFormat,
1754) -> Result<(), ZigError> {
1755 let mut vars = init_vars(workflow);
1756
1757 let resource_collector = ResourceCollector::from_env(
1758 &workflow.workflow.name,
1759 &workflow.workflow.resources,
1760 workflow_dir,
1761 disable_resources,
1762 );
1763
1764 let config = ZigConfig::load();
1765 let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1766 let memory_collector = MemoryCollector::from_env(
1767 &workflow.workflow.name,
1768 workflow_memory_mode,
1769 &config,
1770 disable_memory,
1771 );
1772
1773 let storage_manager = if disable_storage || workflow.storage.is_empty() {
1781 StorageManager::empty()
1782 } else if dry_run {
1783 let backend = FilesystemBackend::from_cwd()?;
1784 StorageManager::build_dry(&workflow.storage, backend)
1785 } else {
1786 let backend = FilesystemBackend::from_cwd()?;
1787 StorageManager::build(&workflow.storage, backend)?
1788 };
1789
1790 load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1792
1793 let prompt_var = workflow
1795 .vars
1796 .iter()
1797 .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1798 .map(|(name, _)| name.clone());
1799
1800 if let Some(ref var_name) = prompt_var {
1801 if let Some(prompt) = user_prompt {
1802 vars.insert(var_name.clone(), prompt.to_string());
1803 }
1804 }
1805
1806 if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1808 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1809 return Err(ZigError::Validation(msgs.join("; ")));
1810 }
1811
1812 let effective_user_prompt = if prompt_var.is_some() {
1814 None
1815 } else {
1816 user_prompt
1817 };
1818
1819 let mut step_outputs: HashMap<String, String> = HashMap::new();
1820
1821 let wf_provider = workflow.workflow.provider.as_deref();
1822 let wf_model = workflow.workflow.model.as_deref();
1823
1824 let tiers = topological_sort(&workflow.steps)?;
1825
1826 if dry_run {
1827 let ctx = DryRunContext {
1828 workflow,
1829 workflow_path,
1830 workflow_dir,
1831 vars: &vars,
1832 user_prompt: effective_user_prompt,
1833 roles: &workflow.roles,
1834 resources: &resource_collector,
1835 memory: &memory_collector,
1836 storage: &storage_manager,
1837 wf_provider,
1838 wf_model,
1839 disable_resources,
1840 disable_memory,
1841 disable_storage,
1842 };
1843 return crate::dry_run::print_plan(&ctx, &tiers, dry_run_format);
1844 }
1845
1846 eprintln!(
1847 "running workflow '{}' ({} steps in {} tiers)",
1848 workflow.workflow.name,
1849 workflow.steps.len(),
1850 tiers.len()
1851 );
1852
1853 let coordinator = match SessionWriter::create(
1858 &workflow.workflow.name,
1859 &workflow_path.to_string_lossy(),
1860 user_prompt,
1861 tiers.len(),
1862 ) {
1863 Ok(writer) => {
1864 eprintln!("zig session: {}", writer.session_id());
1865 Some(SessionCoordinator::start(writer))
1866 }
1867 Err(e) => {
1868 eprintln!("warning: failed to open zig session log: {e}");
1869 None
1870 }
1871 };
1872 let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1873 let session_ref = session_writer.as_ref();
1874
1875 let mut iteration = 0;
1876 let mut pending_next: Option<String> = None;
1877
1878 loop {
1879 let tiers_to_run = if let Some(ref next_step) = pending_next {
1880 let remaining: Vec<Vec<&Step>> = tiers
1882 .iter()
1883 .map(|tier| {
1884 tier.iter()
1885 .filter(|s| s.name == *next_step)
1886 .copied()
1887 .collect::<Vec<_>>()
1888 })
1889 .filter(|tier| !tier.is_empty())
1890 .collect();
1891 pending_next = None;
1892 remaining
1893 } else if iteration == 0 {
1894 tiers.clone()
1895 } else {
1896 break;
1897 };
1898
1899 for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1900 let (non_race, race_groups) = partition_tier(tier);
1901
1902 if let Some(w) = session_ref {
1903 let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1904 let _ = w.tier_started(tier_index, names);
1905 }
1906
1907 if non_race.len() <= 1 {
1911 for step in &non_race {
1912 execute_sequential_step(
1913 step,
1914 &mut vars,
1915 effective_user_prompt,
1916 &mut step_outputs,
1917 &workflow.workflow.name,
1918 &mut pending_next,
1919 tier_index,
1920 session_ref,
1921 &workflow.roles,
1922 &resource_collector,
1923 &memory_collector,
1924 &storage_manager,
1925 workflow_dir,
1926 wf_provider,
1927 wf_model,
1928 )
1929 .await?;
1930 }
1931 } else {
1932 execute_parallel_tier(
1933 &non_race,
1934 &mut vars,
1935 effective_user_prompt,
1936 &mut step_outputs,
1937 &workflow.workflow.name,
1938 &mut pending_next,
1939 tier_index,
1940 session_ref,
1941 &workflow.roles,
1942 &resource_collector,
1943 &memory_collector,
1944 &storage_manager,
1945 workflow_dir,
1946 wf_provider,
1947 wf_model,
1948 )
1949 .await?;
1950 }
1951
1952 for (group_name, race_steps) in &race_groups {
1954 eprintln!(" starting race group '{group_name}'...");
1955
1956 let mut prompts = HashMap::new();
1958 let mut race_sps: HashMap<String, String> = HashMap::new();
1959 let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
1960 HashMap::new();
1961 let mut active_steps: Vec<&Step> = Vec::new();
1962 for step in race_steps {
1963 if let Some(condition) = &step.condition {
1964 if !evaluate_condition(condition, &vars)? {
1965 eprintln!(
1966 " skipping '{}' (condition not met: {condition})",
1967 step.name
1968 );
1969 continue;
1970 }
1971 }
1972 let prompt =
1973 render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1974 prompts.insert(step.name.clone(), prompt);
1975 if let Some(sp) = resolve_role_system_prompt(
1976 step,
1977 &workflow.roles,
1978 &resource_collector,
1979 &memory_collector,
1980 &storage_manager,
1981 &vars,
1982 workflow_dir,
1983 &workflow.workflow.name,
1984 )? {
1985 race_sps.insert(step.name.clone(), sp);
1986 }
1987 race_storage_dirs.insert(
1988 step.name.clone(),
1989 storage_manager.add_dirs_for_step(step.storage.as_deref()),
1990 );
1991 active_steps.push(step);
1992 }
1993
1994 if active_steps.is_empty() {
1995 continue;
1996 }
1997
1998 match execute_race_group(
1999 &active_steps,
2000 &prompts,
2001 &race_sps,
2002 &workflow.workflow.name,
2003 tier_index,
2004 session_ref,
2005 wf_provider,
2006 wf_model,
2007 &race_storage_dirs,
2008 )
2009 .await
2010 {
2011 Ok((winner_name, output)) => {
2012 if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
2014 if !winner.saves.is_empty() {
2015 let saved = extract_saves(&output, &winner.saves)?;
2016 for (k, v) in &saved {
2017 eprintln!(" saved {k} = {v}");
2018 }
2019 vars.extend(saved);
2020 }
2021 if winner.next.is_some() {
2022 pending_next = winner.next.clone();
2023 }
2024 }
2025 step_outputs.insert(winner_name.clone(), output);
2026 eprintln!(
2027 " completed race group '{group_name}' (winner: '{winner_name}')"
2028 );
2029 }
2030 Err(e) => return Err(e),
2031 }
2032 }
2033 }
2034
2035 iteration += 1;
2036 if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
2037 if iteration >= MAX_LOOP_ITERATIONS {
2038 eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
2039 }
2040 break;
2041 }
2042 }
2043
2044 eprintln!("workflow '{}' completed", workflow.workflow.name);
2045 if let Some(c) = coordinator {
2046 let _ = c.finish(SessionStatus::Success);
2047 }
2048 Ok(())
2049}
2050
2051fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
2054 match cmd {
2055 None => "run",
2056 Some(StepCommand::Review) => "review",
2057 Some(StepCommand::Plan) => "plan",
2058 Some(StepCommand::Pipe) => "pipe",
2059 Some(StepCommand::Collect) => "collect",
2060 Some(StepCommand::Summary) => "summary",
2061 }
2062}
2063
/// Produces a one-line preview of `prompt`.
///
/// Every `'\n'` is replaced by a space. A prompt longer than 200
/// characters (counted as `char`s, not bytes) is cut to its first 200
/// characters followed by `…`.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;

    let mut preview = String::new();
    let mut rest = prompt.chars();
    for ch in rest.by_ref().take(MAX) {
        preview.push(if ch == '\n' { ' ' } else { ch });
    }
    // Anything left after the first MAX characters means the prompt was cut.
    if rest.next().is_some() {
        preview.push('…');
    }
    preview
}
2078
2079#[cfg(test)]
2080#[path = "run_tests.rs"]
2081mod tests;