1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde::Serialize;
7use tokio::task::JoinSet;
8use zag_agent::builder::AgentBuilder;
9use zag_agent::session_log::LogEventKind;
10use zag_agent::{plan as agent_plan, review as agent_review};
11use zag_orch::collect as orch_collect;
12use zag_orch::summary as orch_summary;
13
14use crate::config::ZigConfig;
15use crate::dry_run::{DryRunContext, DryRunFormat};
16use crate::error::ZigError;
17use crate::memory::{MemoryCollector, render_memory_block};
18use crate::paths::expand_path;
19use crate::resources::{ResourceCollector, render_system_block};
20use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
21use crate::storage::{FilesystemBackend, StorageManager};
22use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
23use crate::workflow::{parser, validate};
24
25const MAX_LOOP_ITERATIONS: usize = 100;
27
28#[allow(clippy::too_many_arguments)]
50pub async fn run_workflow(
51 workflow_path: &str,
52 user_prompt: Option<&str>,
53 disable_resources: bool,
54 disable_memory: bool,
55 disable_storage: bool,
56 dry_run: bool,
57 dry_run_format: DryRunFormat,
58) -> Result<(), ZigError> {
59 let path = resolve_workflow_path(workflow_path)?;
60 let (workflow, source) = parser::parse_workflow(&path)?;
61
62 if let Err(errors) = validate::validate(&workflow) {
63 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
64 return Err(ZigError::Validation(msgs.join("; ")));
65 }
66
67 execute(
68 &workflow,
69 &path,
70 user_prompt,
71 source.dir(),
72 disable_resources,
73 disable_memory,
74 disable_storage,
75 dry_run,
76 dry_run_format,
77 )
78 .await
79}
80
81pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
94 let mut candidates = vec![
95 PathBuf::from(workflow),
96 PathBuf::from(format!("{workflow}.zwf")),
97 PathBuf::from(format!("{workflow}.zwfz")),
98 ];
99
100 if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
101 candidates.push(local_dir.join(workflow));
102 candidates.push(local_dir.join(format!("{workflow}.zwf")));
103 candidates.push(local_dir.join(format!("{workflow}.zwfz")));
104 }
105
106 if let Some(global_dir) = crate::paths::global_workflows_dir() {
107 candidates.push(global_dir.join(workflow));
108 candidates.push(global_dir.join(format!("{workflow}.zwf")));
109 candidates.push(global_dir.join(format!("{workflow}.zwfz")));
110 }
111
112 for candidate in &candidates {
113 if candidate.exists() {
114 return Ok(candidate.clone());
115 }
116 }
117
118 Err(ZigError::Io(format!(
119 "workflow not found: '{workflow}' (tried: {})",
120 candidates
121 .iter()
122 .map(|p| p.display().to_string())
123 .collect::<Vec<_>>()
124 .join(", ")
125 )))
126}
127
128pub(crate) fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
134 let step_index: HashMap<&str, usize> = steps
135 .iter()
136 .enumerate()
137 .map(|(i, s)| (s.name.as_str(), i))
138 .collect();
139
140 let mut in_degree = vec![0usize; steps.len()];
141 for (i, step) in steps.iter().enumerate() {
142 for dep in &step.depends_on {
143 if step_index.contains_key(dep.as_str()) {
144 in_degree[i] += 1;
145 }
146 }
147 }
148
149 let mut tiers = Vec::new();
150 let mut remaining = in_degree.clone();
151 let mut completed: Vec<bool> = vec![false; steps.len()];
152
153 loop {
154 let tier: Vec<usize> = (0..steps.len())
155 .filter(|&i| !completed[i] && remaining[i] == 0)
156 .collect();
157
158 if tier.is_empty() {
159 break;
160 }
161
162 for &i in &tier {
163 completed[i] = true;
164 }
165
166 for &i in &tier {
168 for (j, step) in steps.iter().enumerate() {
169 if !completed[j] && step.depends_on.contains(&steps[i].name) {
170 remaining[j] -= 1;
171 }
172 }
173 }
174
175 tiers.push(tier.iter().map(|&i| &steps[i]).collect());
176 }
177
178 let completed_count: usize = completed.iter().filter(|&&c| c).count();
179 if completed_count != steps.len() {
180 return Err(ZigError::Execution(
181 "could not resolve all steps — possible undetected cycle".into(),
182 ));
183 }
184
185 Ok(tiers)
186}
187
188pub(crate) fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
194 let mut result = String::with_capacity(template.len());
195 let mut rest = template;
196
197 while let Some(start) = rest.find("${") {
198 result.push_str(&rest[..start]);
199 let after_start = &rest[start + 2..];
200
201 if let Some(end) = after_start.find('}') {
202 let var_expr = &after_start[..end];
203 let mut parts = var_expr.splitn(2, '.');
204 let root = parts.next().unwrap_or(var_expr);
205
206 if let Some(value) = vars.get(root) {
207 if let Some(path) = parts.next() {
208 if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
210 let resolved = json_path_lookup(&json, path);
211 result.push_str(&resolved);
212 } else {
213 result.push_str(value);
214 }
215 } else {
216 result.push_str(value);
217 }
218 } else {
219 result.push_str(&rest[start..start + 2 + end + 1]);
221 }
222
223 rest = &after_start[end + 1..];
224 } else {
225 result.push_str(&rest[start..]);
226 rest = "";
227 }
228 }
229
230 result.push_str(rest);
231 result
232}
233
234fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
236 let mut current = value;
237 for key in path.split('.') {
238 match current.get(key) {
239 Some(v) => current = v,
240 None => return format!("${{?.{path}}}"),
241 }
242 }
243 match current {
244 serde_json::Value::String(s) => s.clone(),
245 other => other.to_string(),
246 }
247}
248
249#[allow(clippy::too_many_arguments)]
266pub(crate) fn resolve_role_system_prompt(
267 step: &Step,
268 roles: &HashMap<String, Role>,
269 resources: &ResourceCollector<'_>,
270 memory: &MemoryCollector,
271 storage: &StorageManager,
272 vars: &HashMap<String, String>,
273 workflow_dir: &Path,
274 workflow_name: &str,
275) -> Result<Option<String>, ZigError> {
276 let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
278 Some(substitute_vars(sp, vars))
279 } else if let Some(ref role_ref) = step.role {
280 let resolved_name = substitute_vars(role_ref, vars);
281 let role = roles.get(&resolved_name).ok_or_else(|| {
282 ZigError::Execution(format!(
283 "step '{}' references role '{}' which does not exist",
284 step.name, resolved_name
285 ))
286 })?;
287
288 let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
289 let full_path = workflow_dir.join(expand_path(file_path));
290 Some(std::fs::read_to_string(&full_path).map_err(|e| {
291 ZigError::Execution(format!(
292 "failed to read system_prompt_file '{}' for role '{}': {e}",
293 full_path.display(),
294 resolved_name
295 ))
296 })?)
297 } else {
298 role.system_prompt.clone()
299 };
300
301 raw_prompt.map(|p| substitute_vars(&p, vars))
302 } else {
303 None
304 };
305
306 let set = resources.collect_for_step(&step.resources)?;
308 let resource_block = render_system_block(&set);
309
310 let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
312 let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
313
314 let storage_block = match storage.render_block(step.storage.as_deref())? {
316 Some(mut s) => {
317 s.push('\n');
318 s
319 }
320 None => String::new(),
321 };
322
323 let prefix = format!("{resource_block}{memory_block}{storage_block}");
324
325 match (prefix.is_empty(), base_prompt) {
326 (true, None) => Ok(None),
327 (true, Some(p)) => Ok(Some(p)),
328 (false, None) => Ok(Some(prefix.trim_end().to_string())),
329 (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
330 }
331}
332
333fn load_file_defaults(
338 vars: &mut HashMap<String, String>,
339 declarations: &HashMap<String, crate::workflow::model::Variable>,
340 workflow_dir: &Path,
341) -> Result<(), ZigError> {
342 for (name, decl) in declarations {
343 if decl.default.is_none() {
344 if let Some(ref file_path) = decl.default_file {
345 let full_path = workflow_dir.join(expand_path(file_path));
346 let content = std::fs::read_to_string(&full_path).map_err(|e| {
347 ZigError::Execution(format!(
348 "failed to read default_file '{}' for variable '{name}': {e}",
349 full_path.display()
350 ))
351 })?;
352 vars.insert(name.clone(), content);
353 }
354 }
355 }
356 Ok(())
357}
358
359pub(crate) fn evaluate_condition(
366 condition: &str,
367 vars: &HashMap<String, String>,
368) -> Result<bool, ZigError> {
369 let condition = condition.trim();
370
371 let operators = ["<=", ">=", "!=", "==", "<", ">"];
373 for op in &operators {
374 if let Some(pos) = condition.find(op) {
375 let lhs = resolve_operand(condition[..pos].trim(), vars);
376 let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
377 return Ok(compare(&lhs, &rhs, op));
378 }
379 }
380
381 let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
383 Ok(is_truthy(value))
384}
385
386fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
391 if (token.starts_with('"') && token.ends_with('"'))
393 || (token.starts_with('\'') && token.ends_with('\''))
394 {
395 return token[1..token.len() - 1].to_string();
396 }
397 if let Some(val) = vars.get(token) {
399 return val.clone();
400 }
401 token.to_string()
403}
404
405fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
408 if let (Ok(l), Ok(r)) = (lhs.parse::<f64>(), rhs.parse::<f64>()) {
409 return match op {
410 "==" => (l - r).abs() < f64::EPSILON,
411 "!=" => (l - r).abs() >= f64::EPSILON,
412 "<" => l < r,
413 ">" => l > r,
414 "<=" => l <= r,
415 ">=" => l >= r,
416 _ => false,
417 };
418 }
419 match op {
420 "==" => lhs == rhs,
421 "!=" => lhs != rhs,
422 "<" => lhs < rhs,
423 ">" => lhs > rhs,
424 "<=" => lhs <= rhs,
425 ">=" => lhs >= rhs,
426 _ => false,
427 }
428}
429
430fn is_truthy(value: &str) -> bool {
432 !value.is_empty() && value != "false" && value != "0"
433}
434
435pub(crate) fn render_step_prompt(
438 step: &Step,
439 vars: &HashMap<String, String>,
440 user_prompt: Option<&str>,
441 dependency_outputs: &HashMap<String, String>,
442) -> String {
443 let mut prompt = String::new();
444
445 if let Some(ctx) = user_prompt {
447 prompt.push_str(&format!("User context: {ctx}\n\n"));
448 }
449
450 if step.inject_context {
452 for dep in &step.depends_on {
453 if let Some(output) = dependency_outputs.get(dep) {
454 prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
455 }
456 }
457 }
458
459 prompt.push_str(&substitute_vars(&step.prompt, vars));
461
462 prompt
463}
464
465#[derive(Debug, Clone, Serialize, Default)]
474pub struct AgentConfig {
475 pub command: String,
478
479 pub provider: Option<String>,
481 pub model: Option<String>,
482 pub system_prompt: Option<String>,
483 pub root: Option<String>,
484 pub add_dirs: Vec<String>,
485 #[serde(serialize_with = "serialize_env_pairs")]
486 pub env: Vec<(String, String)>,
487 pub files: Vec<String>,
488 pub auto_approve: bool,
489 #[serde(skip_serializing_if = "Option::is_none")]
491 pub worktree: Option<Option<String>>,
492 #[serde(skip_serializing_if = "Option::is_none")]
493 pub sandbox: Option<String>,
494
495 pub json_mode: bool,
497 #[serde(skip_serializing_if = "Option::is_none")]
498 pub json_schema: Option<String>,
499 #[serde(skip_serializing_if = "Option::is_none")]
500 pub output_format: Option<String>,
501
502 #[serde(skip_serializing_if = "Option::is_none")]
504 pub max_turns: Option<u32>,
505 #[serde(skip_serializing_if = "Option::is_none")]
506 pub timeout: Option<String>,
507 #[serde(skip_serializing_if = "Option::is_none")]
508 pub mcp_config: Option<String>,
509
510 pub session_name: String,
512 #[serde(skip_serializing_if = "Option::is_none")]
513 pub description: Option<String>,
514 pub tags: Vec<String>,
515
516 pub prompt: String,
518
519 pub accepts_agent_args: bool,
522
523 #[serde(skip_serializing_if = "Option::is_none")]
525 pub command_params: Option<CommandParams>,
526
527 pub interactive: bool,
530}
531
532fn serialize_env_pairs<S>(pairs: &[(String, String)], s: S) -> Result<S::Ok, S::Error>
533where
534 S: serde::Serializer,
535{
536 use serde::ser::SerializeSeq;
537 let mut seq = s.serialize_seq(Some(pairs.len()))?;
538 for (k, v) in pairs {
539 seq.serialize_element(&format!("{k}={v}"))?;
540 }
541 seq.end()
542}
543
544#[derive(Debug, Clone, Serialize)]
546#[serde(tag = "kind", rename_all = "snake_case")]
547pub enum CommandParams {
548 Review {
549 uncommitted: bool,
550 base: Option<String>,
551 commit: Option<String>,
552 title: Option<String>,
553 },
554 Plan {
555 output: Option<String>,
556 instructions: Option<String>,
557 },
558 Pipe {
559 session_ids: Vec<String>,
560 },
561 Collect {
562 session_ids: Vec<String>,
563 },
564 Summary {
565 session_ids: Vec<String>,
566 },
567}
568
569#[allow(clippy::too_many_arguments)]
573pub(crate) fn build_agent_config(
574 step: &Step,
575 prompt: &str,
576 workflow_name: &str,
577 model_override: Option<&str>,
578 rendered_system_prompt: Option<&str>,
579 workflow_provider: Option<&str>,
580 workflow_model: Option<&str>,
581 extra_add_dirs: &[std::path::PathBuf],
582) -> AgentConfig {
583 let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
584
585 let (command_label, accepts_agent_args, command_params) = match &step.command {
586 None => ("run".to_string(), true, None),
587 Some(StepCommand::Review) => (
588 "review".to_string(),
589 true,
590 Some(CommandParams::Review {
591 uncommitted: step.uncommitted,
592 base: step.base.clone(),
593 commit: step.commit.clone(),
594 title: step.title.clone(),
595 }),
596 ),
597 Some(StepCommand::Plan) => (
598 "plan".to_string(),
599 true,
600 Some(CommandParams::Plan {
601 output: step.plan_output.as_deref().map(expand_path),
602 instructions: step.instructions.clone(),
603 }),
604 ),
605 Some(StepCommand::Pipe) => {
606 let session_ids: Vec<String> =
607 step.depends_on.iter().map(|d| session_name(d)).collect();
608 (
609 "pipe".to_string(),
610 true,
611 Some(CommandParams::Pipe { session_ids }),
612 )
613 }
614 Some(StepCommand::Collect) => {
615 let session_ids: Vec<String> =
616 step.depends_on.iter().map(|d| session_name(d)).collect();
617 (
618 "collect".to_string(),
619 false,
620 Some(CommandParams::Collect { session_ids }),
621 )
622 }
623 Some(StepCommand::Summary) => {
624 let session_ids: Vec<String> =
625 step.depends_on.iter().map(|d| session_name(d)).collect();
626 (
627 "summary".to_string(),
628 false,
629 Some(CommandParams::Summary { session_ids }),
630 )
631 }
632 };
633
634 let mut cfg = AgentConfig {
637 command: command_label,
638 session_name: session_name(&step.name),
639 description: if step.description.is_empty() {
640 None
641 } else {
642 Some(step.description.clone())
643 },
644 tags: {
645 let mut t = vec!["zig-workflow".to_string()];
646 t.extend(step.tags.iter().cloned());
647 t
648 },
649 timeout: step.timeout.clone(),
650 prompt: prompt.to_string(),
651 accepts_agent_args,
652 command_params,
653 interactive: step.interactive,
654 ..Default::default()
655 };
656
657 if !accepts_agent_args {
658 return cfg;
659 }
660
661 cfg.provider = step
662 .provider
663 .clone()
664 .or_else(|| workflow_provider.map(String::from));
665 cfg.model = model_override
666 .map(String::from)
667 .or_else(|| step.model.clone())
668 .or_else(|| workflow_model.map(String::from));
669 cfg.system_prompt = rendered_system_prompt.map(String::from);
670 cfg.max_turns = step.max_turns;
671
672 if let Some(output) = &step.output {
675 cfg.output_format = Some(output.clone());
676 } else if step.json {
677 cfg.json_mode = true;
678 }
679 cfg.json_schema = step.json_schema.clone();
680 cfg.mcp_config = step.mcp_config.as_deref().map(expand_path);
681
682 cfg.auto_approve = step.auto_approve;
683 cfg.root = step.root.as_deref().map(expand_path);
684 cfg.add_dirs = step
685 .add_dirs
686 .iter()
687 .map(|d| expand_path(d))
688 .chain(extra_add_dirs.iter().map(|p| p.display().to_string()))
689 .collect();
690 cfg.env = step
691 .env
692 .iter()
693 .map(|(k, v)| (k.clone(), v.clone()))
694 .collect();
695 cfg.files = step.files.iter().map(|f| expand_path(f)).collect();
696
697 if step.worktree {
699 cfg.worktree = Some(None);
700 }
701 cfg.sandbox = step.sandbox.clone();
702
703 if cfg.interactive {
707 let base = cfg.system_prompt.take().unwrap_or_default();
708 cfg.system_prompt = Some(format!(
709 "{base}{}",
710 crate::self_cmd::INTERACTIVE_SELF_TERMINATE_INSTRUCTION
711 ));
712 }
713
714 cfg
715}
716
717pub(crate) fn apply_agent_config(mut builder: AgentBuilder, cfg: &AgentConfig) -> AgentBuilder {
720 if let Some(ref p) = cfg.provider {
721 builder = builder.provider(p);
722 }
723 if let Some(ref m) = cfg.model {
724 builder = builder.model(m);
725 }
726 if let Some(ref sp) = cfg.system_prompt {
727 builder = builder.system_prompt(sp);
728 }
729 if let Some(ref r) = cfg.root {
730 builder = builder.root(r);
731 }
732 if cfg.auto_approve {
733 builder = builder.auto_approve(true);
734 }
735 for dir in &cfg.add_dirs {
736 builder = builder.add_dir(dir);
737 }
738 for (k, v) in &cfg.env {
739 builder = builder.env(k, v);
740 }
741 for f in &cfg.files {
742 builder = builder.file(f);
743 }
744 if let Some(ref wt) = cfg.worktree {
745 builder = builder.worktree(wt.as_deref());
746 }
747 if let Some(ref sb) = cfg.sandbox {
748 builder = builder.sandbox(Some(sb));
749 }
750 if let Some(ref fmt) = cfg.output_format {
751 builder = builder.output_format(fmt);
752 }
753 if cfg.json_mode {
754 if let Some(ref schema) = cfg.json_schema {
755 if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
756 builder = builder.json_schema(v);
757 } else {
758 builder = builder.json();
759 }
760 } else {
761 builder = builder.json();
762 }
763 } else if let Some(ref schema) = cfg.json_schema {
764 if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
765 builder = builder.json_schema(v);
766 }
767 }
768 if let Some(turns) = cfg.max_turns {
769 builder = builder.max_turns(turns);
770 }
771 if let Some(ref t) = cfg.timeout {
772 if let Some(dur) = parse_timeout_string(t) {
773 builder = builder.timeout(dur);
774 }
775 }
776 if let Some(ref mcp) = cfg.mcp_config {
777 builder = builder.mcp_config(mcp);
778 }
779 builder = builder.name(&cfg.session_name);
780 if let Some(ref d) = cfg.description {
781 builder = builder.description(d);
782 }
783 for tag in &cfg.tags {
784 builder = builder.tag(tag);
785 }
786 builder
787}
788
789fn parse_timeout_string(s: &str) -> Option<Duration> {
792 let s = s.trim();
796 if s.is_empty() {
797 return None;
798 }
799 if let Ok(secs) = s.parse::<u64>() {
800 return Some(Duration::from_secs(secs));
801 }
802 let mut total = Duration::ZERO;
803 let mut current = String::new();
804 let mut chars = s.chars().peekable();
805 while let Some(c) = chars.next() {
806 if c.is_ascii_digit() || c == '.' {
807 current.push(c);
808 continue;
809 }
810 let mut unit = String::from(c);
811 if c == 'm' && chars.peek() == Some(&'s') {
812 unit.push(chars.next().unwrap());
813 }
814 let value: f64 = current.parse().ok()?;
815 current.clear();
816 let piece = match unit.as_str() {
817 "ms" => Duration::from_millis(value as u64),
818 "s" => Duration::from_secs_f64(value),
819 "m" => Duration::from_secs_f64(value * 60.0),
820 "h" => Duration::from_secs_f64(value * 3600.0),
821 _ => return None,
822 };
823 total += piece;
824 }
825 if !current.is_empty() {
826 return None;
827 }
828 Some(total)
829}
830
831async fn dispatch_agent(
849 cfg: &AgentConfig,
850 step_name: &str,
851 workflow_name: &str,
852 session: Option<&Arc<SessionWriter>>,
853 prefix: Option<&str>,
854) -> Result<String, ZigError> {
855 match cfg.command.as_str() {
856 "run" => {
857 if cfg.interactive {
858 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
871 builder = builder.register_process(
872 zag_agent::process_registration::RegisterOptionsOwned {
873 provider: cfg.provider.clone().unwrap_or_else(|| "claude".to_string()),
874 model: cfg.model.clone().unwrap_or_default(),
875 command: "run".to_string(),
876 prompt_preview: Some(prompt_preview(&cfg.prompt)),
877 session_id: Some(format!("zig-{workflow_name}-{step_name}")),
878 session_name: None,
879 root: cfg.root.clone(),
880 },
881 );
882 builder.run(Some(&cfg.prompt)).await.map_err(|e| {
883 ZigError::Zag(format!("agent run failed for step '{step_name}': {e}"))
884 })?;
885 Ok(String::new())
886 } else {
887 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
888 builder = install_live_streaming(builder, step_name, session, prefix);
889 let output = builder.exec(&cfg.prompt).await.map_err(|e| {
890 ZigError::Zag(format!("agent exec failed for step '{step_name}': {e}"))
891 })?;
892 Ok(output.result.unwrap_or_default())
893 }
894 }
895 "review" => {
896 let provider = cfg.provider.clone().unwrap_or_else(|| "claude".to_string());
897 let (uncommitted, base, commit, title) = match &cfg.command_params {
898 Some(CommandParams::Review {
899 uncommitted,
900 base,
901 commit,
902 title,
903 }) => (*uncommitted, base.clone(), commit.clone(), title.clone()),
904 _ => (false, None, None, None),
905 };
906
907 if provider == "codex" {
911 let params = agent_review::ReviewParams {
912 provider,
913 uncommitted,
914 base,
915 commit,
916 title,
917 prompt: if cfg.prompt.is_empty() {
918 None
919 } else {
920 Some(cfg.prompt.clone())
921 },
922 system_prompt: cfg.system_prompt.clone(),
923 model: cfg.model.clone(),
924 root: cfg.root.clone(),
925 auto_approve: cfg.auto_approve,
926 add_dirs: cfg.add_dirs.clone(),
927 progress: Box::new(zag_agent::progress::SilentProgress),
928 };
929 let output = agent_review::run_review(params).await.map_err(|e| {
930 ZigError::Zag(format!("review failed for step '{step_name}': {e}"))
931 })?;
932 return Ok(output.and_then(|o| o.result).unwrap_or_default());
933 }
934
935 let diff = agent_review::gather_diff(
938 uncommitted,
939 base.as_deref(),
940 commit.as_deref(),
941 cfg.root.as_deref(),
942 )
943 .map_err(|e| {
944 ZigError::Zag(format!(
945 "review gather_diff failed for step '{step_name}': {e}"
946 ))
947 })?;
948 let user_prompt = if cfg.prompt.is_empty() {
949 None
950 } else {
951 Some(cfg.prompt.as_str())
952 };
953 let review_prompt =
954 agent_review::build_review_prompt(&diff, title.as_deref(), user_prompt);
955
956 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
957 builder = install_live_streaming(builder, step_name, session, prefix);
958 let output = builder.exec(&review_prompt).await.map_err(|e| {
959 ZigError::Zag(format!("review exec failed for step '{step_name}': {e}"))
960 })?;
961 Ok(output.result.unwrap_or_default())
962 }
963 "plan" => {
964 let (plan_output_path, instructions) = match &cfg.command_params {
965 Some(CommandParams::Plan {
966 output,
967 instructions,
968 }) => (output.clone(), instructions.clone()),
969 _ => (None, None),
970 };
971
972 let plan_prompt = agent_plan::build_plan_prompt(&cfg.prompt, instructions.as_deref());
973
974 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
975 builder = install_live_streaming(builder, step_name, session, prefix);
976 let output = builder.exec(&plan_prompt).await.map_err(|e| {
977 ZigError::Zag(format!("plan exec failed for step '{step_name}': {e}"))
978 })?;
979 let text = output.result.unwrap_or_default();
980
981 if let Some(path_str) = plan_output_path {
982 let target = resolve_plan_output_path(&path_str);
983 if let Some(parent) = target.parent()
984 && !parent.as_os_str().is_empty()
985 {
986 std::fs::create_dir_all(parent).map_err(|e| {
987 ZigError::Io(format!(
988 "failed to create plan output directory {}: {e}",
989 parent.display()
990 ))
991 })?;
992 }
993 std::fs::write(&target, &text).map_err(|e| {
994 ZigError::Io(format!(
995 "failed to write plan output to {}: {e}",
996 target.display()
997 ))
998 })?;
999 eprintln!("plan written to {}", target.display());
1000 }
1001
1002 Ok(text)
1003 }
1004 "pipe" => {
1005 let session_ids = match &cfg.command_params {
1006 Some(CommandParams::Pipe { session_ids }) => session_ids.as_slice(),
1007 _ => &[] as &[String],
1008 };
1009 let context = build_pipe_context(session_ids, cfg.root.as_deref())?;
1010 let combined = format!(
1011 "Here are results from previous agent sessions:\n\n{context}\n\n{}",
1012 cfg.prompt
1013 );
1014
1015 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
1016 builder = install_live_streaming(builder, step_name, session, prefix);
1017 let output = builder.exec(&combined).await.map_err(|e| {
1018 ZigError::Zag(format!("pipe exec failed for step '{step_name}': {e}"))
1019 })?;
1020 Ok(output.result.unwrap_or_default())
1021 }
1022 "collect" => {
1023 let session_ids = match &cfg.command_params {
1024 Some(CommandParams::Collect { session_ids }) => session_ids.clone(),
1025 _ => Vec::new(),
1026 };
1027 let params = orch_collect::CollectParams {
1028 session_ids,
1029 tag: None,
1030 json: true,
1031 root: cfg.root.clone(),
1032 };
1033 let results = orch_collect::collect_results(¶ms).map_err(|e| {
1034 ZigError::Zag(format!("collect failed for step '{step_name}': {e}"))
1035 })?;
1036 let json = serde_json::to_string(&results)
1037 .map_err(|e| ZigError::Execution(format!("collect serialization failed: {e}")))?;
1038 emit_captured(&json, step_name, session, prefix);
1039 Ok(json)
1040 }
1041 "summary" => {
1042 let session_ids = match &cfg.command_params {
1043 Some(CommandParams::Summary { session_ids }) => session_ids.clone(),
1044 _ => Vec::new(),
1045 };
1046 let params = orch_summary::SummaryParams {
1047 session_ids,
1048 tag: None,
1049 stats: false,
1050 json: true,
1051 root: cfg.root.clone(),
1052 };
1053 let results = orch_summary::summarize_sessions(¶ms).map_err(|e| {
1054 ZigError::Zag(format!("summary failed for step '{step_name}': {e}"))
1055 })?;
1056 let json = serde_json::to_string(&results)
1057 .map_err(|e| ZigError::Execution(format!("summary serialization failed: {e}")))?;
1058 emit_captured(&json, step_name, session, prefix);
1059 Ok(json)
1060 }
1061 other => Err(ZigError::Execution(format!(
1062 "unknown command '{other}' for step '{step_name}'"
1063 ))),
1064 }
1065}
1066
1067fn install_live_streaming(
1074 builder: AgentBuilder,
1075 step_name: &str,
1076 session: Option<&Arc<SessionWriter>>,
1077 prefix: Option<&str>,
1078) -> AgentBuilder {
1079 let step_name_owned = step_name.to_string();
1080 let prefix_owned = prefix.map(String::from);
1081 let session_owned = session.cloned();
1082 let last_activity = Arc::new(std::sync::Mutex::new(Instant::now()));
1083 builder.on_log_event(move |evt| {
1084 if matches!(evt.kind, LogEventKind::Heartbeat { .. }) {
1085 let elapsed = last_activity.lock().unwrap().elapsed().as_secs();
1086 emit_heartbeat_line(elapsed, prefix_owned.as_deref());
1087 return;
1088 }
1089 *last_activity.lock().unwrap() = Instant::now();
1090 let Some(text) = zag_agent::listen::format_event_text(evt, false) else {
1091 return;
1092 };
1093 emit_live_line(
1094 &text,
1095 &step_name_owned,
1096 session_owned.as_ref(),
1097 prefix_owned.as_deref(),
1098 );
1099 })
1100}
1101
1102fn emit_live_line(
1105 text: &str,
1106 step_name: &str,
1107 session: Option<&Arc<SessionWriter>>,
1108 prefix: Option<&str>,
1109) {
1110 use std::io::Write;
1111 if text.is_empty() {
1112 return;
1113 }
1114 let stderr = std::io::stderr();
1115 for line in text.lines() {
1116 if let Some(w) = session {
1117 let _ = w.step_output(step_name, OutputStream::Stdout, line);
1118 }
1119 let mut h = stderr.lock();
1120 let _ = match prefix {
1121 Some(p) => writeln!(h, "[{p}] {line}"),
1122 None => writeln!(h, "{line}"),
1123 };
1124 }
1125}
1126
1127fn emit_heartbeat_line(elapsed_secs: u64, prefix: Option<&str>) {
1132 use std::io::Write;
1133 let line = format!(" \u{00b7} waiting ({elapsed_secs}s)");
1134 let stderr = std::io::stderr();
1135 let mut h = stderr.lock();
1136 let _ = match prefix {
1137 Some(p) => writeln!(h, "[{p}] {line}"),
1138 None => writeln!(h, "{line}"),
1139 };
1140}
1141
1142fn emit_captured(
1146 text: &str,
1147 step_name: &str,
1148 session: Option<&Arc<SessionWriter>>,
1149 prefix: Option<&str>,
1150) {
1151 emit_live_line(text, step_name, session, prefix);
1152}
1153
1154fn build_pipe_context(session_ids: &[String], root: Option<&str>) -> Result<String, ZigError> {
1160 let mut parts = Vec::new();
1161 for (i, id) in session_ids.iter().enumerate() {
1162 let Some(text) = orch_collect::extract_last_assistant_message(id, root) else {
1163 eprintln!("warning: no result found for upstream session {id}");
1164 continue;
1165 };
1166 let short = &id[..id.len().min(8)];
1167 let block = if session_ids.len() == 1 {
1168 format!("<session-result session=\"{short}\">\n{text}\n</session-result>")
1169 } else {
1170 format!(
1171 "<session-result index=\"{}\" session=\"{short}\">\n{text}\n</session-result>",
1172 i + 1
1173 )
1174 };
1175 parts.push(block);
1176 }
1177
1178 if parts.is_empty() {
1179 return Err(ZigError::Execution(
1180 "pipe: no results available from the specified sessions".into(),
1181 ));
1182 }
1183 Ok(parts.join("\n\n"))
1184}
1185
1186fn resolve_plan_output_path(path_str: &str) -> std::path::PathBuf {
1191 let expanded = expand_path(path_str);
1192 let path = std::path::PathBuf::from(&expanded);
1193 if path.extension().is_some() {
1194 return path;
1195 }
1196 let stamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
1197 path.join(format!("plan-{stamp}.md"))
1198}
1199
1200#[allow(clippy::too_many_arguments)]
1205async fn execute_step(
1206 step: &Step,
1207 prompt: &str,
1208 workflow_name: &str,
1209 model_override: Option<&str>,
1210 prefix: Option<&str>,
1211 session: Option<&Arc<SessionWriter>>,
1212 rendered_system_prompt: Option<&str>,
1213 workflow_provider: Option<&str>,
1214 workflow_model: Option<&str>,
1215 extra_add_dirs: &[std::path::PathBuf],
1216) -> Result<String, ZigError> {
1217 let cfg = build_agent_config(
1218 step,
1219 prompt,
1220 workflow_name,
1221 model_override,
1222 rendered_system_prompt,
1223 workflow_provider,
1224 workflow_model,
1225 extra_add_dirs,
1226 );
1227 dispatch_agent(&cfg, &step.name, workflow_name, session, prefix).await
1228}
1229
1230#[allow(clippy::too_many_arguments)]
1235async fn run_step_attempts(
1236 step: &Step,
1237 prompt: &str,
1238 workflow_name: &str,
1239 prefix: Option<&str>,
1240 session: Option<&Arc<SessionWriter>>,
1241 rendered_system_prompt: Option<&str>,
1242 workflow_provider: Option<&str>,
1243 workflow_model: Option<&str>,
1244 extra_add_dirs: &[std::path::PathBuf],
1245) -> Result<String, ZigError> {
1246 let mut attempts = 0;
1247 let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
1248 step.max_retries.unwrap_or(1) + 1
1249 } else {
1250 1
1251 };
1252
1253 loop {
1254 attempts += 1;
1255 let model_override = if attempts > 1 {
1256 step.retry_model.as_deref()
1257 } else {
1258 None
1259 };
1260 match execute_step(
1261 step,
1262 prompt,
1263 workflow_name,
1264 model_override,
1265 prefix,
1266 session,
1267 rendered_system_prompt,
1268 workflow_provider,
1269 workflow_model,
1270 extra_add_dirs,
1271 )
1272 .await
1273 {
1274 Ok(output) => return Ok(output),
1275 Err(e) => {
1276 if let Some(w) = session {
1277 let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
1278 }
1279 if attempts < max_attempts {
1280 eprintln!(
1281 " retry {}/{} for step '{}'",
1282 attempts,
1283 max_attempts - 1,
1284 step.name
1285 );
1286 continue;
1287 }
1288 return Err(e);
1289 }
1290 }
1291 }
1292}
1293
1294fn extract_saves(
1301 output: &str,
1302 saves: &HashMap<String, String>,
1303) -> Result<HashMap<String, String>, ZigError> {
1304 let mut extracted = HashMap::new();
1305
1306 for (var_name, selector) in saves {
1307 let value = if selector == "$" {
1308 output.trim().to_string()
1309 } else if let Some(path) = selector.strip_prefix("$.") {
1310 let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
1311 ZigError::Execution(format!(
1312 "saves selector '{selector}' requires JSON output, but got parse error: {e}"
1313 ))
1314 })?;
1315 json_path_lookup(&json, path)
1316 } else {
1317 output.trim().to_string()
1318 };
1319
1320 extracted.insert(var_name.clone(), value);
1321 }
1322
1323 Ok(extracted)
1324}
1325
1326fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
1331 let mut sequential = Vec::new();
1332 let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
1333
1334 for step in tier {
1335 if let Some(group) = &step.race_group {
1336 race_groups.entry(group.clone()).or_default().push(step);
1337 } else {
1338 sequential.push(*step);
1339 }
1340 }
1341
1342 (sequential, race_groups)
1343}
1344
1345#[allow(clippy::too_many_arguments)]
1351async fn execute_race_group(
1352 steps: &[&Step],
1353 prompts: &HashMap<String, String>,
1354 system_prompts: &HashMap<String, String>,
1355 workflow_name: &str,
1356 tier_index: usize,
1357 session: Option<&Arc<SessionWriter>>,
1358 workflow_provider: Option<&str>,
1359 workflow_model: Option<&str>,
1360 storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
1361) -> Result<(String, String), ZigError> {
1362 if let Some(w) = session {
1363 for step in steps {
1364 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1365 let preview = prompts
1366 .get(&step.name)
1367 .map(|p| prompt_preview(p))
1368 .unwrap_or_default();
1369 let _ = w.step_started(
1370 &step.name,
1371 tier_index,
1372 &zag_session_id,
1373 zag_command_name(&step.command),
1374 step.model.as_deref(),
1375 &preview,
1376 );
1377 }
1378 }
1379
1380 let race_started = Instant::now();
1381 let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1382
1383 for step in steps {
1384 let prompt = prompts
1385 .get(&step.name)
1386 .ok_or_else(|| ZigError::Execution(format!("missing prompt for step '{}'", step.name)))?
1387 .clone();
1388 eprintln!(" racing step '{}'...", step.name);
1389 let rendered_sp = system_prompts.get(&step.name).cloned();
1390 let empty: Vec<std::path::PathBuf> = Vec::new();
1391 let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty).clone();
1392 let step_clone: Step = (*step).clone();
1393 let wf_name = workflow_name.to_string();
1394 let wf_provider = workflow_provider.map(String::from);
1395 let wf_model = workflow_model.map(String::from);
1396 let session_clone = session.cloned();
1397 let name = step.name.clone();
1398 set.spawn(async move {
1399 let res = execute_step(
1400 &step_clone,
1401 &prompt,
1402 &wf_name,
1403 None,
1404 None,
1405 session_clone.as_ref(),
1406 rendered_sp.as_deref(),
1407 wf_provider.as_deref(),
1408 wf_model.as_deref(),
1409 &extra_dirs,
1410 )
1411 .await;
1412 (name, res)
1413 });
1414 }
1415
1416 while let Some(joined) = set.join_next().await {
1418 let (winner_name, result) = match joined {
1419 Ok(pair) => pair,
1420 Err(e) if e.is_cancelled() => continue,
1421 Err(e) => return Err(ZigError::Execution(format!("race task panicked: {e}"))),
1422 };
1423 match result {
1424 Ok(stdout) => {
1425 set.abort_all();
1427 while let Some(r) = set.join_next().await {
1428 if let Ok((name, _)) = r {
1429 eprintln!(" cancelling step '{name}' (race lost)");
1430 }
1431 }
1432 let elapsed = race_started.elapsed().as_millis() as u64;
1433 eprintln!(" race won by '{winner_name}'");
1434 if let Some(w) = session {
1435 let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1436 }
1437 return Ok((winner_name, stdout));
1438 }
1439 Err(e) => {
1440 if let Some(w) = session {
1441 let _ = w.step_failed(&winner_name, None, 1, &e.to_string());
1442 }
1443 continue;
1445 }
1446 }
1447 }
1448
1449 Err(ZigError::Execution(
1450 "all racers failed without a winner".into(),
1451 ))
1452}
1453
1454#[allow(clippy::too_many_arguments)]
1456async fn execute_sequential_step(
1457 step: &Step,
1458 vars: &mut HashMap<String, String>,
1459 user_prompt: Option<&str>,
1460 step_outputs: &mut HashMap<String, String>,
1461 workflow_name: &str,
1462 pending_next: &mut Option<String>,
1463 tier_index: usize,
1464 session: Option<&Arc<SessionWriter>>,
1465 roles: &HashMap<String, Role>,
1466 resources: &ResourceCollector<'_>,
1467 memory: &MemoryCollector,
1468 storage: &StorageManager,
1469 workflow_dir: &Path,
1470 workflow_provider: Option<&str>,
1471 workflow_model: Option<&str>,
1472) -> Result<(), ZigError> {
1473 if let Some(condition) = &step.condition {
1474 if !evaluate_condition(condition, vars)? {
1475 eprintln!(
1476 " skipping '{}' (condition not met: {condition})",
1477 step.name
1478 );
1479 if let Some(w) = session {
1480 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1481 }
1482 return Ok(());
1483 }
1484 }
1485
1486 eprintln!(" running step '{}'...", step.name);
1487
1488 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1489 let rendered_sp = resolve_role_system_prompt(
1490 step,
1491 roles,
1492 resources,
1493 memory,
1494 storage,
1495 vars,
1496 workflow_dir,
1497 workflow_name,
1498 )?;
1499 let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1500 if let Some(w) = session {
1501 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1502 let _ = w.step_started(
1503 &step.name,
1504 tier_index,
1505 &zag_session_id,
1506 zag_command_name(&step.command),
1507 step.model.as_deref(),
1508 &prompt_preview(&prompt),
1509 );
1510 }
1511 let started = Instant::now();
1512 let result = run_step_attempts(
1513 step,
1514 &prompt,
1515 workflow_name,
1516 None,
1517 session,
1518 rendered_sp.as_deref(),
1519 workflow_provider,
1520 workflow_model,
1521 &storage_dirs,
1522 )
1523 .await;
1524
1525 match result {
1526 Ok(output) => {
1527 let mut saved_keys: Vec<String> = Vec::new();
1528 if !step.saves.is_empty() {
1529 let saved = extract_saves(&output, &step.saves)?;
1530 for (k, v) in &saved {
1531 eprintln!(" saved {k} = {v}");
1532 saved_keys.push(k.clone());
1533 }
1534 vars.extend(saved);
1535 }
1536
1537 step_outputs.insert(step.name.clone(), output);
1538 eprintln!(" completed '{}'", step.name);
1539 if let Some(w) = session {
1540 let _ = w.step_completed(
1541 &step.name,
1542 0,
1543 started.elapsed().as_millis() as u64,
1544 saved_keys,
1545 );
1546 }
1547
1548 if step.next.is_some() {
1549 *pending_next = step.next.clone();
1550 }
1551 }
1552 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1553 FailurePolicy::Fail => return Err(e),
1554 FailurePolicy::Continue => {
1555 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1556 }
1557 FailurePolicy::Retry => {
1558 return Err(e);
1559 }
1560 },
1561 }
1562
1563 Ok(())
1564}
1565
1566#[allow(clippy::too_many_arguments)]
1575async fn execute_parallel_tier(
1576 steps: &[&Step],
1577 vars: &mut HashMap<String, String>,
1578 user_prompt: Option<&str>,
1579 step_outputs: &mut HashMap<String, String>,
1580 workflow_name: &str,
1581 pending_next: &mut Option<String>,
1582 tier_index: usize,
1583 session: Option<&Arc<SessionWriter>>,
1584 roles: &HashMap<String, Role>,
1585 resources: &ResourceCollector<'_>,
1586 memory: &MemoryCollector,
1587 storage: &StorageManager,
1588 workflow_dir: &Path,
1589 workflow_provider: Option<&str>,
1590 workflow_model: Option<&str>,
1591) -> Result<(), ZigError> {
1592 let mut active: Vec<&Step> = Vec::new();
1595 let mut prompts: HashMap<String, String> = HashMap::new();
1596 let mut rendered_sps: HashMap<String, String> = HashMap::new();
1597 let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
1598 for step in steps {
1599 if let Some(condition) = &step.condition {
1600 if !evaluate_condition(condition, vars)? {
1601 eprintln!(
1602 " skipping '{}' (condition not met: {condition})",
1603 step.name
1604 );
1605 if let Some(w) = session {
1606 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1607 }
1608 continue;
1609 }
1610 }
1611 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1612 prompts.insert(step.name.clone(), prompt);
1613 if let Some(sp) = resolve_role_system_prompt(
1614 step,
1615 roles,
1616 resources,
1617 memory,
1618 storage,
1619 vars,
1620 workflow_dir,
1621 workflow_name,
1622 )? {
1623 rendered_sps.insert(step.name.clone(), sp);
1624 }
1625 storage_dirs_map.insert(
1626 step.name.clone(),
1627 storage.add_dirs_for_step(step.storage.as_deref()),
1628 );
1629 active.push(*step);
1630 }
1631
1632 if active.is_empty() {
1633 return Ok(());
1634 }
1635
1636 eprintln!(" running {} steps in parallel...", active.len());
1637
1638 let mut start_times: HashMap<String, Instant> = HashMap::new();
1639 let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1640 for step in &active {
1641 let step_clone: Step = (*step).clone();
1642 let prompt = prompts.remove(&step.name).unwrap_or_default();
1643 let rendered_sp = rendered_sps.remove(&step.name);
1644 let workflow_name_owned = workflow_name.to_string();
1645 let name = step.name.clone();
1646 eprintln!(" starting '{name}'...");
1647 if let Some(w) = session {
1648 let zag_session_id = format!("zig-{workflow_name}-{name}");
1649 let _ = w.step_started(
1650 &name,
1651 tier_index,
1652 &zag_session_id,
1653 zag_command_name(&step.command),
1654 step.model.as_deref(),
1655 &prompt_preview(&prompt),
1656 );
1657 }
1658 start_times.insert(name.clone(), Instant::now());
1659 let session_clone = session.cloned();
1660 let wf_provider = workflow_provider.map(String::from);
1661 let wf_model = workflow_model.map(String::from);
1662 let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
1663 set.spawn(async move {
1664 let res = run_step_attempts(
1665 &step_clone,
1666 &prompt,
1667 &workflow_name_owned,
1668 Some(&name),
1669 session_clone.as_ref(),
1670 rendered_sp.as_deref(),
1671 wf_provider.as_deref(),
1672 wf_model.as_deref(),
1673 &storage_dirs,
1674 )
1675 .await;
1676 (name, res)
1677 });
1678 }
1679
1680 let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1681 while let Some(joined) = set.join_next().await {
1682 match joined {
1683 Ok((name, res)) => {
1684 results.insert(name, res);
1685 }
1686 Err(e) => {
1687 return Err(ZigError::Execution(format!(
1688 "parallel step task panicked: {e}"
1689 )));
1690 }
1691 }
1692 }
1693
1694 let mut errors: Vec<String> = Vec::new();
1696 for step in &active {
1697 let Some(res) = results.remove(&step.name) else {
1698 continue;
1699 };
1700 let elapsed = start_times
1701 .remove(&step.name)
1702 .map(|t| t.elapsed().as_millis() as u64)
1703 .unwrap_or(0);
1704 match res {
1705 Ok(output) => {
1706 let mut saved_keys: Vec<String> = Vec::new();
1707 if !step.saves.is_empty() {
1708 let saved = extract_saves(&output, &step.saves)?;
1709 for (k, v) in &saved {
1710 eprintln!(" saved {k} = {v}");
1711 saved_keys.push(k.clone());
1712 }
1713 vars.extend(saved);
1714 }
1715 step_outputs.insert(step.name.clone(), output);
1716 eprintln!(" completed '{}'", step.name);
1717 if let Some(w) = session {
1718 let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1719 }
1720 if step.next.is_some() && pending_next.is_none() {
1721 *pending_next = step.next.clone();
1722 }
1723 }
1724 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1725 FailurePolicy::Continue => {
1726 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1727 }
1728 FailurePolicy::Fail | FailurePolicy::Retry => {
1729 errors.push(format!("'{}': {e}", step.name));
1730 }
1731 },
1732 }
1733 }
1734
1735 if !errors.is_empty() {
1736 return Err(ZigError::Execution(format!(
1737 "parallel step(s) failed: {}",
1738 errors.join("; ")
1739 )));
1740 }
1741
1742 Ok(())
1743}
1744
1745fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1748 let mut vars = HashMap::new();
1749 for (name, var) in &workflow.vars {
1750 let value = match &var.default {
1751 Some(toml::Value::String(s)) => s.clone(),
1752 Some(toml::Value::Integer(n)) => n.to_string(),
1753 Some(toml::Value::Float(f)) => f.to_string(),
1754 Some(toml::Value::Boolean(b)) => b.to_string(),
1755 Some(other) => other.to_string(),
1756 None => String::new(),
1757 };
1758 vars.insert(name.clone(), value);
1759 }
1760 vars
1761}
1762
1763#[allow(clippy::too_many_arguments)]
1765async fn execute(
1766 workflow: &Workflow,
1767 workflow_path: &std::path::Path,
1768 user_prompt: Option<&str>,
1769 workflow_dir: &Path,
1770 disable_resources: bool,
1771 disable_memory: bool,
1772 disable_storage: bool,
1773 dry_run: bool,
1774 dry_run_format: DryRunFormat,
1775) -> Result<(), ZigError> {
1776 let mut vars = init_vars(workflow);
1777
1778 let resource_collector = ResourceCollector::from_env(
1779 &workflow.workflow.name,
1780 &workflow.workflow.resources,
1781 workflow_dir,
1782 disable_resources,
1783 );
1784
1785 let config = ZigConfig::load();
1786 let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1787 let memory_collector = MemoryCollector::from_env(
1788 &workflow.workflow.name,
1789 workflow_memory_mode,
1790 &config,
1791 disable_memory,
1792 );
1793
1794 let storage_manager = if disable_storage || workflow.storage.is_empty() {
1802 StorageManager::empty()
1803 } else if dry_run {
1804 let backend = FilesystemBackend::from_cwd()?;
1805 StorageManager::build_dry(&workflow.storage, backend)
1806 } else {
1807 let backend = FilesystemBackend::from_cwd()?;
1808 StorageManager::build(&workflow.storage, backend)?
1809 };
1810
1811 load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1813
1814 let prompt_var = workflow
1816 .vars
1817 .iter()
1818 .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1819 .map(|(name, _)| name.clone());
1820
1821 if let Some(ref var_name) = prompt_var {
1822 if let Some(prompt) = user_prompt {
1823 vars.insert(var_name.clone(), prompt.to_string());
1824 }
1825 }
1826
1827 if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1829 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1830 return Err(ZigError::Validation(msgs.join("; ")));
1831 }
1832
1833 let effective_user_prompt = if prompt_var.is_some() {
1835 None
1836 } else {
1837 user_prompt
1838 };
1839
1840 let mut step_outputs: HashMap<String, String> = HashMap::new();
1841
1842 let wf_provider = workflow.workflow.provider.as_deref();
1843 let wf_model = workflow.workflow.model.as_deref();
1844
1845 let tiers = topological_sort(&workflow.steps)?;
1846
1847 if dry_run {
1848 let ctx = DryRunContext {
1849 workflow,
1850 workflow_path,
1851 workflow_dir,
1852 vars: &vars,
1853 user_prompt: effective_user_prompt,
1854 roles: &workflow.roles,
1855 resources: &resource_collector,
1856 memory: &memory_collector,
1857 storage: &storage_manager,
1858 wf_provider,
1859 wf_model,
1860 disable_resources,
1861 disable_memory,
1862 disable_storage,
1863 };
1864 return crate::dry_run::print_plan(&ctx, &tiers, dry_run_format);
1865 }
1866
1867 eprintln!(
1868 "running workflow '{}' ({} steps in {} tiers)",
1869 workflow.workflow.name,
1870 workflow.steps.len(),
1871 tiers.len()
1872 );
1873
1874 let coordinator = match SessionWriter::create(
1879 &workflow.workflow.name,
1880 &workflow_path.to_string_lossy(),
1881 user_prompt,
1882 tiers.len(),
1883 ) {
1884 Ok(writer) => {
1885 eprintln!("zig session: {}", writer.session_id());
1886 Some(SessionCoordinator::start(writer))
1887 }
1888 Err(e) => {
1889 eprintln!("warning: failed to open zig session log: {e}");
1890 None
1891 }
1892 };
1893 let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1894 let session_ref = session_writer.as_ref();
1895
1896 let mut iteration = 0;
1897 let mut pending_next: Option<String> = None;
1898
1899 loop {
1900 let tiers_to_run = if let Some(ref next_step) = pending_next {
1901 let remaining: Vec<Vec<&Step>> = tiers
1903 .iter()
1904 .map(|tier| {
1905 tier.iter()
1906 .filter(|s| s.name == *next_step)
1907 .copied()
1908 .collect::<Vec<_>>()
1909 })
1910 .filter(|tier| !tier.is_empty())
1911 .collect();
1912 pending_next = None;
1913 remaining
1914 } else if iteration == 0 {
1915 tiers.clone()
1916 } else {
1917 break;
1918 };
1919
1920 for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1921 let (non_race, race_groups) = partition_tier(tier);
1922
1923 if let Some(w) = session_ref {
1924 let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1925 let _ = w.tier_started(tier_index, names);
1926 }
1927
1928 if non_race.len() <= 1 {
1932 for step in &non_race {
1933 execute_sequential_step(
1934 step,
1935 &mut vars,
1936 effective_user_prompt,
1937 &mut step_outputs,
1938 &workflow.workflow.name,
1939 &mut pending_next,
1940 tier_index,
1941 session_ref,
1942 &workflow.roles,
1943 &resource_collector,
1944 &memory_collector,
1945 &storage_manager,
1946 workflow_dir,
1947 wf_provider,
1948 wf_model,
1949 )
1950 .await?;
1951 }
1952 } else {
1953 execute_parallel_tier(
1954 &non_race,
1955 &mut vars,
1956 effective_user_prompt,
1957 &mut step_outputs,
1958 &workflow.workflow.name,
1959 &mut pending_next,
1960 tier_index,
1961 session_ref,
1962 &workflow.roles,
1963 &resource_collector,
1964 &memory_collector,
1965 &storage_manager,
1966 workflow_dir,
1967 wf_provider,
1968 wf_model,
1969 )
1970 .await?;
1971 }
1972
1973 for (group_name, race_steps) in &race_groups {
1975 eprintln!(" starting race group '{group_name}'...");
1976
1977 let mut prompts = HashMap::new();
1979 let mut race_sps: HashMap<String, String> = HashMap::new();
1980 let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
1981 HashMap::new();
1982 let mut active_steps: Vec<&Step> = Vec::new();
1983 for step in race_steps {
1984 if let Some(condition) = &step.condition {
1985 if !evaluate_condition(condition, &vars)? {
1986 eprintln!(
1987 " skipping '{}' (condition not met: {condition})",
1988 step.name
1989 );
1990 continue;
1991 }
1992 }
1993 let prompt =
1994 render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1995 prompts.insert(step.name.clone(), prompt);
1996 if let Some(sp) = resolve_role_system_prompt(
1997 step,
1998 &workflow.roles,
1999 &resource_collector,
2000 &memory_collector,
2001 &storage_manager,
2002 &vars,
2003 workflow_dir,
2004 &workflow.workflow.name,
2005 )? {
2006 race_sps.insert(step.name.clone(), sp);
2007 }
2008 race_storage_dirs.insert(
2009 step.name.clone(),
2010 storage_manager.add_dirs_for_step(step.storage.as_deref()),
2011 );
2012 active_steps.push(step);
2013 }
2014
2015 if active_steps.is_empty() {
2016 continue;
2017 }
2018
2019 match execute_race_group(
2020 &active_steps,
2021 &prompts,
2022 &race_sps,
2023 &workflow.workflow.name,
2024 tier_index,
2025 session_ref,
2026 wf_provider,
2027 wf_model,
2028 &race_storage_dirs,
2029 )
2030 .await
2031 {
2032 Ok((winner_name, output)) => {
2033 if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
2035 if !winner.saves.is_empty() {
2036 let saved = extract_saves(&output, &winner.saves)?;
2037 for (k, v) in &saved {
2038 eprintln!(" saved {k} = {v}");
2039 }
2040 vars.extend(saved);
2041 }
2042 if winner.next.is_some() {
2043 pending_next = winner.next.clone();
2044 }
2045 }
2046 step_outputs.insert(winner_name.clone(), output);
2047 eprintln!(
2048 " completed race group '{group_name}' (winner: '{winner_name}')"
2049 );
2050 }
2051 Err(e) => return Err(e),
2052 }
2053 }
2054 }
2055
2056 iteration += 1;
2057 if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
2058 if iteration >= MAX_LOOP_ITERATIONS {
2059 eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
2060 }
2061 break;
2062 }
2063 }
2064
2065 eprintln!("workflow '{}' completed", workflow.workflow.name);
2066 if let Some(c) = coordinator {
2067 let _ = c.finish(SessionStatus::Success);
2068 }
2069 Ok(())
2070}
2071
2072fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
2075 match cmd {
2076 None => "run",
2077 Some(StepCommand::Review) => "review",
2078 Some(StepCommand::Plan) => "plan",
2079 Some(StepCommand::Pipe) => "pipe",
2080 Some(StepCommand::Collect) => "collect",
2081 Some(StepCommand::Summary) => "summary",
2082 }
2083}
2084
2085fn prompt_preview(prompt: &str) -> String {
2087 const MAX: usize = 200;
2088 let collapsed: String = prompt
2089 .chars()
2090 .map(|c| if c == '\n' { ' ' } else { c })
2091 .collect();
2092 if collapsed.chars().count() <= MAX {
2093 collapsed
2094 } else {
2095 let truncated: String = collapsed.chars().take(MAX).collect();
2096 format!("{truncated}…")
2097 }
2098}
2099
2100#[cfg(test)]
2101#[path = "run_tests.rs"]
2102mod tests;