1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde::Serialize;
7use tokio::task::JoinSet;
8use zag_agent::builder::AgentBuilder;
9use zag_agent::{plan as agent_plan, review as agent_review};
10use zag_orch::collect as orch_collect;
11use zag_orch::summary as orch_summary;
12
13use crate::config::ZigConfig;
14use crate::dry_run::{DryRunContext, DryRunFormat};
15use crate::error::ZigError;
16use crate::memory::{MemoryCollector, render_memory_block};
17use crate::paths::expand_path;
18use crate::resources::{ResourceCollector, render_system_block};
19use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
20use crate::storage::{FilesystemBackend, StorageManager};
21use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
22use crate::workflow::{parser, validate};
23
24const MAX_LOOP_ITERATIONS: usize = 100;
26
27#[allow(clippy::too_many_arguments)]
49pub async fn run_workflow(
50 workflow_path: &str,
51 user_prompt: Option<&str>,
52 disable_resources: bool,
53 disable_memory: bool,
54 disable_storage: bool,
55 dry_run: bool,
56 dry_run_format: DryRunFormat,
57) -> Result<(), ZigError> {
58 let path = resolve_workflow_path(workflow_path)?;
59 let (workflow, source) = parser::parse_workflow(&path)?;
60
61 if let Err(errors) = validate::validate(&workflow) {
62 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
63 return Err(ZigError::Validation(msgs.join("; ")));
64 }
65
66 execute(
67 &workflow,
68 &path,
69 user_prompt,
70 source.dir(),
71 disable_resources,
72 disable_memory,
73 disable_storage,
74 dry_run,
75 dry_run_format,
76 )
77 .await
78}
79
80pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
93 let mut candidates = vec![
94 PathBuf::from(workflow),
95 PathBuf::from(format!("{workflow}.zwf")),
96 PathBuf::from(format!("{workflow}.zwfz")),
97 ];
98
99 if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
100 candidates.push(local_dir.join(workflow));
101 candidates.push(local_dir.join(format!("{workflow}.zwf")));
102 candidates.push(local_dir.join(format!("{workflow}.zwfz")));
103 }
104
105 if let Some(global_dir) = crate::paths::global_workflows_dir() {
106 candidates.push(global_dir.join(workflow));
107 candidates.push(global_dir.join(format!("{workflow}.zwf")));
108 candidates.push(global_dir.join(format!("{workflow}.zwfz")));
109 }
110
111 for candidate in &candidates {
112 if candidate.exists() {
113 return Ok(candidate.clone());
114 }
115 }
116
117 Err(ZigError::Io(format!(
118 "workflow not found: '{workflow}' (tried: {})",
119 candidates
120 .iter()
121 .map(|p| p.display().to_string())
122 .collect::<Vec<_>>()
123 .join(", ")
124 )))
125}
126
127pub(crate) fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
133 let step_index: HashMap<&str, usize> = steps
134 .iter()
135 .enumerate()
136 .map(|(i, s)| (s.name.as_str(), i))
137 .collect();
138
139 let mut in_degree = vec![0usize; steps.len()];
140 for (i, step) in steps.iter().enumerate() {
141 for dep in &step.depends_on {
142 if step_index.contains_key(dep.as_str()) {
143 in_degree[i] += 1;
144 }
145 }
146 }
147
148 let mut tiers = Vec::new();
149 let mut remaining = in_degree.clone();
150 let mut completed: Vec<bool> = vec![false; steps.len()];
151
152 loop {
153 let tier: Vec<usize> = (0..steps.len())
154 .filter(|&i| !completed[i] && remaining[i] == 0)
155 .collect();
156
157 if tier.is_empty() {
158 break;
159 }
160
161 for &i in &tier {
162 completed[i] = true;
163 }
164
165 for &i in &tier {
167 for (j, step) in steps.iter().enumerate() {
168 if !completed[j] && step.depends_on.contains(&steps[i].name) {
169 remaining[j] -= 1;
170 }
171 }
172 }
173
174 tiers.push(tier.iter().map(|&i| &steps[i]).collect());
175 }
176
177 let completed_count: usize = completed.iter().filter(|&&c| c).count();
178 if completed_count != steps.len() {
179 return Err(ZigError::Execution(
180 "could not resolve all steps — possible undetected cycle".into(),
181 ));
182 }
183
184 Ok(tiers)
185}
186
187pub(crate) fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
193 let mut result = String::with_capacity(template.len());
194 let mut rest = template;
195
196 while let Some(start) = rest.find("${") {
197 result.push_str(&rest[..start]);
198 let after_start = &rest[start + 2..];
199
200 if let Some(end) = after_start.find('}') {
201 let var_expr = &after_start[..end];
202 let mut parts = var_expr.splitn(2, '.');
203 let root = parts.next().unwrap_or(var_expr);
204
205 if let Some(value) = vars.get(root) {
206 if let Some(path) = parts.next() {
207 if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
209 let resolved = json_path_lookup(&json, path);
210 result.push_str(&resolved);
211 } else {
212 result.push_str(value);
213 }
214 } else {
215 result.push_str(value);
216 }
217 } else {
218 result.push_str(&rest[start..start + 2 + end + 1]);
220 }
221
222 rest = &after_start[end + 1..];
223 } else {
224 result.push_str(&rest[start..]);
225 rest = "";
226 }
227 }
228
229 result.push_str(rest);
230 result
231}
232
233fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
235 let mut current = value;
236 for key in path.split('.') {
237 match current.get(key) {
238 Some(v) => current = v,
239 None => return format!("${{?.{path}}}"),
240 }
241 }
242 match current {
243 serde_json::Value::String(s) => s.clone(),
244 other => other.to_string(),
245 }
246}
247
248#[allow(clippy::too_many_arguments)]
265pub(crate) fn resolve_role_system_prompt(
266 step: &Step,
267 roles: &HashMap<String, Role>,
268 resources: &ResourceCollector<'_>,
269 memory: &MemoryCollector,
270 storage: &StorageManager,
271 vars: &HashMap<String, String>,
272 workflow_dir: &Path,
273 workflow_name: &str,
274) -> Result<Option<String>, ZigError> {
275 let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
277 Some(substitute_vars(sp, vars))
278 } else if let Some(ref role_ref) = step.role {
279 let resolved_name = substitute_vars(role_ref, vars);
280 let role = roles.get(&resolved_name).ok_or_else(|| {
281 ZigError::Execution(format!(
282 "step '{}' references role '{}' which does not exist",
283 step.name, resolved_name
284 ))
285 })?;
286
287 let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
288 let full_path = workflow_dir.join(expand_path(file_path));
289 Some(std::fs::read_to_string(&full_path).map_err(|e| {
290 ZigError::Execution(format!(
291 "failed to read system_prompt_file '{}' for role '{}': {e}",
292 full_path.display(),
293 resolved_name
294 ))
295 })?)
296 } else {
297 role.system_prompt.clone()
298 };
299
300 raw_prompt.map(|p| substitute_vars(&p, vars))
301 } else {
302 None
303 };
304
305 let set = resources.collect_for_step(&step.resources)?;
307 let resource_block = render_system_block(&set);
308
309 let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
311 let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
312
313 let storage_block = match storage.render_block(step.storage.as_deref())? {
315 Some(mut s) => {
316 s.push('\n');
317 s
318 }
319 None => String::new(),
320 };
321
322 let prefix = format!("{resource_block}{memory_block}{storage_block}");
323
324 match (prefix.is_empty(), base_prompt) {
325 (true, None) => Ok(None),
326 (true, Some(p)) => Ok(Some(p)),
327 (false, None) => Ok(Some(prefix.trim_end().to_string())),
328 (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
329 }
330}
331
332fn load_file_defaults(
337 vars: &mut HashMap<String, String>,
338 declarations: &HashMap<String, crate::workflow::model::Variable>,
339 workflow_dir: &Path,
340) -> Result<(), ZigError> {
341 for (name, decl) in declarations {
342 if decl.default.is_none() {
343 if let Some(ref file_path) = decl.default_file {
344 let full_path = workflow_dir.join(expand_path(file_path));
345 let content = std::fs::read_to_string(&full_path).map_err(|e| {
346 ZigError::Execution(format!(
347 "failed to read default_file '{}' for variable '{name}': {e}",
348 full_path.display()
349 ))
350 })?;
351 vars.insert(name.clone(), content);
352 }
353 }
354 }
355 Ok(())
356}
357
358pub(crate) fn evaluate_condition(
365 condition: &str,
366 vars: &HashMap<String, String>,
367) -> Result<bool, ZigError> {
368 let condition = condition.trim();
369
370 let operators = ["<=", ">=", "!=", "==", "<", ">"];
372 for op in &operators {
373 if let Some(pos) = condition.find(op) {
374 let lhs = resolve_operand(condition[..pos].trim(), vars);
375 let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
376 return Ok(compare(&lhs, &rhs, op));
377 }
378 }
379
380 let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
382 Ok(is_truthy(value))
383}
384
385fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
390 if (token.starts_with('"') && token.ends_with('"'))
392 || (token.starts_with('\'') && token.ends_with('\''))
393 {
394 return token[1..token.len() - 1].to_string();
395 }
396 if let Some(val) = vars.get(token) {
398 return val.clone();
399 }
400 token.to_string()
402}
403
404fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
407 if let (Ok(l), Ok(r)) = (lhs.parse::<f64>(), rhs.parse::<f64>()) {
408 return match op {
409 "==" => (l - r).abs() < f64::EPSILON,
410 "!=" => (l - r).abs() >= f64::EPSILON,
411 "<" => l < r,
412 ">" => l > r,
413 "<=" => l <= r,
414 ">=" => l >= r,
415 _ => false,
416 };
417 }
418 match op {
419 "==" => lhs == rhs,
420 "!=" => lhs != rhs,
421 "<" => lhs < rhs,
422 ">" => lhs > rhs,
423 "<=" => lhs <= rhs,
424 ">=" => lhs >= rhs,
425 _ => false,
426 }
427}
428
429fn is_truthy(value: &str) -> bool {
431 !value.is_empty() && value != "false" && value != "0"
432}
433
434pub(crate) fn render_step_prompt(
437 step: &Step,
438 vars: &HashMap<String, String>,
439 user_prompt: Option<&str>,
440 dependency_outputs: &HashMap<String, String>,
441) -> String {
442 let mut prompt = String::new();
443
444 if let Some(ctx) = user_prompt {
446 prompt.push_str(&format!("User context: {ctx}\n\n"));
447 }
448
449 if step.inject_context {
451 for dep in &step.depends_on {
452 if let Some(output) = dependency_outputs.get(dep) {
453 prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
454 }
455 }
456 }
457
458 prompt.push_str(&substitute_vars(&step.prompt, vars));
460
461 prompt
462}
463
464#[derive(Debug, Clone, Serialize, Default)]
473pub struct AgentConfig {
474 pub command: String,
477
478 pub provider: Option<String>,
480 pub model: Option<String>,
481 pub system_prompt: Option<String>,
482 pub root: Option<String>,
483 pub add_dirs: Vec<String>,
484 #[serde(serialize_with = "serialize_env_pairs")]
485 pub env: Vec<(String, String)>,
486 pub files: Vec<String>,
487 pub auto_approve: bool,
488 #[serde(skip_serializing_if = "Option::is_none")]
490 pub worktree: Option<Option<String>>,
491 #[serde(skip_serializing_if = "Option::is_none")]
492 pub sandbox: Option<String>,
493
494 pub json_mode: bool,
496 #[serde(skip_serializing_if = "Option::is_none")]
497 pub json_schema: Option<String>,
498 #[serde(skip_serializing_if = "Option::is_none")]
499 pub output_format: Option<String>,
500
501 #[serde(skip_serializing_if = "Option::is_none")]
503 pub max_turns: Option<u32>,
504 #[serde(skip_serializing_if = "Option::is_none")]
505 pub timeout: Option<String>,
506 #[serde(skip_serializing_if = "Option::is_none")]
507 pub mcp_config: Option<String>,
508
509 pub session_name: String,
511 #[serde(skip_serializing_if = "Option::is_none")]
512 pub description: Option<String>,
513 pub tags: Vec<String>,
514
515 pub prompt: String,
517
518 pub accepts_agent_args: bool,
521
522 #[serde(skip_serializing_if = "Option::is_none")]
524 pub command_params: Option<CommandParams>,
525
526 pub interactive: bool,
529}
530
531fn serialize_env_pairs<S>(pairs: &[(String, String)], s: S) -> Result<S::Ok, S::Error>
532where
533 S: serde::Serializer,
534{
535 use serde::ser::SerializeSeq;
536 let mut seq = s.serialize_seq(Some(pairs.len()))?;
537 for (k, v) in pairs {
538 seq.serialize_element(&format!("{k}={v}"))?;
539 }
540 seq.end()
541}
542
543#[derive(Debug, Clone, Serialize)]
545#[serde(tag = "kind", rename_all = "snake_case")]
546pub enum CommandParams {
547 Review {
548 uncommitted: bool,
549 base: Option<String>,
550 commit: Option<String>,
551 title: Option<String>,
552 },
553 Plan {
554 output: Option<String>,
555 instructions: Option<String>,
556 },
557 Pipe {
558 session_ids: Vec<String>,
559 },
560 Collect {
561 session_ids: Vec<String>,
562 },
563 Summary {
564 session_ids: Vec<String>,
565 },
566}
567
568#[allow(clippy::too_many_arguments)]
572pub(crate) fn build_agent_config(
573 step: &Step,
574 prompt: &str,
575 workflow_name: &str,
576 model_override: Option<&str>,
577 rendered_system_prompt: Option<&str>,
578 workflow_provider: Option<&str>,
579 workflow_model: Option<&str>,
580 extra_add_dirs: &[std::path::PathBuf],
581) -> AgentConfig {
582 let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
583
584 let (command_label, accepts_agent_args, command_params) = match &step.command {
585 None => ("run".to_string(), true, None),
586 Some(StepCommand::Review) => (
587 "review".to_string(),
588 true,
589 Some(CommandParams::Review {
590 uncommitted: step.uncommitted,
591 base: step.base.clone(),
592 commit: step.commit.clone(),
593 title: step.title.clone(),
594 }),
595 ),
596 Some(StepCommand::Plan) => (
597 "plan".to_string(),
598 true,
599 Some(CommandParams::Plan {
600 output: step.plan_output.as_deref().map(expand_path),
601 instructions: step.instructions.clone(),
602 }),
603 ),
604 Some(StepCommand::Pipe) => {
605 let session_ids: Vec<String> =
606 step.depends_on.iter().map(|d| session_name(d)).collect();
607 (
608 "pipe".to_string(),
609 true,
610 Some(CommandParams::Pipe { session_ids }),
611 )
612 }
613 Some(StepCommand::Collect) => {
614 let session_ids: Vec<String> =
615 step.depends_on.iter().map(|d| session_name(d)).collect();
616 (
617 "collect".to_string(),
618 false,
619 Some(CommandParams::Collect { session_ids }),
620 )
621 }
622 Some(StepCommand::Summary) => {
623 let session_ids: Vec<String> =
624 step.depends_on.iter().map(|d| session_name(d)).collect();
625 (
626 "summary".to_string(),
627 false,
628 Some(CommandParams::Summary { session_ids }),
629 )
630 }
631 };
632
633 let mut cfg = AgentConfig {
636 command: command_label,
637 session_name: session_name(&step.name),
638 description: if step.description.is_empty() {
639 None
640 } else {
641 Some(step.description.clone())
642 },
643 tags: {
644 let mut t = vec!["zig-workflow".to_string()];
645 t.extend(step.tags.iter().cloned());
646 t
647 },
648 timeout: step.timeout.clone(),
649 prompt: prompt.to_string(),
650 accepts_agent_args,
651 command_params,
652 interactive: step.interactive,
653 ..Default::default()
654 };
655
656 if !accepts_agent_args {
657 return cfg;
658 }
659
660 cfg.provider = step
661 .provider
662 .clone()
663 .or_else(|| workflow_provider.map(String::from));
664 cfg.model = model_override
665 .map(String::from)
666 .or_else(|| step.model.clone())
667 .or_else(|| workflow_model.map(String::from));
668 cfg.system_prompt = rendered_system_prompt.map(String::from);
669 cfg.max_turns = step.max_turns;
670
671 if let Some(output) = &step.output {
674 cfg.output_format = Some(output.clone());
675 } else if step.json {
676 cfg.json_mode = true;
677 }
678 cfg.json_schema = step.json_schema.clone();
679 cfg.mcp_config = step.mcp_config.as_deref().map(expand_path);
680
681 cfg.auto_approve = step.auto_approve;
682 cfg.root = step.root.as_deref().map(expand_path);
683 cfg.add_dirs = step
684 .add_dirs
685 .iter()
686 .map(|d| expand_path(d))
687 .chain(extra_add_dirs.iter().map(|p| p.display().to_string()))
688 .collect();
689 cfg.env = step
690 .env
691 .iter()
692 .map(|(k, v)| (k.clone(), v.clone()))
693 .collect();
694 cfg.files = step.files.iter().map(|f| expand_path(f)).collect();
695
696 if step.worktree {
698 cfg.worktree = Some(None);
699 }
700 cfg.sandbox = step.sandbox.clone();
701
702 cfg
703}
704
705pub(crate) fn apply_agent_config(mut builder: AgentBuilder, cfg: &AgentConfig) -> AgentBuilder {
708 if let Some(ref p) = cfg.provider {
709 builder = builder.provider(p);
710 }
711 if let Some(ref m) = cfg.model {
712 builder = builder.model(m);
713 }
714 if let Some(ref sp) = cfg.system_prompt {
715 builder = builder.system_prompt(sp);
716 }
717 if let Some(ref r) = cfg.root {
718 builder = builder.root(r);
719 }
720 if cfg.auto_approve {
721 builder = builder.auto_approve(true);
722 }
723 for dir in &cfg.add_dirs {
724 builder = builder.add_dir(dir);
725 }
726 for (k, v) in &cfg.env {
727 builder = builder.env(k, v);
728 }
729 for f in &cfg.files {
730 builder = builder.file(f);
731 }
732 if let Some(ref wt) = cfg.worktree {
733 builder = builder.worktree(wt.as_deref());
734 }
735 if let Some(ref sb) = cfg.sandbox {
736 builder = builder.sandbox(Some(sb));
737 }
738 if let Some(ref fmt) = cfg.output_format {
739 builder = builder.output_format(fmt);
740 }
741 if cfg.json_mode {
742 if let Some(ref schema) = cfg.json_schema {
743 if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
744 builder = builder.json_schema(v);
745 } else {
746 builder = builder.json();
747 }
748 } else {
749 builder = builder.json();
750 }
751 } else if let Some(ref schema) = cfg.json_schema {
752 if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
753 builder = builder.json_schema(v);
754 }
755 }
756 if let Some(turns) = cfg.max_turns {
757 builder = builder.max_turns(turns);
758 }
759 if let Some(ref t) = cfg.timeout {
760 if let Some(dur) = parse_timeout_string(t) {
761 builder = builder.timeout(dur);
762 }
763 }
764 if let Some(ref mcp) = cfg.mcp_config {
765 builder = builder.mcp_config(mcp);
766 }
767 builder = builder.name(&cfg.session_name);
768 if let Some(ref d) = cfg.description {
769 builder = builder.description(d);
770 }
771 for tag in &cfg.tags {
772 builder = builder.tag(tag);
773 }
774 builder
775}
776
777fn parse_timeout_string(s: &str) -> Option<Duration> {
780 let s = s.trim();
784 if s.is_empty() {
785 return None;
786 }
787 if let Ok(secs) = s.parse::<u64>() {
788 return Some(Duration::from_secs(secs));
789 }
790 let mut total = Duration::ZERO;
791 let mut current = String::new();
792 let mut chars = s.chars().peekable();
793 while let Some(c) = chars.next() {
794 if c.is_ascii_digit() || c == '.' {
795 current.push(c);
796 continue;
797 }
798 let mut unit = String::from(c);
799 if c == 'm' && chars.peek() == Some(&'s') {
800 unit.push(chars.next().unwrap());
801 }
802 let value: f64 = current.parse().ok()?;
803 current.clear();
804 let piece = match unit.as_str() {
805 "ms" => Duration::from_millis(value as u64),
806 "s" => Duration::from_secs_f64(value),
807 "m" => Duration::from_secs_f64(value * 60.0),
808 "h" => Duration::from_secs_f64(value * 3600.0),
809 _ => return None,
810 };
811 total += piece;
812 }
813 if !current.is_empty() {
814 return None;
815 }
816 Some(total)
817}
818
819async fn dispatch_agent(
837 cfg: &AgentConfig,
838 step_name: &str,
839 session: Option<&Arc<SessionWriter>>,
840 prefix: Option<&str>,
841) -> Result<String, ZigError> {
842 match cfg.command.as_str() {
843 "run" => {
844 if cfg.interactive {
845 let builder = apply_agent_config(AgentBuilder::new(), cfg);
849 builder.run(Some(&cfg.prompt)).await.map_err(|e| {
850 ZigError::Zag(format!("agent run failed for step '{step_name}': {e}"))
851 })?;
852 Ok(String::new())
853 } else {
854 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
855 builder = install_live_streaming(builder, step_name, session, prefix);
856 let output = builder.exec(&cfg.prompt).await.map_err(|e| {
857 ZigError::Zag(format!("agent exec failed for step '{step_name}': {e}"))
858 })?;
859 Ok(output.result.unwrap_or_default())
860 }
861 }
862 "review" => {
863 let provider = cfg.provider.clone().unwrap_or_else(|| "claude".to_string());
864 let (uncommitted, base, commit, title) = match &cfg.command_params {
865 Some(CommandParams::Review {
866 uncommitted,
867 base,
868 commit,
869 title,
870 }) => (*uncommitted, base.clone(), commit.clone(), title.clone()),
871 _ => (false, None, None, None),
872 };
873
874 if provider == "codex" {
878 let params = agent_review::ReviewParams {
879 provider,
880 uncommitted,
881 base,
882 commit,
883 title,
884 prompt: if cfg.prompt.is_empty() {
885 None
886 } else {
887 Some(cfg.prompt.clone())
888 },
889 system_prompt: cfg.system_prompt.clone(),
890 model: cfg.model.clone(),
891 root: cfg.root.clone(),
892 auto_approve: cfg.auto_approve,
893 add_dirs: cfg.add_dirs.clone(),
894 progress: Box::new(zag_agent::progress::SilentProgress),
895 };
896 let output = agent_review::run_review(params).await.map_err(|e| {
897 ZigError::Zag(format!("review failed for step '{step_name}': {e}"))
898 })?;
899 return Ok(output.and_then(|o| o.result).unwrap_or_default());
900 }
901
902 let diff = agent_review::gather_diff(
905 uncommitted,
906 base.as_deref(),
907 commit.as_deref(),
908 cfg.root.as_deref(),
909 )
910 .map_err(|e| {
911 ZigError::Zag(format!(
912 "review gather_diff failed for step '{step_name}': {e}"
913 ))
914 })?;
915 let user_prompt = if cfg.prompt.is_empty() {
916 None
917 } else {
918 Some(cfg.prompt.as_str())
919 };
920 let review_prompt =
921 agent_review::build_review_prompt(&diff, title.as_deref(), user_prompt);
922
923 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
924 builder = install_live_streaming(builder, step_name, session, prefix);
925 let output = builder.exec(&review_prompt).await.map_err(|e| {
926 ZigError::Zag(format!("review exec failed for step '{step_name}': {e}"))
927 })?;
928 Ok(output.result.unwrap_or_default())
929 }
930 "plan" => {
931 let (plan_output_path, instructions) = match &cfg.command_params {
932 Some(CommandParams::Plan {
933 output,
934 instructions,
935 }) => (output.clone(), instructions.clone()),
936 _ => (None, None),
937 };
938
939 let plan_prompt = agent_plan::build_plan_prompt(&cfg.prompt, instructions.as_deref());
940
941 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
942 builder = install_live_streaming(builder, step_name, session, prefix);
943 let output = builder.exec(&plan_prompt).await.map_err(|e| {
944 ZigError::Zag(format!("plan exec failed for step '{step_name}': {e}"))
945 })?;
946 let text = output.result.unwrap_or_default();
947
948 if let Some(path_str) = plan_output_path {
949 let target = resolve_plan_output_path(&path_str);
950 if let Some(parent) = target.parent()
951 && !parent.as_os_str().is_empty()
952 {
953 std::fs::create_dir_all(parent).map_err(|e| {
954 ZigError::Io(format!(
955 "failed to create plan output directory {}: {e}",
956 parent.display()
957 ))
958 })?;
959 }
960 std::fs::write(&target, &text).map_err(|e| {
961 ZigError::Io(format!(
962 "failed to write plan output to {}: {e}",
963 target.display()
964 ))
965 })?;
966 eprintln!("plan written to {}", target.display());
967 }
968
969 Ok(text)
970 }
971 "pipe" => {
972 let session_ids = match &cfg.command_params {
973 Some(CommandParams::Pipe { session_ids }) => session_ids.as_slice(),
974 _ => &[] as &[String],
975 };
976 let context = build_pipe_context(session_ids, cfg.root.as_deref())?;
977 let combined = format!(
978 "Here are results from previous agent sessions:\n\n{context}\n\n{}",
979 cfg.prompt
980 );
981
982 let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
983 builder = install_live_streaming(builder, step_name, session, prefix);
984 let output = builder.exec(&combined).await.map_err(|e| {
985 ZigError::Zag(format!("pipe exec failed for step '{step_name}': {e}"))
986 })?;
987 Ok(output.result.unwrap_or_default())
988 }
989 "collect" => {
990 let session_ids = match &cfg.command_params {
991 Some(CommandParams::Collect { session_ids }) => session_ids.clone(),
992 _ => Vec::new(),
993 };
994 let params = orch_collect::CollectParams {
995 session_ids,
996 tag: None,
997 json: true,
998 root: cfg.root.clone(),
999 };
1000 let results = orch_collect::collect_results(¶ms).map_err(|e| {
1001 ZigError::Zag(format!("collect failed for step '{step_name}': {e}"))
1002 })?;
1003 let json = serde_json::to_string(&results)
1004 .map_err(|e| ZigError::Execution(format!("collect serialization failed: {e}")))?;
1005 emit_captured(&json, step_name, session, prefix);
1006 Ok(json)
1007 }
1008 "summary" => {
1009 let session_ids = match &cfg.command_params {
1010 Some(CommandParams::Summary { session_ids }) => session_ids.clone(),
1011 _ => Vec::new(),
1012 };
1013 let params = orch_summary::SummaryParams {
1014 session_ids,
1015 tag: None,
1016 stats: false,
1017 json: true,
1018 root: cfg.root.clone(),
1019 };
1020 let results = orch_summary::summarize_sessions(¶ms).map_err(|e| {
1021 ZigError::Zag(format!("summary failed for step '{step_name}': {e}"))
1022 })?;
1023 let json = serde_json::to_string(&results)
1024 .map_err(|e| ZigError::Execution(format!("summary serialization failed: {e}")))?;
1025 emit_captured(&json, step_name, session, prefix);
1026 Ok(json)
1027 }
1028 other => Err(ZigError::Execution(format!(
1029 "unknown command '{other}' for step '{step_name}'"
1030 ))),
1031 }
1032}
1033
1034fn install_live_streaming(
1041 builder: AgentBuilder,
1042 step_name: &str,
1043 session: Option<&Arc<SessionWriter>>,
1044 prefix: Option<&str>,
1045) -> AgentBuilder {
1046 let step_name_owned = step_name.to_string();
1047 let prefix_owned = prefix.map(String::from);
1048 let session_owned = session.cloned();
1049 builder.on_log_event(move |evt| {
1050 let Some(text) = zag_agent::listen::format_event_text(evt, false) else {
1051 return;
1052 };
1053 emit_live_line(
1054 &text,
1055 &step_name_owned,
1056 session_owned.as_ref(),
1057 prefix_owned.as_deref(),
1058 );
1059 })
1060}
1061
1062fn emit_live_line(
1065 text: &str,
1066 step_name: &str,
1067 session: Option<&Arc<SessionWriter>>,
1068 prefix: Option<&str>,
1069) {
1070 use std::io::Write;
1071 if text.is_empty() {
1072 return;
1073 }
1074 let stderr = std::io::stderr();
1075 for line in text.lines() {
1076 if let Some(w) = session {
1077 let _ = w.step_output(step_name, OutputStream::Stdout, line);
1078 }
1079 let mut h = stderr.lock();
1080 let _ = match prefix {
1081 Some(p) => writeln!(h, "[{p}] {line}"),
1082 None => writeln!(h, "{line}"),
1083 };
1084 }
1085}
1086
1087fn emit_captured(
1091 text: &str,
1092 step_name: &str,
1093 session: Option<&Arc<SessionWriter>>,
1094 prefix: Option<&str>,
1095) {
1096 emit_live_line(text, step_name, session, prefix);
1097}
1098
1099fn build_pipe_context(session_ids: &[String], root: Option<&str>) -> Result<String, ZigError> {
1105 let mut parts = Vec::new();
1106 for (i, id) in session_ids.iter().enumerate() {
1107 let Some(text) = orch_collect::extract_last_assistant_message(id, root) else {
1108 eprintln!("warning: no result found for upstream session {id}");
1109 continue;
1110 };
1111 let short = &id[..id.len().min(8)];
1112 let block = if session_ids.len() == 1 {
1113 format!("<session-result session=\"{short}\">\n{text}\n</session-result>")
1114 } else {
1115 format!(
1116 "<session-result index=\"{}\" session=\"{short}\">\n{text}\n</session-result>",
1117 i + 1
1118 )
1119 };
1120 parts.push(block);
1121 }
1122
1123 if parts.is_empty() {
1124 return Err(ZigError::Execution(
1125 "pipe: no results available from the specified sessions".into(),
1126 ));
1127 }
1128 Ok(parts.join("\n\n"))
1129}
1130
1131fn resolve_plan_output_path(path_str: &str) -> std::path::PathBuf {
1136 let expanded = expand_path(path_str);
1137 let path = std::path::PathBuf::from(&expanded);
1138 if path.extension().is_some() {
1139 return path;
1140 }
1141 let stamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
1142 path.join(format!("plan-{stamp}.md"))
1143}
1144
1145#[allow(clippy::too_many_arguments)]
1150async fn execute_step(
1151 step: &Step,
1152 prompt: &str,
1153 workflow_name: &str,
1154 model_override: Option<&str>,
1155 prefix: Option<&str>,
1156 session: Option<&Arc<SessionWriter>>,
1157 rendered_system_prompt: Option<&str>,
1158 workflow_provider: Option<&str>,
1159 workflow_model: Option<&str>,
1160 extra_add_dirs: &[std::path::PathBuf],
1161) -> Result<String, ZigError> {
1162 let cfg = build_agent_config(
1163 step,
1164 prompt,
1165 workflow_name,
1166 model_override,
1167 rendered_system_prompt,
1168 workflow_provider,
1169 workflow_model,
1170 extra_add_dirs,
1171 );
1172 dispatch_agent(&cfg, &step.name, session, prefix).await
1173}
1174
1175#[allow(clippy::too_many_arguments)]
1180async fn run_step_attempts(
1181 step: &Step,
1182 prompt: &str,
1183 workflow_name: &str,
1184 prefix: Option<&str>,
1185 session: Option<&Arc<SessionWriter>>,
1186 rendered_system_prompt: Option<&str>,
1187 workflow_provider: Option<&str>,
1188 workflow_model: Option<&str>,
1189 extra_add_dirs: &[std::path::PathBuf],
1190) -> Result<String, ZigError> {
1191 let mut attempts = 0;
1192 let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
1193 step.max_retries.unwrap_or(1) + 1
1194 } else {
1195 1
1196 };
1197
1198 loop {
1199 attempts += 1;
1200 let model_override = if attempts > 1 {
1201 step.retry_model.as_deref()
1202 } else {
1203 None
1204 };
1205 match execute_step(
1206 step,
1207 prompt,
1208 workflow_name,
1209 model_override,
1210 prefix,
1211 session,
1212 rendered_system_prompt,
1213 workflow_provider,
1214 workflow_model,
1215 extra_add_dirs,
1216 )
1217 .await
1218 {
1219 Ok(output) => return Ok(output),
1220 Err(e) => {
1221 if let Some(w) = session {
1222 let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
1223 }
1224 if attempts < max_attempts {
1225 eprintln!(
1226 " retry {}/{} for step '{}'",
1227 attempts,
1228 max_attempts - 1,
1229 step.name
1230 );
1231 continue;
1232 }
1233 return Err(e);
1234 }
1235 }
1236 }
1237}
1238
1239fn extract_saves(
1246 output: &str,
1247 saves: &HashMap<String, String>,
1248) -> Result<HashMap<String, String>, ZigError> {
1249 let mut extracted = HashMap::new();
1250
1251 for (var_name, selector) in saves {
1252 let value = if selector == "$" {
1253 output.trim().to_string()
1254 } else if let Some(path) = selector.strip_prefix("$.") {
1255 let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
1256 ZigError::Execution(format!(
1257 "saves selector '{selector}' requires JSON output, but got parse error: {e}"
1258 ))
1259 })?;
1260 json_path_lookup(&json, path)
1261 } else {
1262 output.trim().to_string()
1263 };
1264
1265 extracted.insert(var_name.clone(), value);
1266 }
1267
1268 Ok(extracted)
1269}
1270
1271fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
1276 let mut sequential = Vec::new();
1277 let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
1278
1279 for step in tier {
1280 if let Some(group) = &step.race_group {
1281 race_groups.entry(group.clone()).or_default().push(step);
1282 } else {
1283 sequential.push(*step);
1284 }
1285 }
1286
1287 (sequential, race_groups)
1288}
1289
1290#[allow(clippy::too_many_arguments)]
1296async fn execute_race_group(
1297 steps: &[&Step],
1298 prompts: &HashMap<String, String>,
1299 system_prompts: &HashMap<String, String>,
1300 workflow_name: &str,
1301 tier_index: usize,
1302 session: Option<&Arc<SessionWriter>>,
1303 workflow_provider: Option<&str>,
1304 workflow_model: Option<&str>,
1305 storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
1306) -> Result<(String, String), ZigError> {
1307 if let Some(w) = session {
1308 for step in steps {
1309 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1310 let preview = prompts
1311 .get(&step.name)
1312 .map(|p| prompt_preview(p))
1313 .unwrap_or_default();
1314 let _ = w.step_started(
1315 &step.name,
1316 tier_index,
1317 &zag_session_id,
1318 zag_command_name(&step.command),
1319 step.model.as_deref(),
1320 &preview,
1321 );
1322 }
1323 }
1324
1325 let race_started = Instant::now();
1326 let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1327
1328 for step in steps {
1329 let prompt = prompts
1330 .get(&step.name)
1331 .ok_or_else(|| ZigError::Execution(format!("missing prompt for step '{}'", step.name)))?
1332 .clone();
1333 eprintln!(" racing step '{}'...", step.name);
1334 let rendered_sp = system_prompts.get(&step.name).cloned();
1335 let empty: Vec<std::path::PathBuf> = Vec::new();
1336 let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty).clone();
1337 let step_clone: Step = (*step).clone();
1338 let wf_name = workflow_name.to_string();
1339 let wf_provider = workflow_provider.map(String::from);
1340 let wf_model = workflow_model.map(String::from);
1341 let session_clone = session.cloned();
1342 let name = step.name.clone();
1343 set.spawn(async move {
1344 let res = execute_step(
1345 &step_clone,
1346 &prompt,
1347 &wf_name,
1348 None,
1349 None,
1350 session_clone.as_ref(),
1351 rendered_sp.as_deref(),
1352 wf_provider.as_deref(),
1353 wf_model.as_deref(),
1354 &extra_dirs,
1355 )
1356 .await;
1357 (name, res)
1358 });
1359 }
1360
1361 while let Some(joined) = set.join_next().await {
1363 let (winner_name, result) = match joined {
1364 Ok(pair) => pair,
1365 Err(e) if e.is_cancelled() => continue,
1366 Err(e) => return Err(ZigError::Execution(format!("race task panicked: {e}"))),
1367 };
1368 match result {
1369 Ok(stdout) => {
1370 set.abort_all();
1372 while let Some(r) = set.join_next().await {
1373 if let Ok((name, _)) = r {
1374 eprintln!(" cancelling step '{name}' (race lost)");
1375 }
1376 }
1377 let elapsed = race_started.elapsed().as_millis() as u64;
1378 eprintln!(" race won by '{winner_name}'");
1379 if let Some(w) = session {
1380 let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1381 }
1382 return Ok((winner_name, stdout));
1383 }
1384 Err(e) => {
1385 if let Some(w) = session {
1386 let _ = w.step_failed(&winner_name, None, 1, &e.to_string());
1387 }
1388 continue;
1390 }
1391 }
1392 }
1393
1394 Err(ZigError::Execution(
1395 "all racers failed without a winner".into(),
1396 ))
1397}
1398
1399#[allow(clippy::too_many_arguments)]
1401async fn execute_sequential_step(
1402 step: &Step,
1403 vars: &mut HashMap<String, String>,
1404 user_prompt: Option<&str>,
1405 step_outputs: &mut HashMap<String, String>,
1406 workflow_name: &str,
1407 pending_next: &mut Option<String>,
1408 tier_index: usize,
1409 session: Option<&Arc<SessionWriter>>,
1410 roles: &HashMap<String, Role>,
1411 resources: &ResourceCollector<'_>,
1412 memory: &MemoryCollector,
1413 storage: &StorageManager,
1414 workflow_dir: &Path,
1415 workflow_provider: Option<&str>,
1416 workflow_model: Option<&str>,
1417) -> Result<(), ZigError> {
1418 if let Some(condition) = &step.condition {
1419 if !evaluate_condition(condition, vars)? {
1420 eprintln!(
1421 " skipping '{}' (condition not met: {condition})",
1422 step.name
1423 );
1424 if let Some(w) = session {
1425 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1426 }
1427 return Ok(());
1428 }
1429 }
1430
1431 eprintln!(" running step '{}'...", step.name);
1432
1433 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1434 let rendered_sp = resolve_role_system_prompt(
1435 step,
1436 roles,
1437 resources,
1438 memory,
1439 storage,
1440 vars,
1441 workflow_dir,
1442 workflow_name,
1443 )?;
1444 let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1445 if let Some(w) = session {
1446 let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1447 let _ = w.step_started(
1448 &step.name,
1449 tier_index,
1450 &zag_session_id,
1451 zag_command_name(&step.command),
1452 step.model.as_deref(),
1453 &prompt_preview(&prompt),
1454 );
1455 }
1456 let started = Instant::now();
1457 let result = run_step_attempts(
1458 step,
1459 &prompt,
1460 workflow_name,
1461 None,
1462 session,
1463 rendered_sp.as_deref(),
1464 workflow_provider,
1465 workflow_model,
1466 &storage_dirs,
1467 )
1468 .await;
1469
1470 match result {
1471 Ok(output) => {
1472 let mut saved_keys: Vec<String> = Vec::new();
1473 if !step.saves.is_empty() {
1474 let saved = extract_saves(&output, &step.saves)?;
1475 for (k, v) in &saved {
1476 eprintln!(" saved {k} = {v}");
1477 saved_keys.push(k.clone());
1478 }
1479 vars.extend(saved);
1480 }
1481
1482 step_outputs.insert(step.name.clone(), output);
1483 eprintln!(" completed '{}'", step.name);
1484 if let Some(w) = session {
1485 let _ = w.step_completed(
1486 &step.name,
1487 0,
1488 started.elapsed().as_millis() as u64,
1489 saved_keys,
1490 );
1491 }
1492
1493 if step.next.is_some() {
1494 *pending_next = step.next.clone();
1495 }
1496 }
1497 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1498 FailurePolicy::Fail => return Err(e),
1499 FailurePolicy::Continue => {
1500 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1501 }
1502 FailurePolicy::Retry => {
1503 return Err(e);
1504 }
1505 },
1506 }
1507
1508 Ok(())
1509}
1510
1511#[allow(clippy::too_many_arguments)]
1520async fn execute_parallel_tier(
1521 steps: &[&Step],
1522 vars: &mut HashMap<String, String>,
1523 user_prompt: Option<&str>,
1524 step_outputs: &mut HashMap<String, String>,
1525 workflow_name: &str,
1526 pending_next: &mut Option<String>,
1527 tier_index: usize,
1528 session: Option<&Arc<SessionWriter>>,
1529 roles: &HashMap<String, Role>,
1530 resources: &ResourceCollector<'_>,
1531 memory: &MemoryCollector,
1532 storage: &StorageManager,
1533 workflow_dir: &Path,
1534 workflow_provider: Option<&str>,
1535 workflow_model: Option<&str>,
1536) -> Result<(), ZigError> {
1537 let mut active: Vec<&Step> = Vec::new();
1540 let mut prompts: HashMap<String, String> = HashMap::new();
1541 let mut rendered_sps: HashMap<String, String> = HashMap::new();
1542 let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
1543 for step in steps {
1544 if let Some(condition) = &step.condition {
1545 if !evaluate_condition(condition, vars)? {
1546 eprintln!(
1547 " skipping '{}' (condition not met: {condition})",
1548 step.name
1549 );
1550 if let Some(w) = session {
1551 let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1552 }
1553 continue;
1554 }
1555 }
1556 let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1557 prompts.insert(step.name.clone(), prompt);
1558 if let Some(sp) = resolve_role_system_prompt(
1559 step,
1560 roles,
1561 resources,
1562 memory,
1563 storage,
1564 vars,
1565 workflow_dir,
1566 workflow_name,
1567 )? {
1568 rendered_sps.insert(step.name.clone(), sp);
1569 }
1570 storage_dirs_map.insert(
1571 step.name.clone(),
1572 storage.add_dirs_for_step(step.storage.as_deref()),
1573 );
1574 active.push(*step);
1575 }
1576
1577 if active.is_empty() {
1578 return Ok(());
1579 }
1580
1581 eprintln!(" running {} steps in parallel...", active.len());
1582
1583 let mut start_times: HashMap<String, Instant> = HashMap::new();
1584 let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1585 for step in &active {
1586 let step_clone: Step = (*step).clone();
1587 let prompt = prompts.remove(&step.name).unwrap_or_default();
1588 let rendered_sp = rendered_sps.remove(&step.name);
1589 let workflow_name_owned = workflow_name.to_string();
1590 let name = step.name.clone();
1591 eprintln!(" starting '{name}'...");
1592 if let Some(w) = session {
1593 let zag_session_id = format!("zig-{workflow_name}-{name}");
1594 let _ = w.step_started(
1595 &name,
1596 tier_index,
1597 &zag_session_id,
1598 zag_command_name(&step.command),
1599 step.model.as_deref(),
1600 &prompt_preview(&prompt),
1601 );
1602 }
1603 start_times.insert(name.clone(), Instant::now());
1604 let session_clone = session.cloned();
1605 let wf_provider = workflow_provider.map(String::from);
1606 let wf_model = workflow_model.map(String::from);
1607 let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
1608 set.spawn(async move {
1609 let res = run_step_attempts(
1610 &step_clone,
1611 &prompt,
1612 &workflow_name_owned,
1613 Some(&name),
1614 session_clone.as_ref(),
1615 rendered_sp.as_deref(),
1616 wf_provider.as_deref(),
1617 wf_model.as_deref(),
1618 &storage_dirs,
1619 )
1620 .await;
1621 (name, res)
1622 });
1623 }
1624
1625 let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1626 while let Some(joined) = set.join_next().await {
1627 match joined {
1628 Ok((name, res)) => {
1629 results.insert(name, res);
1630 }
1631 Err(e) => {
1632 return Err(ZigError::Execution(format!(
1633 "parallel step task panicked: {e}"
1634 )));
1635 }
1636 }
1637 }
1638
1639 let mut errors: Vec<String> = Vec::new();
1641 for step in &active {
1642 let Some(res) = results.remove(&step.name) else {
1643 continue;
1644 };
1645 let elapsed = start_times
1646 .remove(&step.name)
1647 .map(|t| t.elapsed().as_millis() as u64)
1648 .unwrap_or(0);
1649 match res {
1650 Ok(output) => {
1651 let mut saved_keys: Vec<String> = Vec::new();
1652 if !step.saves.is_empty() {
1653 let saved = extract_saves(&output, &step.saves)?;
1654 for (k, v) in &saved {
1655 eprintln!(" saved {k} = {v}");
1656 saved_keys.push(k.clone());
1657 }
1658 vars.extend(saved);
1659 }
1660 step_outputs.insert(step.name.clone(), output);
1661 eprintln!(" completed '{}'", step.name);
1662 if let Some(w) = session {
1663 let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1664 }
1665 if step.next.is_some() && pending_next.is_none() {
1666 *pending_next = step.next.clone();
1667 }
1668 }
1669 Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1670 FailurePolicy::Continue => {
1671 eprintln!(" step '{}' failed (continuing): {e}", step.name);
1672 }
1673 FailurePolicy::Fail | FailurePolicy::Retry => {
1674 errors.push(format!("'{}': {e}", step.name));
1675 }
1676 },
1677 }
1678 }
1679
1680 if !errors.is_empty() {
1681 return Err(ZigError::Execution(format!(
1682 "parallel step(s) failed: {}",
1683 errors.join("; ")
1684 )));
1685 }
1686
1687 Ok(())
1688}
1689
1690fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1693 let mut vars = HashMap::new();
1694 for (name, var) in &workflow.vars {
1695 let value = match &var.default {
1696 Some(toml::Value::String(s)) => s.clone(),
1697 Some(toml::Value::Integer(n)) => n.to_string(),
1698 Some(toml::Value::Float(f)) => f.to_string(),
1699 Some(toml::Value::Boolean(b)) => b.to_string(),
1700 Some(other) => other.to_string(),
1701 None => String::new(),
1702 };
1703 vars.insert(name.clone(), value);
1704 }
1705 vars
1706}
1707
1708#[allow(clippy::too_many_arguments)]
1710async fn execute(
1711 workflow: &Workflow,
1712 workflow_path: &std::path::Path,
1713 user_prompt: Option<&str>,
1714 workflow_dir: &Path,
1715 disable_resources: bool,
1716 disable_memory: bool,
1717 disable_storage: bool,
1718 dry_run: bool,
1719 dry_run_format: DryRunFormat,
1720) -> Result<(), ZigError> {
1721 let mut vars = init_vars(workflow);
1722
1723 let resource_collector = ResourceCollector::from_env(
1724 &workflow.workflow.name,
1725 &workflow.workflow.resources,
1726 workflow_dir,
1727 disable_resources,
1728 );
1729
1730 let config = ZigConfig::load();
1731 let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1732 let memory_collector = MemoryCollector::from_env(
1733 &workflow.workflow.name,
1734 workflow_memory_mode,
1735 &config,
1736 disable_memory,
1737 );
1738
1739 let storage_manager = if disable_storage || workflow.storage.is_empty() {
1747 StorageManager::empty()
1748 } else if dry_run {
1749 let backend = FilesystemBackend::from_cwd()?;
1750 StorageManager::build_dry(&workflow.storage, backend)
1751 } else {
1752 let backend = FilesystemBackend::from_cwd()?;
1753 StorageManager::build(&workflow.storage, backend)?
1754 };
1755
1756 load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1758
1759 let prompt_var = workflow
1761 .vars
1762 .iter()
1763 .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1764 .map(|(name, _)| name.clone());
1765
1766 if let Some(ref var_name) = prompt_var {
1767 if let Some(prompt) = user_prompt {
1768 vars.insert(var_name.clone(), prompt.to_string());
1769 }
1770 }
1771
1772 if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1774 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1775 return Err(ZigError::Validation(msgs.join("; ")));
1776 }
1777
1778 let effective_user_prompt = if prompt_var.is_some() {
1780 None
1781 } else {
1782 user_prompt
1783 };
1784
1785 let mut step_outputs: HashMap<String, String> = HashMap::new();
1786
1787 let wf_provider = workflow.workflow.provider.as_deref();
1788 let wf_model = workflow.workflow.model.as_deref();
1789
1790 let tiers = topological_sort(&workflow.steps)?;
1791
1792 if dry_run {
1793 let ctx = DryRunContext {
1794 workflow,
1795 workflow_path,
1796 workflow_dir,
1797 vars: &vars,
1798 user_prompt: effective_user_prompt,
1799 roles: &workflow.roles,
1800 resources: &resource_collector,
1801 memory: &memory_collector,
1802 storage: &storage_manager,
1803 wf_provider,
1804 wf_model,
1805 disable_resources,
1806 disable_memory,
1807 disable_storage,
1808 };
1809 return crate::dry_run::print_plan(&ctx, &tiers, dry_run_format);
1810 }
1811
1812 eprintln!(
1813 "running workflow '{}' ({} steps in {} tiers)",
1814 workflow.workflow.name,
1815 workflow.steps.len(),
1816 tiers.len()
1817 );
1818
1819 let coordinator = match SessionWriter::create(
1824 &workflow.workflow.name,
1825 &workflow_path.to_string_lossy(),
1826 user_prompt,
1827 tiers.len(),
1828 ) {
1829 Ok(writer) => {
1830 eprintln!("zig session: {}", writer.session_id());
1831 Some(SessionCoordinator::start(writer))
1832 }
1833 Err(e) => {
1834 eprintln!("warning: failed to open zig session log: {e}");
1835 None
1836 }
1837 };
1838 let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1839 let session_ref = session_writer.as_ref();
1840
1841 let mut iteration = 0;
1842 let mut pending_next: Option<String> = None;
1843
1844 loop {
1845 let tiers_to_run = if let Some(ref next_step) = pending_next {
1846 let remaining: Vec<Vec<&Step>> = tiers
1848 .iter()
1849 .map(|tier| {
1850 tier.iter()
1851 .filter(|s| s.name == *next_step)
1852 .copied()
1853 .collect::<Vec<_>>()
1854 })
1855 .filter(|tier| !tier.is_empty())
1856 .collect();
1857 pending_next = None;
1858 remaining
1859 } else if iteration == 0 {
1860 tiers.clone()
1861 } else {
1862 break;
1863 };
1864
1865 for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1866 let (non_race, race_groups) = partition_tier(tier);
1867
1868 if let Some(w) = session_ref {
1869 let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1870 let _ = w.tier_started(tier_index, names);
1871 }
1872
1873 if non_race.len() <= 1 {
1877 for step in &non_race {
1878 execute_sequential_step(
1879 step,
1880 &mut vars,
1881 effective_user_prompt,
1882 &mut step_outputs,
1883 &workflow.workflow.name,
1884 &mut pending_next,
1885 tier_index,
1886 session_ref,
1887 &workflow.roles,
1888 &resource_collector,
1889 &memory_collector,
1890 &storage_manager,
1891 workflow_dir,
1892 wf_provider,
1893 wf_model,
1894 )
1895 .await?;
1896 }
1897 } else {
1898 execute_parallel_tier(
1899 &non_race,
1900 &mut vars,
1901 effective_user_prompt,
1902 &mut step_outputs,
1903 &workflow.workflow.name,
1904 &mut pending_next,
1905 tier_index,
1906 session_ref,
1907 &workflow.roles,
1908 &resource_collector,
1909 &memory_collector,
1910 &storage_manager,
1911 workflow_dir,
1912 wf_provider,
1913 wf_model,
1914 )
1915 .await?;
1916 }
1917
1918 for (group_name, race_steps) in &race_groups {
1920 eprintln!(" starting race group '{group_name}'...");
1921
1922 let mut prompts = HashMap::new();
1924 let mut race_sps: HashMap<String, String> = HashMap::new();
1925 let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
1926 HashMap::new();
1927 let mut active_steps: Vec<&Step> = Vec::new();
1928 for step in race_steps {
1929 if let Some(condition) = &step.condition {
1930 if !evaluate_condition(condition, &vars)? {
1931 eprintln!(
1932 " skipping '{}' (condition not met: {condition})",
1933 step.name
1934 );
1935 continue;
1936 }
1937 }
1938 let prompt =
1939 render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1940 prompts.insert(step.name.clone(), prompt);
1941 if let Some(sp) = resolve_role_system_prompt(
1942 step,
1943 &workflow.roles,
1944 &resource_collector,
1945 &memory_collector,
1946 &storage_manager,
1947 &vars,
1948 workflow_dir,
1949 &workflow.workflow.name,
1950 )? {
1951 race_sps.insert(step.name.clone(), sp);
1952 }
1953 race_storage_dirs.insert(
1954 step.name.clone(),
1955 storage_manager.add_dirs_for_step(step.storage.as_deref()),
1956 );
1957 active_steps.push(step);
1958 }
1959
1960 if active_steps.is_empty() {
1961 continue;
1962 }
1963
1964 match execute_race_group(
1965 &active_steps,
1966 &prompts,
1967 &race_sps,
1968 &workflow.workflow.name,
1969 tier_index,
1970 session_ref,
1971 wf_provider,
1972 wf_model,
1973 &race_storage_dirs,
1974 )
1975 .await
1976 {
1977 Ok((winner_name, output)) => {
1978 if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1980 if !winner.saves.is_empty() {
1981 let saved = extract_saves(&output, &winner.saves)?;
1982 for (k, v) in &saved {
1983 eprintln!(" saved {k} = {v}");
1984 }
1985 vars.extend(saved);
1986 }
1987 if winner.next.is_some() {
1988 pending_next = winner.next.clone();
1989 }
1990 }
1991 step_outputs.insert(winner_name.clone(), output);
1992 eprintln!(
1993 " completed race group '{group_name}' (winner: '{winner_name}')"
1994 );
1995 }
1996 Err(e) => return Err(e),
1997 }
1998 }
1999 }
2000
2001 iteration += 1;
2002 if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
2003 if iteration >= MAX_LOOP_ITERATIONS {
2004 eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
2005 }
2006 break;
2007 }
2008 }
2009
2010 eprintln!("workflow '{}' completed", workflow.workflow.name);
2011 if let Some(c) = coordinator {
2012 let _ = c.finish(SessionStatus::Success);
2013 }
2014 Ok(())
2015}
2016
2017fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
2020 match cmd {
2021 None => "run",
2022 Some(StepCommand::Review) => "review",
2023 Some(StepCommand::Plan) => "plan",
2024 Some(StepCommand::Pipe) => "pipe",
2025 Some(StepCommand::Collect) => "collect",
2026 Some(StepCommand::Summary) => "summary",
2027 }
2028}
2029
2030fn prompt_preview(prompt: &str) -> String {
2032 const MAX: usize = 200;
2033 let collapsed: String = prompt
2034 .chars()
2035 .map(|c| if c == '\n' { ' ' } else { c })
2036 .collect();
2037 if collapsed.chars().count() <= MAX {
2038 collapsed
2039 } else {
2040 let truncated: String = collapsed.chars().take(MAX).collect();
2041 format!("{truncated}…")
2042 }
2043}
2044
2045#[cfg(test)]
2046#[path = "run_tests.rs"]
2047mod tests;