1pub mod context;
2pub mod state;
3mod template;
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use anyhow::{bail, Result};
11use colored::Colorize;
12use regex::Regex;
13use serde::Serialize;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16
17use crate::cli::display;
18use crate::config::{ConfigManager, StepConfig};
19use crate::control_flow::ControlFlow;
20use crate::error::StepError;
21use crate::events::subscribers::{FileSubscriber, WebhookSubscriber};
22use crate::events::types::Event;
23use crate::events::EventBus;
24use crate::plugins::loader::PluginLoader;
25use crate::plugins::registry::PluginRegistry;
26use crate::prompts::{
27 detector::{StackDetector, StackInfo},
28 registry::Registry,
29};
30use crate::sandbox::config::SandboxConfig;
31use crate::sandbox::docker::DockerSandbox;
32use crate::sandbox::proxy::ApiProxy;
33use crate::sandbox::SandboxMode;
34use crate::steps::*;
35use crate::steps::{
36 agent::AgentExecutor, call::CallExecutor, chat::ChatExecutor, cmd::CmdExecutor,
37 gate::GateExecutor, map::MapExecutor, parallel::ParallelExecutor, repeat::RepeatExecutor,
38 script::ScriptExecutor, template_step::TemplateStepExecutor,
39};
40use crate::workflow::schema::{OutputType, StepDef, StepType, WorkflowDef};
41use context::Context;
42use state::WorkflowState;
43
44#[derive(Debug, Default)]
46pub struct EngineOptions {
47 pub verbose: bool,
48 pub quiet: bool,
49 pub json: bool,
51 pub dry_run: bool,
53 pub resume_from: Option<String>,
55 pub sandbox_mode: SandboxMode,
57 pub repo: Option<String>,
59}
60
61#[derive(Debug, Clone, Serialize)]
63pub struct StepRecord {
64 pub name: String,
65 pub step_type: String,
66 pub status: String,
67 pub duration_secs: f64,
68 pub output_summary: String,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub input_tokens: Option<u64>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 pub output_tokens: Option<u64>,
73 #[serde(skip_serializing_if = "Option::is_none")]
74 pub cost_usd: Option<f64>,
75 #[serde(skip_serializing_if = "std::ops::Not::not")]
77 pub sandboxed: bool,
78}
79
80#[derive(Debug, Serialize)]
82pub struct WorkflowJsonOutput {
83 pub workflow_name: String,
84 pub status: String,
85 pub sandbox_mode: String,
86 pub steps: Vec<StepRecord>,
87 pub total_duration_secs: f64,
88 pub total_tokens: u64,
89 pub total_cost_usd: f64,
90}
91
92#[allow(dead_code)]
93pub struct Engine {
94 pub workflow: WorkflowDef,
95 pub context: Context,
96 config_manager: Arc<ConfigManager>,
97 pub verbose: bool,
98 pub quiet: bool,
99 pub json: bool,
100 pub dry_run: bool,
101 resume_from: Option<String>,
102 sandbox_mode: SandboxMode,
103 sandbox: SharedSandbox,
105 step_records: Vec<StepRecord>,
106 state: Option<WorkflowState>,
107 state_file: Option<PathBuf>,
108 pending_futures: HashMap<String, JoinHandle<Result<StepOutput, StepError>>>,
110 plugin_registry: Arc<Mutex<PluginRegistry>>,
112 pub event_bus: EventBus,
114 pub stack_info: Option<StackInfo>,
116 repo: Option<String>,
118 api_proxy: Option<ApiProxy>,
120}
121
122#[allow(dead_code)]
123impl Engine {
124 pub async fn new(
125 workflow: WorkflowDef,
126 target: String,
127 vars: HashMap<String, serde_json::Value>,
128 verbose: bool,
129 quiet: bool,
130 ) -> Self {
131 let options = EngineOptions {
132 verbose,
133 quiet,
134 ..Default::default()
135 };
136 Self::with_options(workflow, target, vars, options).await
137 }
138
139 pub async fn with_options(
140 workflow: WorkflowDef,
141 target: String,
142 vars: HashMap<String, serde_json::Value>,
143 options: EngineOptions,
144 ) -> Self {
145 let mut context = Context::new(target, vars.clone());
146 let args_obj: serde_json::Map<String, serde_json::Value> = vars.into_iter().collect();
148 context.insert_var("args", serde_json::Value::Object(args_obj));
149 let config_manager = Arc::new(ConfigManager::new(workflow.config.clone()));
150 let quiet = options.quiet || options.json;
152
153 let mut registry = PluginRegistry::new();
155 for plugin_cfg in &workflow.config.plugins {
156 match PluginLoader::load_plugin(&plugin_cfg.path) {
157 Ok(plugin) => {
158 tracing::info!(name = %plugin_cfg.name, path = %plugin_cfg.path, "Loaded plugin");
159 registry.register(plugin);
160 }
161 Err(e) => {
162 tracing::warn!(
163 name = %plugin_cfg.name,
164 path = %plugin_cfg.path,
165 error = %e,
166 "Failed to load plugin"
167 );
168 }
169 }
170 }
171
172 let mut event_bus = EventBus::new();
174 if let Some(ref events_cfg) = workflow.config.events {
175 if let Some(ref webhook_url) = events_cfg.webhook {
176 event_bus.add_subscriber(Box::new(WebhookSubscriber::new(webhook_url.clone())));
177 tracing::info!(url = %webhook_url, "Registered webhook event subscriber");
178 }
179 if let Some(ref file_path) = events_cfg.file {
180 event_bus.add_subscriber(Box::new(FileSubscriber::new(file_path.clone())));
181 tracing::info!(path = %file_path, "Registered file event subscriber");
182 }
183 }
184
185 let stack_info = detect_stack_if_registry_exists().await;
187 if let Some(ref info) = stack_info {
188 let stack_val = serde_json::json!({
189 "name": info.name,
190 "parent": info.parent_chain.first().cloned().unwrap_or_else(|| "_default".to_string()),
191 "tools": {
192 "lint": info.tools.get("lint").cloned().unwrap_or_default(),
193 "test": info.tools.get("test").cloned().unwrap_or_default(),
194 "build": info.tools.get("build").cloned().unwrap_or_default(),
195 "install": info.tools.get("install").cloned().unwrap_or_default(),
196 }
197 });
198 context.insert_var("stack", stack_val);
199 context.stack_info = Some(info.clone());
200 }
201 if let Some(ref pd) = workflow.prompts_dir {
203 context.prompts_dir = std::path::PathBuf::from(pd);
204 }
205
206 Self {
207 workflow,
208 context,
209 config_manager,
210 verbose: options.verbose,
211 quiet,
212 json: options.json,
213 dry_run: options.dry_run,
214 resume_from: options.resume_from,
215 sandbox_mode: options.sandbox_mode,
216 sandbox: None,
217 step_records: Vec::new(),
218 state: None,
219 state_file: None,
220 pending_futures: HashMap::new(),
221 plugin_registry: Arc::new(Mutex::new(registry)),
222 event_bus,
223 stack_info,
224 repo: options.repo,
225 api_proxy: None,
226 }
227 }
228
229 pub fn step_records(&self) -> &[StepRecord] {
231 &self.step_records
232 }
233
234 pub fn json_output(&self, status: &str, total_duration: Duration) -> WorkflowJsonOutput {
236 let total_tokens: u64 = self
237 .step_records
238 .iter()
239 .map(|r| r.input_tokens.unwrap_or(0) + r.output_tokens.unwrap_or(0))
240 .sum();
241 let total_cost: f64 = self.step_records.iter().filter_map(|r| r.cost_usd).sum();
242
243 WorkflowJsonOutput {
244 workflow_name: self.workflow.name.clone(),
245 status: status.to_string(),
246 sandbox_mode: format!("{:?}", self.sandbox_mode),
247 steps: self.step_records.clone(),
248 total_duration_secs: total_duration.as_secs_f64(),
249 total_tokens,
250 total_cost_usd: total_cost,
251 }
252 }
253
254 async fn sandbox_up(&mut self) -> Result<()> {
259 let sandbox_config = SandboxConfig::from_global_config(&self.workflow.config.global);
260 let workspace = std::env::current_dir()
261 .map(|p| p.to_string_lossy().to_string())
262 .unwrap_or_else(|_| ".".to_string());
263
264 let is_repo_mode = self.repo.is_some();
265
266 let effective_workspace = if is_repo_mode {
269 let tmp = std::env::temp_dir().join("minion-repo-workspace");
270 std::fs::create_dir_all(&tmp).ok();
271 let _ = std::process::Command::new("git")
273 .args(["init", "--bare"])
274 .current_dir(&tmp)
275 .stdout(std::process::Stdio::null())
276 .stderr(std::process::Stdio::null())
277 .status();
278 tmp.to_string_lossy().to_string()
279 } else {
280 workspace.clone()
281 };
282
283 let mut docker = DockerSandbox::new(sandbox_config, &effective_workspace);
284
285 if let Ok(api_key) = std::env::var("ANTHROPIC_API_KEY") {
287 match ApiProxy::start(api_key).await {
288 Ok(proxy) => {
289 docker.set_proxy(proxy.port());
290 if !self.quiet {
291 println!(
292 " {} API proxy started on port {} — secrets stay on host",
293 "🔐".green(),
294 proxy.port()
295 );
296 }
297 self.api_proxy = Some(proxy);
298 }
299 Err(e) => {
300 tracing::warn!(error = %e, "Failed to start API proxy — falling back to env vars");
301 }
302 }
303 }
304
305 if !self.quiet {
306 if let Some(ref repo) = self.repo {
307 println!(" {} Creating Docker sandbox (repo: {})…", "🐳".cyan(), repo);
308 } else {
309 println!(" {} Creating Docker sandbox container…", "🐳".cyan());
310 }
311 }
312
313 let t0 = Instant::now();
314 docker.create().await?;
315 let create_ms = t0.elapsed().as_millis();
316
317 let t1 = Instant::now();
318 if is_repo_mode {
319 let repo = self.repo.as_ref().unwrap();
321 let clone_output = docker
322 .run_command(&format!(
323 "cd /workspace && rm -rf * .* 2>/dev/null; \
324 git clone --depth=50 https://x-access-token:$GH_TOKEN@github.com/{repo}.git ."
325 ))
326 .await?;
327
328 if clone_output.exit_code != 0 {
329 bail!(
330 "Failed to clone repo '{}' inside sandbox: {}",
331 repo,
332 clone_output.stderr
333 );
334 }
335 tracing::info!(repo = %repo, "Cloned repository inside sandbox");
336 } else {
337 docker.copy_workspace(&workspace).await?;
338 }
339 let copy_ms = t1.elapsed().as_millis();
340
341 let t2 = Instant::now();
346 let _ = docker
347 .run_command(
348 "git config --global --add safe.directory /workspace \
349 && git config --global user.name 'Minion Engine' \
350 && git config --global user.email 'minion@localhost' \
351 && if [ -n \"$GH_TOKEN\" ]; then \
352 git config --global credential.helper '!f() { echo \"password=$GH_TOKEN\"; }; f'; \
353 git config --global credential.https://github.com.username x-access-token; \
354 fi",
355 )
356 .await;
357
358 let _ = docker
361 .run_command(
362 "if id minion >/dev/null 2>&1; then \
363 chown -R minion:minion /workspace 2>/dev/null; \
364 su - minion -c 'git config --global --add safe.directory /workspace \
365 && git config --global user.name \"Minion Engine\" \
366 && git config --global user.email \"minion@localhost\"'; \
367 if [ -n \"$GH_TOKEN\" ]; then \
368 su - minion -c \"git config --global credential.helper '!f() { echo password=\\$GH_TOKEN; }; f' \
369 && git config --global credential.https://github.com.username x-access-token\"; \
370 fi; \
371 fi",
372 )
373 .await;
374 let git_ms = t2.elapsed().as_millis();
375
376 let total_ms = t0.elapsed().as_millis();
377
378 if !self.quiet {
379 let mode_label = if is_repo_mode { "clone" } else { "copy" };
380 println!(
381 " {} Sandbox ready — container {:.1}s, {} {:.1}s, git {:.1}s (total {:.1}s)",
382 "🔒".green(),
383 create_ms as f64 / 1000.0,
384 mode_label,
385 copy_ms as f64 / 1000.0,
386 git_ms as f64 / 1000.0,
387 total_ms as f64 / 1000.0,
388 );
389 }
390
391 tracing::info!(
392 create_ms,
393 copy_ms,
394 git_ms,
395 total_ms,
396 repo_mode = is_repo_mode,
397 "Sandbox setup complete"
398 );
399
400 self.sandbox = Some(Arc::new(Mutex::new(docker)));
401 Ok(())
402 }
403
404 async fn sandbox_down(&mut self) -> Result<()> {
408 if let Some(sb) = self.sandbox.take() {
409 let mut docker = sb.lock().await;
410 let is_repo_mode = self.repo.is_some();
411
412 let copy_back_ms = if is_repo_mode {
413 if !self.quiet {
416 println!(" {} Repo mode — skipping copy-back (all changes pushed from container)", "📦".cyan());
417 }
418 0u128
419 } else {
420 let workspace = std::env::current_dir()
421 .map(|p| p.to_string_lossy().to_string())
422 .unwrap_or_else(|_| ".".to_string());
423
424 if !self.quiet {
425 println!(" {} Copying results from sandbox…", "📦".cyan());
426 }
427
428 let t0 = Instant::now();
429 docker.copy_results(&workspace).await?;
430 t0.elapsed().as_millis()
431 };
432
433 let t1 = Instant::now();
434 docker.destroy().await?;
435 let destroy_ms = t1.elapsed().as_millis();
436
437 if !self.quiet {
438 println!(
439 " {} Sandbox destroyed — copy-back {:.1}s, destroy {:.1}s",
440 "🗑️ ".dimmed(),
441 copy_back_ms as f64 / 1000.0,
442 destroy_ms as f64 / 1000.0,
443 );
444 }
445
446 tracing::info!(copy_back_ms, destroy_ms, repo_mode = is_repo_mode, "Sandbox teardown complete");
447 }
448
449 if let Some(proxy) = self.api_proxy.take() {
451 proxy.stop().await;
452 }
453
454 Ok(())
455 }
456
457 fn should_sandbox_step(&self, step_type: &StepType) -> bool {
459 if *step_type == StepType::Script {
461 return false;
462 }
463 match self.sandbox_mode {
464 SandboxMode::Disabled => false,
465 SandboxMode::FullWorkflow | SandboxMode::Devbox => {
466 matches!(step_type, StepType::Cmd | StepType::Agent)
468 }
469 SandboxMode::AgentOnly => {
470 matches!(step_type, StepType::Agent)
472 }
473 }
474 }
475
476 pub async fn run(&mut self) -> Result<StepOutput> {
479 let state_file = WorkflowState::state_file_path(&self.workflow.name);
481 self.state_file = Some(state_file.clone());
482
483 let mut loaded_state: Option<WorkflowState> = None;
484 if let Some(ref resume_step) = self.resume_from.clone() {
485 match WorkflowState::find_latest(&self.workflow.name) {
486 Some(path) => match WorkflowState::load(&path) {
487 Ok(s) => {
488 let exists = self.workflow.steps.iter().any(|s| &s.name == resume_step);
489 if !exists {
490 bail!(
491 "Resume step '{}' not found in workflow '{}'. \
492 Available steps: {}",
493 resume_step,
494 self.workflow.name,
495 self.workflow
496 .steps
497 .iter()
498 .map(|s| s.name.as_str())
499 .collect::<Vec<_>>()
500 .join(", ")
501 );
502 }
503 if !self.quiet {
504 println!(
505 " {} Resuming from step '{}' (state: {})",
506 "↺".cyan(),
507 resume_step,
508 path.display()
509 );
510 }
511 loaded_state = Some(s);
512 }
513 Err(e) => bail!("Failed to load state file {}: {e}", path.display()),
514 },
515 None => {
516 bail!(
517 "No state file found for workflow '{}'. \
518 Cannot resume. Run the workflow without --resume first.",
519 self.workflow.name
520 );
521 }
522 }
523 }
524
525 let mut current_state = WorkflowState::new(&self.workflow.name);
527
528 if !self.quiet {
530 display::workflow_start(&self.workflow.name);
531 if self.sandbox_mode != SandboxMode::Disabled {
532 println!(" {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
533 }
534 }
535
536 if self.sandbox_mode != SandboxMode::Disabled {
538 self.sandbox_up().await?;
539 }
540
541 self.event_bus
543 .emit(Event::WorkflowStarted {
544 timestamp: chrono::Utc::now(),
545 })
546 .await;
547
548 let start = Instant::now();
549 let steps = self.workflow.steps.clone();
550 let mut last_output = StepOutput::Empty;
551 let mut step_count = 0;
552
553 let resume_from = self.resume_from.clone();
554 let mut resuming = resume_from.is_some();
555
556 let run_result: Result<(), anyhow::Error> = async {
558 for step_def in &steps {
559 if resuming {
561 let is_resume_point = resume_from.as_deref() == Some(&step_def.name);
562 if !is_resume_point {
563 if let Some(ref ls) = loaded_state {
564 if let Some(output) = ls.steps.get(&step_def.name) {
565 self.context.store(&step_def.name, output.clone());
566 if !self.quiet {
567 println!(
568 " {} {} {}",
569 "⏭".yellow(),
570 step_def.name,
571 "(skipped — loaded from state)".dimmed()
572 );
573 }
574 self.step_records.push(StepRecord {
575 name: step_def.name.clone(),
576 step_type: step_def.step_type.to_string(),
577 status: "skipped_resume".to_string(),
578 duration_secs: 0.0,
579 output_summary: truncate(output.text(), 100),
580 input_tokens: None,
581 output_tokens: None,
582 cost_usd: None,
583 sandboxed: false,
584 });
585 }
586 }
587 continue;
588 }
589 resuming = false;
590 }
591
592 if step_def.async_exec == Some(true) {
594 let handle = self.spawn_async_step(step_def);
595 self.pending_futures.insert(step_def.name.clone(), handle);
596 if !self.quiet {
597 println!(
598 " {} {} {} {}",
599 "⚡".yellow(),
600 step_def.name,
601 format!("[{}]", step_def.step_type).cyan(),
602 "(async — spawned)".dimmed()
603 );
604 }
605 step_count += 1;
606 continue;
607 }
608
609 self.await_pending_deps(step_def).await?;
611
612 match self.execute_step(step_def).await {
613 Ok(output) => {
614 current_state
615 .steps
616 .insert(step_def.name.clone(), output.clone());
617 if let Some(ref p) = self.state_file {
618 let _ = current_state.save(p);
619 }
620 last_output = output;
621 step_count += 1;
622 }
623 Err(StepError::ControlFlow(ControlFlow::Skip { message })) => {
624 self.context.store(&step_def.name, StepOutput::Empty);
625 if !self.quiet {
626 let pb = display::step_start(
627 &step_def.name,
628 &step_def.step_type.to_string(),
629 );
630 display::step_skip(&pb, &step_def.name, &message);
631 }
632 self.step_records.push(StepRecord {
633 name: step_def.name.clone(),
634 step_type: step_def.step_type.to_string(),
635 status: "skipped".to_string(),
636 duration_secs: 0.0,
637 output_summary: message.clone(),
638 input_tokens: None,
639 output_tokens: None,
640 cost_usd: None,
641 sandboxed: false,
642 });
643 }
644 Err(StepError::ControlFlow(ControlFlow::Fail { message })) => {
645 if !self.quiet {
646 display::workflow_failed(&step_def.name, &message);
647 }
648 bail!("Step '{}' failed: {}", step_def.name, message);
649 }
650 Err(StepError::ControlFlow(ControlFlow::Break { .. })) => {
651 break;
652 }
653 Err(e) => {
654 if !self.quiet {
655 display::workflow_failed(&step_def.name, &e.to_string());
656 }
657 return Err(e.into());
658 }
659 }
660 }
661 Ok(())
662 }
663 .await;
664
665 let remaining: Vec<(String, JoinHandle<Result<StepOutput, StepError>>)> =
667 self.pending_futures.drain().collect();
668 for (name, handle) in remaining {
669 let step_type = self
670 .workflow
671 .steps
672 .iter()
673 .find(|s| s.name == name)
674 .map(|s| s.step_type.to_string())
675 .unwrap_or_else(|| "async".to_string());
676 match handle.await {
677 Ok(Ok(output)) => {
678 self.context.store(&name, output.clone());
679 self.step_records.push(StepRecord {
680 name: name.clone(),
681 step_type: step_type.clone(),
682 status: "ok".to_string(),
683 duration_secs: 0.0,
684 output_summary: truncate(output.text(), 100),
685 input_tokens: None,
686 output_tokens: None,
687 cost_usd: None,
688 sandboxed: false,
689 });
690 }
691 Ok(Err(e)) => {
692 self.step_records.push(StepRecord {
693 name: name.clone(),
694 step_type: step_type.clone(),
695 status: "failed".to_string(),
696 duration_secs: 0.0,
697 output_summary: e.to_string(),
698 input_tokens: None,
699 output_tokens: None,
700 cost_usd: None,
701 sandboxed: false,
702 });
703 if !self.quiet {
704 eprintln!(" {} Async step '{}' failed: {}", "✗".red(), name, e);
705 }
706 }
707 Err(e) => {
708 let msg = format!("Async step '{}' panicked: {e}", name);
709 self.step_records.push(StepRecord {
710 name: name.clone(),
711 step_type,
712 status: "failed".to_string(),
713 duration_secs: 0.0,
714 output_summary: msg.clone(),
715 input_tokens: None,
716 output_tokens: None,
717 cost_usd: None,
718 sandboxed: false,
719 });
720 if !self.quiet {
721 eprintln!(" {} {}", "✗".red(), msg);
722 }
723 }
724 }
725 }
726
727 if self.sandbox_mode != SandboxMode::Disabled {
729 if let Err(e) = self.sandbox_down().await {
730 if !self.quiet {
731 eprintln!(" {} Sandbox cleanup warning: {e}", "⚠".yellow());
732 }
733 }
734 }
735
736 self.event_bus
738 .emit(Event::WorkflowCompleted {
739 duration_ms: start.elapsed().as_millis() as u64,
740 timestamp: chrono::Utc::now(),
741 })
742 .await;
743
744 run_result?;
746
747 if !self.quiet {
748 display::workflow_done(start.elapsed(), step_count);
749 }
750
751 self.state = Some(current_state);
752 Ok(last_output)
753 }
754
755 pub async fn execute_step(&mut self, step_def: &StepDef) -> Result<StepOutput, StepError> {
756 let config = self.resolve_config(step_def);
757 let use_sandbox = self.should_sandbox_step(&step_def.step_type);
758
759 let pb = if !self.quiet {
760 let label = if use_sandbox {
761 format!("{} 🐳", step_def.step_type)
762 } else {
763 step_def.step_type.to_string()
764 };
765 Some(display::step_start(&step_def.name, &label))
766 } else {
767 None
768 };
769
770 let start = Instant::now();
771
772 self.event_bus
774 .emit(Event::StepStarted {
775 step_name: step_def.name.clone(),
776 step_type: step_def.step_type.to_string(),
777 timestamp: chrono::Utc::now(),
778 })
779 .await;
780
781 tracing::debug!(
782 step = %step_def.name,
783 step_type = %step_def.step_type,
784 sandboxed = use_sandbox,
785 "Executing step"
786 );
787
788 let sandbox_ref = if use_sandbox { &self.sandbox } else { &None };
790
791 let result = match step_def.step_type {
792 StepType::Cmd => {
793 CmdExecutor
794 .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
795 .await
796 }
797 StepType::Agent => {
798 AgentExecutor
799 .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
800 .await
801 }
802 StepType::Gate => GateExecutor.execute(step_def, &config, &self.context).await,
803 StepType::Repeat => {
804 RepeatExecutor::new(&self.workflow.scopes, self.sandbox.clone())
805 .with_config_manager(Some(Arc::clone(&self.config_manager)))
806 .execute(step_def, &config, &self.context)
807 .await
808 }
809 StepType::Chat => ChatExecutor.execute(step_def, &config, &self.context).await,
810 StepType::Map => {
811 MapExecutor::new(&self.workflow.scopes, self.sandbox.clone())
812 .with_config_manager(Some(Arc::clone(&self.config_manager)))
813 .execute(step_def, &config, &self.context)
814 .await
815 }
816 StepType::Parallel => {
817 ParallelExecutor::new(&self.workflow.scopes, self.sandbox.clone())
818 .with_config_manager(Some(Arc::clone(&self.config_manager)))
819 .execute(step_def, &config, &self.context)
820 .await
821 }
822 StepType::Call => {
823 CallExecutor::new(&self.workflow.scopes, self.sandbox.clone())
824 .with_config_manager(Some(Arc::clone(&self.config_manager)))
825 .execute(step_def, &config, &self.context)
826 .await
827 }
828 StepType::Template => {
829 let prompts_dir = self.workflow.prompts_dir.as_deref();
830 TemplateStepExecutor::new(prompts_dir)
831 .execute(step_def, &config, &self.context)
832 .await
833 }
834 StepType::Script => {
835 ScriptExecutor
836 .execute(step_def, &config, &self.context)
837 .await
838 } };
841
842 let elapsed = start.elapsed();
843 let duration_ms = elapsed.as_millis() as u64;
844
845 let result = match result {
848 Ok(output) => parse_step_output(output, step_def),
849 err => err,
850 };
851
852 match &result {
853 Ok(output) => {
854 tracing::info!(
855 step = %step_def.name,
856 step_type = %step_def.step_type,
857 duration_ms = elapsed.as_millis(),
858 sandboxed = use_sandbox,
859 status = "ok",
860 "Step completed"
861 );
862 self.context.store(&step_def.name, output.clone());
863 if let Some(parsed) = extract_parsed_value(output, step_def) {
865 self.context.store_parsed(&step_def.name, parsed);
866 }
867
868 let (it, ot, cost) = token_stats(output);
869 self.step_records.push(StepRecord {
870 name: step_def.name.clone(),
871 step_type: step_def.step_type.to_string(),
872 status: "ok".to_string(),
873 duration_secs: elapsed.as_secs_f64(),
874 output_summary: truncate(output.text(), 100),
875 input_tokens: it,
876 output_tokens: ot,
877 cost_usd: cost,
878 sandboxed: use_sandbox,
879 });
880
881 if let Some(pb) = &pb {
882 display::step_ok(pb, &step_def.name, elapsed);
883 }
884 }
885 Err(StepError::ControlFlow(cf)) => {
886 let msg = match cf {
887 ControlFlow::Skip { message } => format!("skipped: {message}"),
888 ControlFlow::Break { message, .. } => format!("break: {message}"),
889 ControlFlow::Fail { message } => format!("failed: {message}"),
890 ControlFlow::Next { message } => format!("next: {message}"),
891 };
892 tracing::info!(
893 step = %step_def.name,
894 step_type = %step_def.step_type,
895 duration_ms = elapsed.as_millis(),
896 status = "control_flow",
897 message = %msg,
898 "Step control flow"
899 );
900 if let Some(pb) = &pb {
901 display::step_skip(pb, &step_def.name, &msg);
902 }
903 }
904 Err(e) => {
905 tracing::warn!(
906 step = %step_def.name,
907 step_type = %step_def.step_type,
908 duration_ms = elapsed.as_millis(),
909 status = "error",
910 error = %e,
911 "Step failed"
912 );
913 self.step_records.push(StepRecord {
914 name: step_def.name.clone(),
915 step_type: step_def.step_type.to_string(),
916 status: "failed".to_string(),
917 duration_secs: elapsed.as_secs_f64(),
918 output_summary: e.to_string(),
919 input_tokens: None,
920 output_tokens: None,
921 cost_usd: None,
922 sandboxed: use_sandbox,
923 });
924 if let Some(pb) = &pb {
925 display::step_fail(pb, &step_def.name, &e.to_string());
926 }
927 }
928 }
929
930 match &result {
932 Ok(_) => {
933 self.event_bus
934 .emit(Event::StepCompleted {
935 step_name: step_def.name.clone(),
936 step_type: step_def.step_type.to_string(),
937 duration_ms,
938 timestamp: chrono::Utc::now(),
939 })
940 .await;
941 }
942 Err(e) if !matches!(e, StepError::ControlFlow(_)) => {
943 self.event_bus
944 .emit(Event::StepFailed {
945 step_name: step_def.name.clone(),
946 step_type: step_def.step_type.to_string(),
947 error: e.to_string(),
948 duration_ms,
949 timestamp: chrono::Utc::now(),
950 })
951 .await;
952 }
953 _ => {}
954 }
955
956 result
957 }
958
959 pub fn dry_run(&self) {
961 use colored::Colorize;
962
963 println!(
964 "{} {} (dry-run)",
965 "▶".cyan().bold(),
966 self.workflow.name.bold()
967 );
968 if self.sandbox_mode != SandboxMode::Disabled {
969 println!(" {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
970 }
971 println!();
972
973 let steps = &self.workflow.steps;
974 let total = steps.len();
975 for (i, step) in steps.iter().enumerate() {
976 let is_last = i + 1 == total;
977 let branch = if is_last { "└──" } else { "├──" };
978 let config = self.resolve_config(step);
979
980 let sandbox_indicator = if self.should_sandbox_step(&step.step_type) {
981 " 🐳"
982 } else {
983 ""
984 };
985
986 let async_indicator = if step.async_exec == Some(true) {
987 " ⚡"
988 } else {
989 ""
990 };
991
992 println!(
993 "{} {} {}{}{}",
994 branch.dimmed(),
995 step.name.bold(),
996 format!("[{}]", step.step_type).cyan(),
997 sandbox_indicator,
998 async_indicator
999 );
1000
1001 let indent = if is_last { " " } else { "│ " };
1002 self.print_step_details(step, &config, indent);
1003
1004 if !is_last {
1005 println!("│");
1006 }
1007 }
1008 }
1009
1010 fn print_step_details(&self, step: &StepDef, config: &StepConfig, indent: &str) {
1011 use colored::Colorize;
1012
1013 match step.step_type {
1014 StepType::Cmd => {
1015 if let Some(ref run) = step.run {
1016 let preview = truncate(run, 80);
1017 println!("{} run: {}", indent, preview.dimmed());
1018 }
1019 }
1020 StepType::Agent | StepType::Chat => {
1021 if let Some(ref prompt) = step.prompt {
1022 let preview = truncate(&prompt.replace('\n', " "), 80);
1023 println!("{} prompt: {}", indent, preview.dimmed());
1024 }
1025 if let Some(model) = config.get_str("model") {
1026 println!("{} model: {}", indent, model.dimmed());
1027 }
1028 }
1029 StepType::Gate => {
1030 if let Some(ref cond) = step.condition {
1031 println!("{} condition: {}", indent, cond.dimmed());
1032 }
1033 println!(
1034 "{} on_pass: {} / on_fail: {}",
1035 indent,
1036 step.on_pass.as_deref().unwrap_or("continue").dimmed(),
1037 step.on_fail.as_deref().unwrap_or("continue").dimmed()
1038 );
1039 }
1040 StepType::Repeat => {
1041 let scope_name = step.scope.as_deref().unwrap_or("<none>");
1042 let max_iter = step.max_iterations.unwrap_or(1);
1043 println!("{} scope: {}", indent, scope_name.dimmed());
1044 println!(
1045 "{} max_iterations: {}",
1046 indent,
1047 max_iter.to_string().dimmed()
1048 );
1049 self.print_scope_steps(scope_name, indent);
1050 }
1051 StepType::Map => {
1052 let scope_name = step.scope.as_deref().unwrap_or("<none>");
1053 let items = step.items.as_deref().unwrap_or("<none>");
1054 println!("{} items: {}", indent, items.dimmed());
1055 println!("{} scope: {}", indent, scope_name.dimmed());
1056 if let Some(p) = step.parallel {
1057 println!("{} parallel: {}", indent, p.to_string().dimmed());
1058 }
1059 self.print_scope_steps(scope_name, indent);
1060 }
1061 StepType::Call => {
1062 let scope_name = step.scope.as_deref().unwrap_or("<none>");
1063 println!("{} scope: {}", indent, scope_name.dimmed());
1064 self.print_scope_steps(scope_name, indent);
1065 }
1066 StepType::Parallel => {
1067 if let Some(ref sub_steps) = step.steps {
1068 println!("{} parallel steps:", indent);
1069 for sub in sub_steps {
1070 println!(
1071 "{} • {} [{}]",
1072 indent,
1073 sub.name.bold(),
1074 sub.step_type.to_string().cyan()
1075 );
1076 }
1077 }
1078 }
1079 StepType::Template => {
1080 if let Some(ref run) = step.run {
1081 println!("{} template: {}", indent, run.dimmed());
1082 }
1083 }
1084 StepType::Script => {
1085 if let Some(ref run) = step.run {
1086 let preview = truncate(&run.replace('\n', " "), 80);
1087 println!("{} script: {}", indent, preview.dimmed());
1088 }
1089 }
1090 }
1091
1092 if let Some(t) = config.get_str("timeout") {
1093 println!("{} timeout: {}", indent, t.dimmed());
1094 }
1095 }
1096
1097 fn print_scope_steps(&self, scope_name: &str, indent: &str) {
1098 use colored::Colorize;
1099 if let Some(scope) = self.workflow.scopes.get(scope_name) {
1100 println!("{} scope steps:", indent);
1101 for step in &scope.steps {
1102 println!(
1103 "{} • {} [{}]",
1104 indent,
1105 step.name.bold(),
1106 step.step_type.to_string().cyan()
1107 );
1108 }
1109 }
1110 }
1111
1112 fn resolve_config(&self, step_def: &StepDef) -> StepConfig {
1113 self.config_manager
1114 .resolve(&step_def.name, &step_def.step_type, &step_def.config)
1115 }
1116
1117 fn spawn_async_step(&self, step_def: &StepDef) -> JoinHandle<Result<StepOutput, StepError>> {
1123 let step = step_def.clone();
1124 let config = self.resolve_config(step_def);
1125 let target = self
1126 .context
1127 .get_var("target")
1128 .and_then(|v| v.as_str())
1129 .unwrap_or("")
1130 .to_string();
1131
1132 tokio::spawn(async move {
1133 let ctx = Context::new(target, HashMap::new());
1134 match step.step_type {
1135 StepType::Cmd => CmdExecutor.execute(&step, &config, &ctx).await,
1136 StepType::Agent => AgentExecutor.execute(&step, &config, &ctx).await,
1137 StepType::Script => ScriptExecutor.execute(&step, &config, &ctx).await,
1138 _ => Err(StepError::Fail(format!(
1139 "Async execution not supported for step type '{}'",
1140 step.step_type
1141 ))),
1142 }
1143 })
1144 }
1145
1146 async fn await_pending_deps(&mut self, step_def: &StepDef) -> Result<(), StepError> {
1149 let pattern = Regex::new(r"steps\.(\w+)\.").unwrap();
1150
1151 let mut templates: Vec<String> = Vec::new();
1153 if let Some(ref run) = step_def.run {
1154 templates.push(run.clone());
1155 }
1156 if let Some(ref prompt) = step_def.prompt {
1157 templates.push(prompt.clone());
1158 }
1159 if let Some(ref condition) = step_def.condition {
1160 templates.push(condition.clone());
1161 }
1162
1163 let mut deps: Vec<String> = Vec::new();
1165 for tmpl in &templates {
1166 for cap in pattern.captures_iter(tmpl) {
1167 let name = cap[1].to_string();
1168 if self.pending_futures.contains_key(&name) && !deps.contains(&name) {
1169 deps.push(name);
1170 }
1171 }
1172 }
1173
1174 for name in deps {
1176 self.await_pending_step(&name).await?;
1177 }
1178
1179 Ok(())
1180 }
1181
1182 async fn await_pending_step(&mut self, name: &str) -> Result<(), StepError> {
1184 if let Some(handle) = self.pending_futures.remove(name) {
1185 match handle.await {
1186 Ok(Ok(output)) => {
1187 self.context.store(name, output.clone());
1188 self.step_records.push(StepRecord {
1189 name: name.to_string(),
1190 step_type: "async".to_string(),
1191 status: "ok".to_string(),
1192 duration_secs: 0.0,
1193 output_summary: truncate(output.text(), 100),
1194 input_tokens: None,
1195 output_tokens: None,
1196 cost_usd: None,
1197 sandboxed: false,
1198 });
1199 }
1200 Ok(Err(e)) => {
1201 return Err(StepError::Fail(format!(
1202 "Async step '{}' failed: {e}",
1203 name
1204 )));
1205 }
1206 Err(e) => {
1207 return Err(StepError::Fail(format!(
1208 "Async step '{}' panicked: {e}",
1209 name
1210 )));
1211 }
1212 }
1213 }
1214 Ok(())
1215 }
1216}
1217
1218fn token_stats(output: &StepOutput) -> (Option<u64>, Option<u64>, Option<f64>) {
1220 match output {
1221 StepOutput::Agent(o) => (
1222 Some(o.stats.input_tokens),
1223 Some(o.stats.output_tokens),
1224 Some(o.stats.cost_usd),
1225 ),
1226 StepOutput::Chat(o) => (Some(o.input_tokens), Some(o.output_tokens), None),
1227 _ => (None, None, None),
1228 }
1229}
1230
1231fn parse_step_output(output: StepOutput, step_def: &StepDef) -> Result<StepOutput, StepError> {
1234 let output_type = match &step_def.output_type {
1235 Some(t) => t,
1236 None => return Ok(output),
1237 };
1238
1239 if *output_type == OutputType::Text {
1240 return Ok(output);
1241 }
1242
1243 let text = output.text().trim().to_string();
1244
1245 match output_type {
1246 OutputType::Integer => {
1247 text.parse::<i64>()
1248 .map_err(|_| StepError::Fail(format!("Failed to parse '{}' as integer", text)))?;
1249 }
1250 OutputType::Json => {
1251 serde_json::from_str::<serde_json::Value>(&text)
1252 .map_err(|e| StepError::Fail(format!("Failed to parse output as JSON: {e}")))?;
1253 }
1254 OutputType::Boolean => match text.to_lowercase().as_str() {
1255 "true" | "1" | "yes" | "false" | "0" | "no" => {}
1256 _ => {
1257 return Err(StepError::Fail(format!(
1258 "Failed to parse '{}' as boolean",
1259 text
1260 )));
1261 }
1262 },
1263 OutputType::Lines | OutputType::Text => {}
1264 }
1265
1266 Ok(output)
1267}
1268
1269fn extract_parsed_value(output: &StepOutput, step_def: &StepDef) -> Option<ParsedValue> {
1272 let output_type = step_def.output_type.as_ref()?;
1273
1274 let text = output.text().trim().to_string();
1275
1276 let parsed = match output_type {
1277 OutputType::Text => ParsedValue::Text(text),
1278 OutputType::Integer => ParsedValue::Integer(text.parse::<i64>().ok()?),
1279 OutputType::Json => {
1280 let val = serde_json::from_str::<serde_json::Value>(&text).ok()?;
1281 ParsedValue::Json(val)
1282 }
1283 OutputType::Lines => {
1284 let lines: Vec<String> = text
1285 .lines()
1286 .filter(|l| !l.is_empty())
1287 .map(|l| l.to_string())
1288 .collect();
1289 ParsedValue::Lines(lines)
1290 }
1291 OutputType::Boolean => {
1292 let b = matches!(text.to_lowercase().as_str(), "true" | "1" | "yes");
1293 ParsedValue::Boolean(b)
1294 }
1295 };
1296
1297 Some(parsed)
1298}
1299
1300fn truncate(s: &str, max: usize) -> String {
1303 let char_count = s.chars().count();
1304 if char_count <= max {
1305 s.to_string()
1306 } else {
1307 let end: usize = s.char_indices().nth(max).map(|(i, _)| i).unwrap_or(s.len());
1308 format!("{}…", &s[..end])
1309 }
1310}
1311
1312async fn detect_stack_if_registry_exists() -> Option<StackInfo> {
1315 let registry_path = std::path::Path::new("prompts/registry.yaml");
1316 if !registry_path.exists() {
1317 return None;
1318 }
1319 match Registry::from_file(registry_path).await {
1320 Ok(registry) => {
1321 let workspace = std::path::Path::new(".");
1322 match StackDetector::detect(®istry, workspace).await {
1323 Ok(info) => {
1324 tracing::info!(stack = %info.name, "Detected project stack");
1325 Some(info)
1326 }
1327 Err(e) => {
1328 tracing::debug!("Stack detection failed: {e}");
1329 None
1330 }
1331 }
1332 }
1333 Err(e) => {
1334 tracing::warn!("Failed to parse prompts/registry.yaml: {e}");
1335 None
1336 }
1337 }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342 use super::*;
1343 use crate::workflow::parser;
1344
1345 static CWD_LOCK: std::sync::LazyLock<std::sync::Mutex<()>> =
1347 std::sync::LazyLock::new(|| std::sync::Mutex::new(()));
1348
1349 #[tokio::test]
1350 async fn engine_runs_sequential_cmd_steps() {
1351 let yaml = r#"
1352name: test
1353steps:
1354 - name: step1
1355 type: cmd
1356 run: "echo first"
1357 - name: step2
1358 type: cmd
1359 run: "echo second"
1360"#;
1361 let wf = parser::parse_str(yaml).unwrap();
1362 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1363 let result = engine.run().await.unwrap();
1364 assert_eq!(result.text().trim(), "second");
1365 assert!(engine.context.get_step("step1").is_some());
1366 assert_eq!(
1367 engine.context.get_step("step1").unwrap().text().trim(),
1368 "first"
1369 );
1370 }
1371
1372 #[tokio::test]
1373 async fn engine_exposes_step_output_to_next_step() {
1374 let yaml = r#"
1375name: test
1376steps:
1377 - name: produce
1378 type: cmd
1379 run: "echo hello_world"
1380 - name: consume
1381 type: cmd
1382 run: "echo {{ steps.produce.stdout }}"
1383"#;
1384 let wf = parser::parse_str(yaml).unwrap();
1385 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1386 let result = engine.run().await.unwrap();
1387 assert!(result.text().contains("hello_world"));
1388 }
1389
1390 #[tokio::test]
1391 async fn engine_collects_step_records_in_json_mode() {
1392 let yaml = r#"
1393name: json-test
1394steps:
1395 - name: alpha
1396 type: cmd
1397 run: "echo alpha"
1398 - name: beta
1399 type: cmd
1400 run: "echo beta"
1401"#;
1402 let wf = parser::parse_str(yaml).unwrap();
1403 let opts = EngineOptions {
1404 json: true,
1405 ..Default::default()
1406 };
1407 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1408 engine.run().await.unwrap();
1409
1410 let records = engine.step_records();
1411 assert_eq!(records.len(), 2);
1412 assert_eq!(records[0].name, "alpha");
1413 assert_eq!(records[0].status, "ok");
1414 assert!(!records[0].sandboxed);
1415 assert_eq!(records[1].name, "beta");
1416 assert_eq!(records[1].status, "ok");
1417 }
1418
1419 #[tokio::test]
1420 async fn json_output_includes_sandbox_mode() {
1421 let yaml = r#"
1422name: json-output-test
1423steps:
1424 - name: greet
1425 type: cmd
1426 run: "echo hello"
1427"#;
1428 let wf = parser::parse_str(yaml).unwrap();
1429 let opts = EngineOptions {
1430 json: true,
1431 ..Default::default()
1432 };
1433 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1434 let start = Instant::now();
1435 engine.run().await.unwrap();
1436 let out = engine.json_output("success", start.elapsed());
1437
1438 let json = serde_json::to_string(&out).unwrap();
1439 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1440
1441 assert_eq!(parsed["workflow_name"], "json-output-test");
1442 assert_eq!(parsed["status"], "success");
1443 assert_eq!(parsed["sandbox_mode"], "Disabled");
1444 assert!(parsed["steps"].is_array());
1445 assert_eq!(parsed["steps"][0]["name"], "greet");
1446 }
1447
1448 #[tokio::test]
1449 async fn should_sandbox_step_logic() {
1450 let yaml = r#"
1451name: test
1452steps:
1453 - name: s
1454 type: cmd
1455 run: "echo test"
1456"#;
1457 let wf = parser::parse_str(yaml).unwrap();
1458
1459 let engine = Engine::new(wf.clone(), "".to_string(), HashMap::new(), false, true).await;
1461 assert!(!engine.should_sandbox_step(&StepType::Cmd));
1462 assert!(!engine.should_sandbox_step(&StepType::Agent));
1463 assert!(!engine.should_sandbox_step(&StepType::Gate));
1464
1465 let opts = EngineOptions {
1467 sandbox_mode: SandboxMode::FullWorkflow,
1468 quiet: true,
1469 ..Default::default()
1470 };
1471 let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts).await;
1472 assert!(engine.should_sandbox_step(&StepType::Cmd));
1473 assert!(engine.should_sandbox_step(&StepType::Agent));
1474 assert!(!engine.should_sandbox_step(&StepType::Gate));
1475
1476 let opts = EngineOptions {
1478 sandbox_mode: SandboxMode::AgentOnly,
1479 quiet: true,
1480 ..Default::default()
1481 };
1482 let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts).await;
1483 assert!(!engine.should_sandbox_step(&StepType::Cmd));
1484 assert!(engine.should_sandbox_step(&StepType::Agent));
1485 assert!(!engine.should_sandbox_step(&StepType::Gate));
1486 }
1487
1488 #[tokio::test]
1489 async fn dry_run_does_not_panic() {
1490 let yaml = r#"
1491name: dry-run-test
1492scopes:
1493 lint_fix:
1494 steps:
1495 - name: lint
1496 type: cmd
1497 run: "npm run lint"
1498 - name: fix_lint
1499 type: agent
1500 prompt: "Fix lint errors"
1501steps:
1502 - name: setup
1503 type: cmd
1504 run: "echo setup"
1505 - name: validate
1506 type: gate
1507 condition: "{{ steps.setup.exit_code == 0 }}"
1508 on_pass: continue
1509 on_fail: fail
1510 - name: lint_gate
1511 type: repeat
1512 scope: lint_fix
1513 max_iterations: 3
1514"#;
1515 let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1516 let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1517 engine.dry_run();
1518 }
1519
1520 #[tokio::test]
1521 async fn dry_run_all_step_types() {
1522 let yaml = r#"
1523name: all-types
1524steps:
1525 - name: c
1526 type: cmd
1527 run: "ls"
1528 - name: g
1529 type: gate
1530 condition: "{{ true }}"
1531 on_pass: continue
1532 - name: p
1533 type: parallel
1534 steps:
1535 - name: p1
1536 type: cmd
1537 run: "echo p1"
1538"#;
1539 let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1540 let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1541 engine.dry_run();
1542 }
1543
1544 #[test]
1545 fn truncate_helper() {
1546 assert_eq!(truncate("hello", 10), "hello");
1547 assert_eq!(truncate("hello world", 5), "hello…");
1548 }
1549
1550 #[tokio::test]
1551 async fn resume_fails_when_no_state_file() {
1552 let yaml = r#"
1553name: no-state-workflow-xyz-unique
1554steps:
1555 - name: step1
1556 type: cmd
1557 run: "echo 1"
1558"#;
1559 let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1560 let opts = EngineOptions {
1561 resume_from: Some("step1".to_string()),
1562 quiet: true,
1563 ..Default::default()
1564 };
1565 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1566 let err = engine.run().await.unwrap_err();
1567 assert!(
1568 err.to_string().contains("No state file found"),
1569 "Expected 'No state file found' but got: {err}"
1570 );
1571 }
1572
1573 #[tokio::test]
1574 async fn resume_fails_for_unknown_step() {
1575 let workflow_name = "test-resume-unknown-step";
1576 let state = WorkflowState::new(workflow_name);
1577 let tmp_path = format!("/tmp/minion-{workflow_name}-20991231235959.state.json");
1578 let path = PathBuf::from(&tmp_path);
1579 state.save(&path).unwrap();
1580
1581 let yaml = format!(
1582 r#"
1583name: {workflow_name}
1584steps:
1585 - name: step1
1586 type: cmd
1587 run: "echo 1"
1588"#
1589 );
1590 let wf = crate::workflow::parser::parse_str(&yaml).unwrap();
1591 let opts = EngineOptions {
1592 resume_from: Some("nonexistent_step".to_string()),
1593 quiet: true,
1594 ..Default::default()
1595 };
1596 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1597 let err = engine.run().await.unwrap_err();
1598 assert!(
1599 err.to_string().contains("not found in workflow"),
1600 "Expected 'not found in workflow' but got: {err}"
1601 );
1602
1603 let _ = std::fs::remove_file(&path);
1604 }
1605
1606 #[tokio::test]
1607 async fn safe_accessor_returns_empty_for_missing_step() {
1608 let yaml = r#"
1609name: test-safe-accessor
1610steps:
1611 - name: use_missing
1612 type: cmd
1613 run: "echo '{{ missing.output? }}'"
1614"#;
1615 let wf = parser::parse_str(yaml).unwrap();
1616 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1617 let result = engine.run().await.unwrap();
1618 assert_eq!(result.text().trim(), "");
1620 }
1621
1622 #[tokio::test]
1623 async fn safe_accessor_returns_value_when_present() {
1624 let yaml = r#"
1625name: test-safe-accessor-present
1626steps:
1627 - name: produce
1628 type: cmd
1629 run: "echo hello"
1630 - name: consume
1631 type: cmd
1632 run: "echo '{{ produce.output? }}'"
1633"#;
1634 let wf = parser::parse_str(yaml).unwrap();
1635 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1636 let result = engine.run().await.unwrap();
1637 assert!(result.text().contains("hello"));
1638 }
1639
1640 #[tokio::test]
1641 async fn strict_accessor_fails_when_step_missing() {
1642 let yaml = r#"
1643name: test-strict-accessor-fail
1644steps:
1645 - name: use_missing
1646 type: cmd
1647 run: "echo '{{ nonexistent.output! }}'"
1648"#;
1649 let wf = parser::parse_str(yaml).unwrap();
1650 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1651 let err = engine.run().await.unwrap_err();
1652 assert!(err.to_string().contains("strict access"), "{err}");
1653 }
1654
1655 #[tokio::test]
1656 async fn output_type_integer_parses_number() {
1657 let yaml = r#"
1658name: test-parse
1659steps:
1660 - name: count
1661 type: cmd
1662 run: "echo 42"
1663 output_type: integer
1664 - name: use_count
1665 type: cmd
1666 run: "echo {{ count.output }}"
1667"#;
1668 let wf = parser::parse_str(yaml).unwrap();
1669 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1670 let result = engine.run().await.unwrap();
1671 assert_eq!(result.text().trim(), "42");
1672 }
1673
1674 #[tokio::test]
1675 async fn output_type_integer_fails_on_non_number() {
1676 let yaml = r#"
1677name: test-parse-fail
1678steps:
1679 - name: count
1680 type: cmd
1681 run: "echo not_a_number"
1682 output_type: integer
1683"#;
1684 let wf = parser::parse_str(yaml).unwrap();
1685 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1686 let err = engine.run().await.unwrap_err();
1687 assert!(err.to_string().contains("integer"), "{err}");
1688 }
1689
1690 #[tokio::test]
1691 async fn output_type_json_allows_dot_access() {
1692 let yaml = r#"
1693name: test-json
1694steps:
1695 - name: scan
1696 type: cmd
1697 run: "echo '{\"count\": 5}'"
1698 output_type: json
1699 - name: use_scan
1700 type: cmd
1701 run: "echo {{ scan.output.count }}"
1702"#;
1703 let wf = parser::parse_str(yaml).unwrap();
1704 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1705 let result = engine.run().await.unwrap();
1706 assert_eq!(result.text().trim(), "5");
1707 }
1708
1709 #[tokio::test]
1710 async fn output_type_lines_allows_length_filter() {
1711 let yaml = r#"
1712name: test-lines
1713steps:
1714 - name: files
1715 type: cmd
1716 run: "printf 'a.rs\nb.rs\nc.rs'"
1717 output_type: lines
1718 - name: count_files
1719 type: cmd
1720 run: "echo {{ files.output | length }}"
1721"#;
1722 let wf = parser::parse_str(yaml).unwrap();
1723 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1724 let result = engine.run().await.unwrap();
1725 assert_eq!(result.text().trim(), "3");
1726 }
1727
1728 #[tokio::test]
1731 async fn async_step_is_spawned_and_completes() {
1732 let yaml = r#"
1733name: async-test
1734steps:
1735 - name: bg_task
1736 type: cmd
1737 run: "echo async_result"
1738 async_exec: true
1739 - name: sync_step
1740 type: cmd
1741 run: "echo sync_result"
1742"#;
1743 let wf = parser::parse_str(yaml).unwrap();
1744 let opts = EngineOptions {
1745 quiet: true,
1746 ..Default::default()
1747 };
1748 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1749 let result = engine.run().await.unwrap();
1750 assert!(result.text().contains("sync_result"));
1752 let records = engine.step_records();
1754 assert!(
1755 records.iter().any(|r| r.name == "bg_task"),
1756 "bg_task should be in records"
1757 );
1758 }
1759
1760 #[tokio::test]
1761 async fn dry_run_shows_async_lightning_indicator() {
1762 let yaml = r#"
1763name: dry-async
1764steps:
1765 - name: fast_bg
1766 type: cmd
1767 run: "echo bg"
1768 async_exec: true
1769 - name: normal
1770 type: cmd
1771 run: "echo normal"
1772"#;
1773 let wf = parser::parse_str(yaml).unwrap();
1775 let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1776 engine.dry_run();
1778 }
1779
1780 #[tokio::test]
1781 async fn should_sandbox_step_script_always_false() {
1782 let yaml = r#"
1783name: test
1784steps:
1785 - name: s
1786 type: cmd
1787 run: "echo test"
1788"#;
1789 let wf = parser::parse_str(yaml).unwrap();
1790
1791 let opts = EngineOptions {
1793 sandbox_mode: SandboxMode::FullWorkflow,
1794 quiet: true,
1795 ..Default::default()
1796 };
1797 let engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1798 assert!(!engine.should_sandbox_step(&StepType::Script));
1799 }
1800
1801 #[tokio::test]
1804 async fn multiple_async_steps_all_complete_by_workflow_end() {
1805 let yaml = r#"
1806name: multi-async
1807steps:
1808 - name: task_a
1809 type: cmd
1810 run: "echo result_a"
1811 async_exec: true
1812 - name: task_b
1813 type: cmd
1814 run: "echo result_b"
1815 async_exec: true
1816 - name: sync_done
1817 type: cmd
1818 run: "echo done"
1819"#;
1820 let wf = parser::parse_str(yaml).unwrap();
1821 let opts = EngineOptions {
1822 quiet: true,
1823 ..Default::default()
1824 };
1825 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1826 engine.run().await.unwrap();
1827
1828 let records = engine.step_records();
1829 assert!(
1830 records.iter().any(|r| r.name == "task_a"),
1831 "task_a should be recorded"
1832 );
1833 assert!(
1834 records.iter().any(|r| r.name == "task_b"),
1835 "task_b should be recorded"
1836 );
1837 assert!(
1838 records.iter().any(|r| r.name == "sync_done"),
1839 "sync_done should be recorded"
1840 );
1841 }
1842
1843 #[tokio::test]
1846 async fn engine_dispatches_script_step() {
1847 let yaml = r#"
1848name: script-dispatch
1849steps:
1850 - name: calc
1851 type: script
1852 run: |
1853 let x = 6 * 7;
1854 x.to_string()
1855"#;
1856 let wf = parser::parse_str(yaml).unwrap();
1857 let opts = EngineOptions {
1858 quiet: true,
1859 ..Default::default()
1860 };
1861 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1862 let result = engine.run().await.unwrap();
1863 assert_eq!(result.text().trim(), "42");
1864 }
1865
1866 #[tokio::test]
1867 #[allow(clippy::await_holding_lock)]
1868 async fn stack_context_injected_when_registry_exists() {
1869 let _lock = CWD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1870
1871 let dir = tempfile::tempdir().unwrap();
1872 let orig_dir = std::env::current_dir().unwrap();
1873
1874 let prompts_dir = dir.path().join("prompts");
1876 std::fs::create_dir_all(&prompts_dir).unwrap();
1877 let registry_yaml = r#"
1878version: 1
1879detection_order:
1880 - rust
1881stacks:
1882 rust:
1883 file_markers:
1884 - Cargo.toml
1885 tools:
1886 lint: "cargo clippy"
1887 test: "cargo test"
1888 build: "cargo build"
1889 install: "cargo fetch"
1890"#;
1891 std::fs::write(prompts_dir.join("registry.yaml"), registry_yaml).unwrap();
1892 std::fs::write(dir.path().join("Cargo.toml"), "[package]").unwrap();
1894
1895 std::env::set_current_dir(dir.path()).unwrap();
1896
1897 let yaml = "name: test\nsteps:\n - name: s\n type: cmd\n run: \"echo hi\"\n";
1898 let wf = parser::parse_str(yaml).unwrap();
1899 let engine = Engine::with_options(
1900 wf,
1901 "target".to_string(),
1902 HashMap::new(),
1903 EngineOptions {
1904 quiet: true,
1905 ..Default::default()
1906 },
1907 )
1908 .await;
1909
1910 let result = engine.context.render_template("{{ stack.name }}").unwrap();
1911 assert_eq!(result, "rust");
1912
1913 let lint = engine
1914 .context
1915 .render_template("{{ stack.tools.lint }}")
1916 .unwrap();
1917 assert_eq!(lint, "cargo clippy");
1918
1919 assert!(engine.stack_info.is_some());
1920 assert_eq!(engine.stack_info.as_ref().unwrap().name, "rust");
1921
1922 std::env::set_current_dir(orig_dir).unwrap();
1923 }
1924
1925 #[tokio::test]
1926 #[allow(clippy::await_holding_lock)]
1927 async fn stack_context_skipped_when_no_registry() {
1928 let _lock = CWD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1929
1930 let yaml = "name: test\nsteps:\n - name: s\n type: cmd\n run: \"echo hi\"\n";
1931 let wf = parser::parse_str(yaml).unwrap();
1932 let dir = tempfile::tempdir().unwrap();
1934 let orig_dir = std::env::current_dir().unwrap();
1935 std::env::set_current_dir(dir.path()).unwrap();
1936
1937 let engine = Engine::with_options(
1938 wf,
1939 "target".to_string(),
1940 HashMap::new(),
1941 EngineOptions {
1942 quiet: true,
1943 ..Default::default()
1944 },
1945 )
1946 .await;
1947 assert!(engine.stack_info.is_none());
1949 assert!(engine.context.get_var("stack").is_none());
1951
1952 std::env::set_current_dir(orig_dir).unwrap();
1953 }
1954}