1pub mod context;
2pub mod state;
3mod template;
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use anyhow::{bail, Result};
11use colored::Colorize;
12use regex::Regex;
13use serde::Serialize;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16
17use crate::cli::display;
18use crate::config::{ConfigManager, StepConfig};
19use crate::control_flow::ControlFlow;
20use crate::error::StepError;
21use crate::events::subscribers::{FileSubscriber, WebhookSubscriber};
22use crate::events::types::Event;
23use crate::events::EventBus;
24use crate::plugins::loader::PluginLoader;
25use crate::plugins::registry::PluginRegistry;
26use crate::prompts::{
27 detector::{StackDetector, StackInfo},
28 registry::Registry,
29};
30use crate::sandbox::config::SandboxConfig;
31use crate::sandbox::docker::DockerSandbox;
32use crate::sandbox::SandboxMode;
33use crate::steps::*;
34use crate::steps::{
35 agent::AgentExecutor, call::CallExecutor, chat::ChatExecutor, cmd::CmdExecutor,
36 gate::GateExecutor, map::MapExecutor, parallel::ParallelExecutor, repeat::RepeatExecutor,
37 script::ScriptExecutor, template_step::TemplateStepExecutor,
38};
39use crate::workflow::schema::{OutputType, StepDef, StepType, WorkflowDef};
40use context::Context;
41use state::WorkflowState;
42
/// Construction-time options for an [`Engine`], consumed by
/// `Engine::with_options`.
#[derive(Debug, Default)]
pub struct EngineOptions {
    /// Verbose flag; copied onto `Engine::verbose`.
    pub verbose: bool,
    /// Suppresses the engine's console output when set.
    pub quiet: bool,
    /// JSON mode. `Engine::with_options` also forces quiet mode when this is set.
    pub json: bool,
    /// Dry-run flag; copied onto `Engine::dry_run`.
    pub dry_run: bool,
    /// Name of the step to resume from, if any. `Engine::run` validates it
    /// against the workflow's top-level step names.
    pub resume_from: Option<String>,
    /// Sandbox mode; `SandboxMode::Disabled` means no sandbox is created.
    pub sandbox_mode: SandboxMode,
}
57
/// Serializable summary of one step, collected by the engine as the workflow
/// runs and exposed through `Engine::step_records` / `Engine::json_output`.
#[derive(Debug, Clone, Serialize)]
pub struct StepRecord {
    /// Step name as declared in the workflow.
    pub name: String,
    /// Step type rendered as a string (or `"async"` when the engine could not
    /// look the type up).
    pub step_type: String,
    /// Outcome label; the engine writes `"ok"`, `"failed"`, `"skipped"` or
    /// `"skipped_resume"`.
    pub status: String,
    /// Wall-clock duration in seconds; the engine records `0.0` for skipped
    /// steps and for steps that ran asynchronously.
    pub duration_secs: f64,
    /// Short description of the result: truncated output text on success, or
    /// the error / skip message otherwise.
    pub output_summary: String,
    /// Input token count, when the step output reports one; omitted from the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input_tokens: Option<u64>,
    /// Output token count, when the step output reports one; omitted from the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_tokens: Option<u64>,
    /// Cost in USD, when the step output reports one; omitted from the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost_usd: Option<f64>,
    /// Whether the step was executed with the sandbox; omitted from the
    /// serialized form when `false`.
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    pub sandboxed: bool,
}
76
/// Serializable summary of a whole workflow run, built by
/// `Engine::json_output`.
#[derive(Debug, Serialize)]
pub struct WorkflowJsonOutput {
    /// Name of the workflow.
    pub workflow_name: String,
    /// Overall status string supplied by the caller of `json_output`.
    pub status: String,
    /// `Debug` rendering of the engine's sandbox mode.
    pub sandbox_mode: String,
    /// Per-step records collected so far.
    pub steps: Vec<StepRecord>,
    /// Total run duration in seconds, as supplied by the caller.
    pub total_duration_secs: f64,
    /// Sum of input and output tokens over all step records.
    pub total_tokens: u64,
    /// Sum of the per-step costs that are present.
    pub total_cost_usd: f64,
}
88
/// Executes a [`WorkflowDef`] step by step, tracking context, per-step
/// records, persisted state, pending async steps and the optional sandbox.
pub struct Engine {
    /// The workflow definition being executed.
    pub workflow: WorkflowDef,
    /// Execution context holding variables and stored step outputs.
    pub context: Context,
    /// Resolves the effective configuration of each step.
    config_manager: ConfigManager,
    /// Verbose flag copied from [`EngineOptions`].
    pub verbose: bool,
    /// When true the engine prints nothing to the console; also set by JSON mode.
    pub quiet: bool,
    /// JSON mode flag copied from [`EngineOptions`].
    pub json: bool,
    /// Dry-run flag copied from [`EngineOptions`].
    pub dry_run: bool,
    /// Step name to resume from, if any.
    resume_from: Option<String>,
    /// Decides whether a sandbox is created and which step types use it.
    sandbox_mode: SandboxMode,
    /// Sandbox handle: `None` until `sandbox_up` succeeds, taken again by
    /// `sandbox_down`.
    sandbox: SharedSandbox,
    /// Records of the steps processed so far.
    step_records: Vec<StepRecord>,
    /// State of the last run; set at the end of a successful `run`.
    state: Option<WorkflowState>,
    /// Path the workflow state is saved to; set at the start of `run`.
    state_file: Option<PathBuf>,
    /// Join handles of spawned async steps, keyed by step name.
    pending_futures: HashMap<String, JoinHandle<Result<StepOutput, StepError>>>,
    /// Registry of the plugins that loaded successfully at construction time.
    plugin_registry: Arc<Mutex<PluginRegistry>>,
    /// Event bus that workflow and step lifecycle events are emitted on.
    pub event_bus: EventBus,
    /// Result of stack detection performed at construction time, if any.
    pub stack_info: Option<StackInfo>,
}
113
114impl Engine {
115 pub async fn new(
116 workflow: WorkflowDef,
117 target: String,
118 vars: HashMap<String, serde_json::Value>,
119 verbose: bool,
120 quiet: bool,
121 ) -> Self {
122 let options = EngineOptions {
123 verbose,
124 quiet,
125 ..Default::default()
126 };
127 Self::with_options(workflow, target, vars, options).await
128 }
129
130 pub async fn with_options(
131 workflow: WorkflowDef,
132 target: String,
133 vars: HashMap<String, serde_json::Value>,
134 options: EngineOptions,
135 ) -> Self {
136 let mut context = Context::new(target, vars.clone());
137 let args_obj: serde_json::Map<String, serde_json::Value> = vars.into_iter().collect();
139 context.insert_var("args", serde_json::Value::Object(args_obj));
140 let config_manager = ConfigManager::new(workflow.config.clone());
141 let quiet = options.quiet || options.json;
143
144 let mut registry = PluginRegistry::new();
146 for plugin_cfg in &workflow.config.plugins {
147 match PluginLoader::load_plugin(&plugin_cfg.path) {
148 Ok(plugin) => {
149 tracing::info!(name = %plugin_cfg.name, path = %plugin_cfg.path, "Loaded plugin");
150 registry.register(plugin);
151 }
152 Err(e) => {
153 tracing::warn!(
154 name = %plugin_cfg.name,
155 path = %plugin_cfg.path,
156 error = %e,
157 "Failed to load plugin"
158 );
159 }
160 }
161 }
162
163 let mut event_bus = EventBus::new();
165 if let Some(ref events_cfg) = workflow.config.events {
166 if let Some(ref webhook_url) = events_cfg.webhook {
167 event_bus.add_subscriber(Box::new(WebhookSubscriber::new(webhook_url.clone())));
168 tracing::info!(url = %webhook_url, "Registered webhook event subscriber");
169 }
170 if let Some(ref file_path) = events_cfg.file {
171 event_bus.add_subscriber(Box::new(FileSubscriber::new(file_path.clone())));
172 tracing::info!(path = %file_path, "Registered file event subscriber");
173 }
174 }
175
176 let stack_info = detect_stack_if_registry_exists().await;
178 if let Some(ref info) = stack_info {
179 let stack_val = serde_json::json!({
180 "name": info.name,
181 "parent": info.parent_chain.first().cloned().unwrap_or_else(|| "_default".to_string()),
182 "tools": {
183 "lint": info.tools.get("lint").cloned().unwrap_or_default(),
184 "test": info.tools.get("test").cloned().unwrap_or_default(),
185 "build": info.tools.get("build").cloned().unwrap_or_default(),
186 "install": info.tools.get("install").cloned().unwrap_or_default(),
187 }
188 });
189 context.insert_var("stack", stack_val);
190 context.stack_info = Some(info.clone());
191 }
192 if let Some(ref pd) = workflow.prompts_dir {
194 context.prompts_dir = std::path::PathBuf::from(pd);
195 }
196
197 Self {
198 workflow,
199 context,
200 config_manager,
201 verbose: options.verbose,
202 quiet,
203 json: options.json,
204 dry_run: options.dry_run,
205 resume_from: options.resume_from,
206 sandbox_mode: options.sandbox_mode,
207 sandbox: None,
208 step_records: Vec::new(),
209 state: None,
210 state_file: None,
211 pending_futures: HashMap::new(),
212 plugin_registry: Arc::new(Mutex::new(registry)),
213 event_bus,
214 stack_info,
215 }
216 }
217
218 pub fn step_records(&self) -> &[StepRecord] {
220 &self.step_records
221 }
222
223 pub fn json_output(&self, status: &str, total_duration: Duration) -> WorkflowJsonOutput {
225 let total_tokens: u64 = self
226 .step_records
227 .iter()
228 .map(|r| r.input_tokens.unwrap_or(0) + r.output_tokens.unwrap_or(0))
229 .sum();
230 let total_cost: f64 = self.step_records.iter().filter_map(|r| r.cost_usd).sum();
231
232 WorkflowJsonOutput {
233 workflow_name: self.workflow.name.clone(),
234 status: status.to_string(),
235 sandbox_mode: format!("{:?}", self.sandbox_mode),
236 steps: self.step_records.clone(),
237 total_duration_secs: total_duration.as_secs_f64(),
238 total_tokens,
239 total_cost_usd: total_cost,
240 }
241 }
242
243 async fn sandbox_up(&mut self) -> Result<()> {
248 let sandbox_config = SandboxConfig::from_global_config(&self.workflow.config.global);
249 let workspace = std::env::current_dir()
250 .map(|p| p.to_string_lossy().to_string())
251 .unwrap_or_else(|_| ".".to_string());
252
253 let mut docker = DockerSandbox::new(sandbox_config, &workspace);
254
255 if !self.quiet {
256 println!(" {} Creating Docker sandbox container…", "🐳".cyan());
257 }
258
259 let t0 = Instant::now();
260 docker.create().await?;
261 let create_ms = t0.elapsed().as_millis();
262
263 let t1 = Instant::now();
264 docker.copy_workspace(&workspace).await?;
265 let copy_ms = t1.elapsed().as_millis();
266
267 let t2 = Instant::now();
272 let _ = docker
273 .run_command(
274 "git config --global --add safe.directory /workspace \
275 && git config --global user.name 'Minion Engine' \
276 && git config --global user.email 'minion@localhost' \
277 && if [ -n \"$GH_TOKEN\" ]; then \
278 git config --global credential.helper '!f() { echo \"password=$GH_TOKEN\"; }; f'; \
279 git config --global credential.https://github.com.username x-access-token; \
280 fi",
281 )
282 .await;
283 let git_ms = t2.elapsed().as_millis();
284
285 let total_ms = t0.elapsed().as_millis();
286
287 if !self.quiet {
288 println!(
289 " {} Sandbox ready — container {:.1}s, copy {:.1}s, git {:.1}s (total {:.1}s)",
290 "🔒".green(),
291 create_ms as f64 / 1000.0,
292 copy_ms as f64 / 1000.0,
293 git_ms as f64 / 1000.0,
294 total_ms as f64 / 1000.0,
295 );
296 }
297
298 tracing::info!(
299 create_ms,
300 copy_ms,
301 git_ms,
302 total_ms,
303 "Sandbox setup complete"
304 );
305
306 self.sandbox = Some(Arc::new(Mutex::new(docker)));
307 Ok(())
308 }
309
310 async fn sandbox_down(&mut self) -> Result<()> {
312 if let Some(sb) = self.sandbox.take() {
313 let mut docker = sb.lock().await;
314
315 let workspace = std::env::current_dir()
316 .map(|p| p.to_string_lossy().to_string())
317 .unwrap_or_else(|_| ".".to_string());
318
319 if !self.quiet {
320 println!(" {} Copying results from sandbox…", "📦".cyan());
321 }
322
323 let t0 = Instant::now();
324 docker.copy_results(&workspace).await?;
325 let copy_back_ms = t0.elapsed().as_millis();
326
327 let t1 = Instant::now();
328 docker.destroy().await?;
329 let destroy_ms = t1.elapsed().as_millis();
330
331 if !self.quiet {
332 println!(
333 " {} Sandbox destroyed — copy-back {:.1}s, destroy {:.1}s",
334 "🗑️ ".dimmed(),
335 copy_back_ms as f64 / 1000.0,
336 destroy_ms as f64 / 1000.0,
337 );
338 }
339
340 tracing::info!(copy_back_ms, destroy_ms, "Sandbox teardown complete");
341 }
342 Ok(())
343 }
344
345 fn should_sandbox_step(&self, step_type: &StepType) -> bool {
347 if *step_type == StepType::Script {
349 return false;
350 }
351 match self.sandbox_mode {
352 SandboxMode::Disabled => false,
353 SandboxMode::FullWorkflow | SandboxMode::Devbox => {
354 matches!(step_type, StepType::Cmd | StepType::Agent)
356 }
357 SandboxMode::AgentOnly => {
358 matches!(step_type, StepType::Agent)
360 }
361 }
362 }
363
364 pub async fn run(&mut self) -> Result<StepOutput> {
367 let state_file = WorkflowState::state_file_path(&self.workflow.name);
369 self.state_file = Some(state_file.clone());
370
371 let mut loaded_state: Option<WorkflowState> = None;
372 if let Some(ref resume_step) = self.resume_from.clone() {
373 match WorkflowState::find_latest(&self.workflow.name) {
374 Some(path) => match WorkflowState::load(&path) {
375 Ok(s) => {
376 let exists = self.workflow.steps.iter().any(|s| &s.name == resume_step);
377 if !exists {
378 bail!(
379 "Resume step '{}' not found in workflow '{}'. \
380 Available steps: {}",
381 resume_step,
382 self.workflow.name,
383 self.workflow
384 .steps
385 .iter()
386 .map(|s| s.name.as_str())
387 .collect::<Vec<_>>()
388 .join(", ")
389 );
390 }
391 if !self.quiet {
392 println!(
393 " {} Resuming from step '{}' (state: {})",
394 "↺".cyan(),
395 resume_step,
396 path.display()
397 );
398 }
399 loaded_state = Some(s);
400 }
401 Err(e) => bail!("Failed to load state file {}: {e}", path.display()),
402 },
403 None => {
404 bail!(
405 "No state file found for workflow '{}'. \
406 Cannot resume. Run the workflow without --resume first.",
407 self.workflow.name
408 );
409 }
410 }
411 }
412
413 let mut current_state = WorkflowState::new(&self.workflow.name);
415
416 if !self.quiet {
418 display::workflow_start(&self.workflow.name);
419 if self.sandbox_mode != SandboxMode::Disabled {
420 println!(" {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
421 }
422 }
423
424 if self.sandbox_mode != SandboxMode::Disabled {
426 self.sandbox_up().await?;
427 }
428
429 self.event_bus
431 .emit(Event::WorkflowStarted {
432 timestamp: chrono::Utc::now(),
433 })
434 .await;
435
436 let start = Instant::now();
437 let steps = self.workflow.steps.clone();
438 let mut last_output = StepOutput::Empty;
439 let mut step_count = 0;
440
441 let resume_from = self.resume_from.clone();
442 let mut resuming = resume_from.is_some();
443
444 let run_result: Result<(), anyhow::Error> = async {
446 for step_def in &steps {
447 if resuming {
449 let is_resume_point = resume_from.as_deref() == Some(&step_def.name);
450 if !is_resume_point {
451 if let Some(ref ls) = loaded_state {
452 if let Some(output) = ls.steps.get(&step_def.name) {
453 self.context.store(&step_def.name, output.clone());
454 if !self.quiet {
455 println!(
456 " {} {} {}",
457 "⏭".yellow(),
458 step_def.name,
459 "(skipped — loaded from state)".dimmed()
460 );
461 }
462 self.step_records.push(StepRecord {
463 name: step_def.name.clone(),
464 step_type: step_def.step_type.to_string(),
465 status: "skipped_resume".to_string(),
466 duration_secs: 0.0,
467 output_summary: truncate(output.text(), 100),
468 input_tokens: None,
469 output_tokens: None,
470 cost_usd: None,
471 sandboxed: false,
472 });
473 }
474 }
475 continue;
476 }
477 resuming = false;
478 }
479
480 if step_def.async_exec == Some(true) {
482 let handle = self.spawn_async_step(step_def);
483 self.pending_futures.insert(step_def.name.clone(), handle);
484 if !self.quiet {
485 println!(
486 " {} {} {} {}",
487 "⚡".yellow(),
488 step_def.name,
489 format!("[{}]", step_def.step_type).cyan(),
490 "(async — spawned)".dimmed()
491 );
492 }
493 step_count += 1;
494 continue;
495 }
496
497 self.await_pending_deps(step_def).await?;
499
500 match self.execute_step(step_def).await {
501 Ok(output) => {
502 current_state
503 .steps
504 .insert(step_def.name.clone(), output.clone());
505 if let Some(ref p) = self.state_file {
506 let _ = current_state.save(p);
507 }
508 last_output = output;
509 step_count += 1;
510 }
511 Err(StepError::ControlFlow(ControlFlow::Skip { message })) => {
512 self.context.store(&step_def.name, StepOutput::Empty);
513 if !self.quiet {
514 let pb = display::step_start(
515 &step_def.name,
516 &step_def.step_type.to_string(),
517 );
518 display::step_skip(&pb, &step_def.name, &message);
519 }
520 self.step_records.push(StepRecord {
521 name: step_def.name.clone(),
522 step_type: step_def.step_type.to_string(),
523 status: "skipped".to_string(),
524 duration_secs: 0.0,
525 output_summary: message.clone(),
526 input_tokens: None,
527 output_tokens: None,
528 cost_usd: None,
529 sandboxed: false,
530 });
531 }
532 Err(StepError::ControlFlow(ControlFlow::Fail { message })) => {
533 if !self.quiet {
534 display::workflow_failed(&step_def.name, &message);
535 }
536 bail!("Step '{}' failed: {}", step_def.name, message);
537 }
538 Err(StepError::ControlFlow(ControlFlow::Break { .. })) => {
539 break;
540 }
541 Err(e) => {
542 if !self.quiet {
543 display::workflow_failed(&step_def.name, &e.to_string());
544 }
545 return Err(e.into());
546 }
547 }
548 }
549 Ok(())
550 }
551 .await;
552
553 let remaining: Vec<(String, JoinHandle<Result<StepOutput, StepError>>)> =
555 self.pending_futures.drain().collect();
556 for (name, handle) in remaining {
557 let step_type = self
558 .workflow
559 .steps
560 .iter()
561 .find(|s| s.name == name)
562 .map(|s| s.step_type.to_string())
563 .unwrap_or_else(|| "async".to_string());
564 match handle.await {
565 Ok(Ok(output)) => {
566 self.context.store(&name, output.clone());
567 self.step_records.push(StepRecord {
568 name: name.clone(),
569 step_type: step_type.clone(),
570 status: "ok".to_string(),
571 duration_secs: 0.0,
572 output_summary: truncate(output.text(), 100),
573 input_tokens: None,
574 output_tokens: None,
575 cost_usd: None,
576 sandboxed: false,
577 });
578 }
579 Ok(Err(e)) => {
580 self.step_records.push(StepRecord {
581 name: name.clone(),
582 step_type: step_type.clone(),
583 status: "failed".to_string(),
584 duration_secs: 0.0,
585 output_summary: e.to_string(),
586 input_tokens: None,
587 output_tokens: None,
588 cost_usd: None,
589 sandboxed: false,
590 });
591 if !self.quiet {
592 eprintln!(" {} Async step '{}' failed: {}", "✗".red(), name, e);
593 }
594 }
595 Err(e) => {
596 let msg = format!("Async step '{}' panicked: {e}", name);
597 self.step_records.push(StepRecord {
598 name: name.clone(),
599 step_type,
600 status: "failed".to_string(),
601 duration_secs: 0.0,
602 output_summary: msg.clone(),
603 input_tokens: None,
604 output_tokens: None,
605 cost_usd: None,
606 sandboxed: false,
607 });
608 if !self.quiet {
609 eprintln!(" {} {}", "✗".red(), msg);
610 }
611 }
612 }
613 }
614
615 if self.sandbox_mode != SandboxMode::Disabled {
617 if let Err(e) = self.sandbox_down().await {
618 if !self.quiet {
619 eprintln!(" {} Sandbox cleanup warning: {e}", "⚠".yellow());
620 }
621 }
622 }
623
624 self.event_bus
626 .emit(Event::WorkflowCompleted {
627 duration_ms: start.elapsed().as_millis() as u64,
628 timestamp: chrono::Utc::now(),
629 })
630 .await;
631
632 run_result?;
634
635 if !self.quiet {
636 display::workflow_done(start.elapsed(), step_count);
637 }
638
639 self.state = Some(current_state);
640 Ok(last_output)
641 }
642
643 pub async fn execute_step(&mut self, step_def: &StepDef) -> Result<StepOutput, StepError> {
644 let config = self.resolve_config(step_def);
645 let use_sandbox = self.should_sandbox_step(&step_def.step_type);
646
647 let pb = if !self.quiet {
648 let label = if use_sandbox {
649 format!("{} 🐳", step_def.step_type)
650 } else {
651 step_def.step_type.to_string()
652 };
653 Some(display::step_start(&step_def.name, &label))
654 } else {
655 None
656 };
657
658 let start = Instant::now();
659
660 self.event_bus
662 .emit(Event::StepStarted {
663 step_name: step_def.name.clone(),
664 step_type: step_def.step_type.to_string(),
665 timestamp: chrono::Utc::now(),
666 })
667 .await;
668
669 tracing::debug!(
670 step = %step_def.name,
671 step_type = %step_def.step_type,
672 sandboxed = use_sandbox,
673 "Executing step"
674 );
675
676 let sandbox_ref = if use_sandbox { &self.sandbox } else { &None };
678
679 let result = match step_def.step_type {
680 StepType::Cmd => {
681 CmdExecutor
682 .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
683 .await
684 }
685 StepType::Agent => {
686 AgentExecutor
687 .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
688 .await
689 }
690 StepType::Gate => GateExecutor.execute(step_def, &config, &self.context).await,
691 StepType::Repeat => {
692 RepeatExecutor::new(&self.workflow.scopes)
693 .execute(step_def, &config, &self.context)
694 .await
695 }
696 StepType::Chat => ChatExecutor.execute(step_def, &config, &self.context).await,
697 StepType::Map => {
698 MapExecutor::new(&self.workflow.scopes)
699 .execute(step_def, &config, &self.context)
700 .await
701 }
702 StepType::Parallel => {
703 ParallelExecutor::new(&self.workflow.scopes)
704 .execute(step_def, &config, &self.context)
705 .await
706 }
707 StepType::Call => {
708 CallExecutor::new(&self.workflow.scopes)
709 .execute(step_def, &config, &self.context)
710 .await
711 }
712 StepType::Template => {
713 let prompts_dir = self.workflow.prompts_dir.as_deref();
714 TemplateStepExecutor::new(prompts_dir)
715 .execute(step_def, &config, &self.context)
716 .await
717 }
718 StepType::Script => {
719 ScriptExecutor
720 .execute(step_def, &config, &self.context)
721 .await
722 } };
725
726 let elapsed = start.elapsed();
727 let duration_ms = elapsed.as_millis() as u64;
728
729 let result = match result {
732 Ok(output) => parse_step_output(output, step_def),
733 err => err,
734 };
735
736 match &result {
737 Ok(output) => {
738 tracing::info!(
739 step = %step_def.name,
740 step_type = %step_def.step_type,
741 duration_ms = elapsed.as_millis(),
742 sandboxed = use_sandbox,
743 status = "ok",
744 "Step completed"
745 );
746 self.context.store(&step_def.name, output.clone());
747 if let Some(parsed) = extract_parsed_value(output, step_def) {
749 self.context.store_parsed(&step_def.name, parsed);
750 }
751
752 let (it, ot, cost) = token_stats(output);
753 self.step_records.push(StepRecord {
754 name: step_def.name.clone(),
755 step_type: step_def.step_type.to_string(),
756 status: "ok".to_string(),
757 duration_secs: elapsed.as_secs_f64(),
758 output_summary: truncate(output.text(), 100),
759 input_tokens: it,
760 output_tokens: ot,
761 cost_usd: cost,
762 sandboxed: use_sandbox,
763 });
764
765 if let Some(pb) = &pb {
766 display::step_ok(pb, &step_def.name, elapsed);
767 }
768 }
769 Err(StepError::ControlFlow(cf)) => {
770 let msg = match cf {
771 ControlFlow::Skip { message } => format!("skipped: {message}"),
772 ControlFlow::Break { message, .. } => format!("break: {message}"),
773 ControlFlow::Fail { message } => format!("failed: {message}"),
774 ControlFlow::Next { message } => format!("next: {message}"),
775 };
776 tracing::info!(
777 step = %step_def.name,
778 step_type = %step_def.step_type,
779 duration_ms = elapsed.as_millis(),
780 status = "control_flow",
781 message = %msg,
782 "Step control flow"
783 );
784 if let Some(pb) = &pb {
785 display::step_skip(pb, &step_def.name, &msg);
786 }
787 }
788 Err(e) => {
789 tracing::warn!(
790 step = %step_def.name,
791 step_type = %step_def.step_type,
792 duration_ms = elapsed.as_millis(),
793 status = "error",
794 error = %e,
795 "Step failed"
796 );
797 self.step_records.push(StepRecord {
798 name: step_def.name.clone(),
799 step_type: step_def.step_type.to_string(),
800 status: "failed".to_string(),
801 duration_secs: elapsed.as_secs_f64(),
802 output_summary: e.to_string(),
803 input_tokens: None,
804 output_tokens: None,
805 cost_usd: None,
806 sandboxed: use_sandbox,
807 });
808 if let Some(pb) = &pb {
809 display::step_fail(pb, &step_def.name, &e.to_string());
810 }
811 }
812 }
813
814 match &result {
816 Ok(_) => {
817 self.event_bus
818 .emit(Event::StepCompleted {
819 step_name: step_def.name.clone(),
820 step_type: step_def.step_type.to_string(),
821 duration_ms,
822 timestamp: chrono::Utc::now(),
823 })
824 .await;
825 }
826 Err(e) if !matches!(e, StepError::ControlFlow(_)) => {
827 self.event_bus
828 .emit(Event::StepFailed {
829 step_name: step_def.name.clone(),
830 step_type: step_def.step_type.to_string(),
831 error: e.to_string(),
832 duration_ms,
833 timestamp: chrono::Utc::now(),
834 })
835 .await;
836 }
837 _ => {}
838 }
839
840 result
841 }
842
843 pub fn dry_run(&self) {
845 use colored::Colorize;
846
847 println!(
848 "{} {} (dry-run)",
849 "▶".cyan().bold(),
850 self.workflow.name.bold()
851 );
852 if self.sandbox_mode != SandboxMode::Disabled {
853 println!(" {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
854 }
855 println!();
856
857 let steps = &self.workflow.steps;
858 let total = steps.len();
859 for (i, step) in steps.iter().enumerate() {
860 let is_last = i + 1 == total;
861 let branch = if is_last { "└──" } else { "├──" };
862 let config = self.resolve_config(step);
863
864 let sandbox_indicator = if self.should_sandbox_step(&step.step_type) {
865 " 🐳"
866 } else {
867 ""
868 };
869
870 let async_indicator = if step.async_exec == Some(true) {
871 " ⚡"
872 } else {
873 ""
874 };
875
876 println!(
877 "{} {} {}{}{}",
878 branch.dimmed(),
879 step.name.bold(),
880 format!("[{}]", step.step_type).cyan(),
881 sandbox_indicator,
882 async_indicator
883 );
884
885 let indent = if is_last { " " } else { "│ " };
886 self.print_step_details(step, &config, indent);
887
888 if !is_last {
889 println!("│");
890 }
891 }
892 }
893
894 fn print_step_details(&self, step: &StepDef, config: &StepConfig, indent: &str) {
895 use colored::Colorize;
896
897 match step.step_type {
898 StepType::Cmd => {
899 if let Some(ref run) = step.run {
900 let preview = truncate(run, 80);
901 println!("{} run: {}", indent, preview.dimmed());
902 }
903 }
904 StepType::Agent | StepType::Chat => {
905 if let Some(ref prompt) = step.prompt {
906 let preview = truncate(&prompt.replace('\n', " "), 80);
907 println!("{} prompt: {}", indent, preview.dimmed());
908 }
909 if let Some(model) = config.get_str("model") {
910 println!("{} model: {}", indent, model.dimmed());
911 }
912 }
913 StepType::Gate => {
914 if let Some(ref cond) = step.condition {
915 println!("{} condition: {}", indent, cond.dimmed());
916 }
917 println!(
918 "{} on_pass: {} / on_fail: {}",
919 indent,
920 step.on_pass.as_deref().unwrap_or("continue").dimmed(),
921 step.on_fail.as_deref().unwrap_or("continue").dimmed()
922 );
923 }
924 StepType::Repeat => {
925 let scope_name = step.scope.as_deref().unwrap_or("<none>");
926 let max_iter = step.max_iterations.unwrap_or(1);
927 println!("{} scope: {}", indent, scope_name.dimmed());
928 println!(
929 "{} max_iterations: {}",
930 indent,
931 max_iter.to_string().dimmed()
932 );
933 self.print_scope_steps(scope_name, indent);
934 }
935 StepType::Map => {
936 let scope_name = step.scope.as_deref().unwrap_or("<none>");
937 let items = step.items.as_deref().unwrap_or("<none>");
938 println!("{} items: {}", indent, items.dimmed());
939 println!("{} scope: {}", indent, scope_name.dimmed());
940 if let Some(p) = step.parallel {
941 println!("{} parallel: {}", indent, p.to_string().dimmed());
942 }
943 self.print_scope_steps(scope_name, indent);
944 }
945 StepType::Call => {
946 let scope_name = step.scope.as_deref().unwrap_or("<none>");
947 println!("{} scope: {}", indent, scope_name.dimmed());
948 self.print_scope_steps(scope_name, indent);
949 }
950 StepType::Parallel => {
951 if let Some(ref sub_steps) = step.steps {
952 println!("{} parallel steps:", indent);
953 for sub in sub_steps {
954 println!(
955 "{} • {} [{}]",
956 indent,
957 sub.name.bold(),
958 sub.step_type.to_string().cyan()
959 );
960 }
961 }
962 }
963 StepType::Template => {
964 if let Some(ref run) = step.run {
965 println!("{} template: {}", indent, run.dimmed());
966 }
967 }
968 StepType::Script => {
969 if let Some(ref run) = step.run {
970 let preview = truncate(&run.replace('\n', " "), 80);
971 println!("{} script: {}", indent, preview.dimmed());
972 }
973 }
974 }
975
976 if let Some(t) = config.get_str("timeout") {
977 println!("{} timeout: {}", indent, t.dimmed());
978 }
979 }
980
981 fn print_scope_steps(&self, scope_name: &str, indent: &str) {
982 use colored::Colorize;
983 if let Some(scope) = self.workflow.scopes.get(scope_name) {
984 println!("{} scope steps:", indent);
985 for step in &scope.steps {
986 println!(
987 "{} • {} [{}]",
988 indent,
989 step.name.bold(),
990 step.step_type.to_string().cyan()
991 );
992 }
993 }
994 }
995
996 fn resolve_config(&self, step_def: &StepDef) -> StepConfig {
997 self.config_manager
998 .resolve(&step_def.name, &step_def.step_type, &step_def.config)
999 }
1000
1001 fn spawn_async_step(&self, step_def: &StepDef) -> JoinHandle<Result<StepOutput, StepError>> {
1007 let step = step_def.clone();
1008 let config = self.resolve_config(step_def);
1009 let target = self
1010 .context
1011 .get_var("target")
1012 .and_then(|v| v.as_str())
1013 .unwrap_or("")
1014 .to_string();
1015
1016 tokio::spawn(async move {
1017 let ctx = Context::new(target, HashMap::new());
1018 match step.step_type {
1019 StepType::Cmd => CmdExecutor.execute(&step, &config, &ctx).await,
1020 StepType::Agent => AgentExecutor.execute(&step, &config, &ctx).await,
1021 StepType::Script => ScriptExecutor.execute(&step, &config, &ctx).await,
1022 _ => Err(StepError::Fail(format!(
1023 "Async execution not supported for step type '{}'",
1024 step.step_type
1025 ))),
1026 }
1027 })
1028 }
1029
1030 async fn await_pending_deps(&mut self, step_def: &StepDef) -> Result<(), StepError> {
1033 let pattern = Regex::new(r"steps\.(\w+)\.").unwrap();
1034
1035 let mut templates: Vec<String> = Vec::new();
1037 if let Some(ref run) = step_def.run {
1038 templates.push(run.clone());
1039 }
1040 if let Some(ref prompt) = step_def.prompt {
1041 templates.push(prompt.clone());
1042 }
1043 if let Some(ref condition) = step_def.condition {
1044 templates.push(condition.clone());
1045 }
1046
1047 let mut deps: Vec<String> = Vec::new();
1049 for tmpl in &templates {
1050 for cap in pattern.captures_iter(tmpl) {
1051 let name = cap[1].to_string();
1052 if self.pending_futures.contains_key(&name) && !deps.contains(&name) {
1053 deps.push(name);
1054 }
1055 }
1056 }
1057
1058 for name in deps {
1060 self.await_pending_step(&name).await?;
1061 }
1062
1063 Ok(())
1064 }
1065
1066 async fn await_pending_step(&mut self, name: &str) -> Result<(), StepError> {
1068 if let Some(handle) = self.pending_futures.remove(name) {
1069 match handle.await {
1070 Ok(Ok(output)) => {
1071 self.context.store(name, output.clone());
1072 self.step_records.push(StepRecord {
1073 name: name.to_string(),
1074 step_type: "async".to_string(),
1075 status: "ok".to_string(),
1076 duration_secs: 0.0,
1077 output_summary: truncate(output.text(), 100),
1078 input_tokens: None,
1079 output_tokens: None,
1080 cost_usd: None,
1081 sandboxed: false,
1082 });
1083 }
1084 Ok(Err(e)) => {
1085 return Err(StepError::Fail(format!(
1086 "Async step '{}' failed: {e}",
1087 name
1088 )));
1089 }
1090 Err(e) => {
1091 return Err(StepError::Fail(format!(
1092 "Async step '{}' panicked: {e}",
1093 name
1094 )));
1095 }
1096 }
1097 }
1098 Ok(())
1099 }
1100}
1101
1102fn token_stats(output: &StepOutput) -> (Option<u64>, Option<u64>, Option<f64>) {
1104 match output {
1105 StepOutput::Agent(o) => (
1106 Some(o.stats.input_tokens),
1107 Some(o.stats.output_tokens),
1108 Some(o.stats.cost_usd),
1109 ),
1110 StepOutput::Chat(o) => (Some(o.input_tokens), Some(o.output_tokens), None),
1111 _ => (None, None, None),
1112 }
1113}
1114
1115fn parse_step_output(output: StepOutput, step_def: &StepDef) -> Result<StepOutput, StepError> {
1118 let output_type = match &step_def.output_type {
1119 Some(t) => t,
1120 None => return Ok(output),
1121 };
1122
1123 if *output_type == OutputType::Text {
1124 return Ok(output);
1125 }
1126
1127 let text = output.text().trim().to_string();
1128
1129 match output_type {
1130 OutputType::Integer => {
1131 text.parse::<i64>()
1132 .map_err(|_| StepError::Fail(format!("Failed to parse '{}' as integer", text)))?;
1133 }
1134 OutputType::Json => {
1135 serde_json::from_str::<serde_json::Value>(&text)
1136 .map_err(|e| StepError::Fail(format!("Failed to parse output as JSON: {e}")))?;
1137 }
1138 OutputType::Boolean => match text.to_lowercase().as_str() {
1139 "true" | "1" | "yes" | "false" | "0" | "no" => {}
1140 _ => {
1141 return Err(StepError::Fail(format!(
1142 "Failed to parse '{}' as boolean",
1143 text
1144 )));
1145 }
1146 },
1147 OutputType::Lines | OutputType::Text => {}
1148 }
1149
1150 Ok(output)
1151}
1152
1153fn extract_parsed_value(output: &StepOutput, step_def: &StepDef) -> Option<ParsedValue> {
1156 let output_type = step_def.output_type.as_ref()?;
1157
1158 let text = output.text().trim().to_string();
1159
1160 let parsed = match output_type {
1161 OutputType::Text => ParsedValue::Text(text),
1162 OutputType::Integer => ParsedValue::Integer(text.parse::<i64>().ok()?),
1163 OutputType::Json => {
1164 let val = serde_json::from_str::<serde_json::Value>(&text).ok()?;
1165 ParsedValue::Json(val)
1166 }
1167 OutputType::Lines => {
1168 let lines: Vec<String> = text
1169 .lines()
1170 .filter(|l| !l.is_empty())
1171 .map(|l| l.to_string())
1172 .collect();
1173 ParsedValue::Lines(lines)
1174 }
1175 OutputType::Boolean => {
1176 let b = match text.to_lowercase().as_str() {
1177 "true" | "1" | "yes" => true,
1178 _ => false,
1179 };
1180 ParsedValue::Boolean(b)
1181 }
1182 };
1183
1184 Some(parsed)
1185}
1186
/// Shortens `s` to at most `max` characters, appending `…` when anything was
/// cut off. Counts Unicode scalar values, so the cut never falls inside a
/// multi-byte character.
fn truncate(s: &str, max: usize) -> String {
    // The byte offset of character number `max` exists only when `s` has more
    // than `max` characters, i.e. exactly when truncation is needed.
    match s.char_indices().nth(max) {
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_string(),
    }
}
1198
1199async fn detect_stack_if_registry_exists() -> Option<StackInfo> {
1202 let registry_path = std::path::Path::new("prompts/registry.yaml");
1203 if !registry_path.exists() {
1204 return None;
1205 }
1206 match Registry::from_file(registry_path).await {
1207 Ok(registry) => {
1208 let workspace = std::path::Path::new(".");
1209 match StackDetector::detect(®istry, workspace).await {
1210 Ok(info) => {
1211 tracing::info!(stack = %info.name, "Detected project stack");
1212 Some(info)
1213 }
1214 Err(e) => {
1215 tracing::debug!("Stack detection failed: {e}");
1216 None
1217 }
1218 }
1219 }
1220 Err(e) => {
1221 tracing::warn!("Failed to parse prompts/registry.yaml: {e}");
1222 None
1223 }
1224 }
1225}
1226
1227#[cfg(test)]
1228mod tests {
1229 use super::*;
1230 use crate::workflow::parser;
1231
1232 static CWD_LOCK: std::sync::LazyLock<std::sync::Mutex<()>> =
1234 std::sync::LazyLock::new(|| std::sync::Mutex::new(()));
1235
1236 #[tokio::test]
1237 async fn engine_runs_sequential_cmd_steps() {
1238 let yaml = r#"
1239name: test
1240steps:
1241 - name: step1
1242 type: cmd
1243 run: "echo first"
1244 - name: step2
1245 type: cmd
1246 run: "echo second"
1247"#;
1248 let wf = parser::parse_str(yaml).unwrap();
1249 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1250 let result = engine.run().await.unwrap();
1251 assert_eq!(result.text().trim(), "second");
1252 assert!(engine.context.get_step("step1").is_some());
1253 assert_eq!(
1254 engine.context.get_step("step1").unwrap().text().trim(),
1255 "first"
1256 );
1257 }
1258
1259 #[tokio::test]
1260 async fn engine_exposes_step_output_to_next_step() {
1261 let yaml = r#"
1262name: test
1263steps:
1264 - name: produce
1265 type: cmd
1266 run: "echo hello_world"
1267 - name: consume
1268 type: cmd
1269 run: "echo {{ steps.produce.stdout }}"
1270"#;
1271 let wf = parser::parse_str(yaml).unwrap();
1272 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1273 let result = engine.run().await.unwrap();
1274 assert!(result.text().contains("hello_world"));
1275 }
1276
1277 #[tokio::test]
1278 async fn engine_collects_step_records_in_json_mode() {
1279 let yaml = r#"
1280name: json-test
1281steps:
1282 - name: alpha
1283 type: cmd
1284 run: "echo alpha"
1285 - name: beta
1286 type: cmd
1287 run: "echo beta"
1288"#;
1289 let wf = parser::parse_str(yaml).unwrap();
1290 let opts = EngineOptions {
1291 json: true,
1292 ..Default::default()
1293 };
1294 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1295 engine.run().await.unwrap();
1296
1297 let records = engine.step_records();
1298 assert_eq!(records.len(), 2);
1299 assert_eq!(records[0].name, "alpha");
1300 assert_eq!(records[0].status, "ok");
1301 assert!(!records[0].sandboxed);
1302 assert_eq!(records[1].name, "beta");
1303 assert_eq!(records[1].status, "ok");
1304 }
1305
1306 #[tokio::test]
1307 async fn json_output_includes_sandbox_mode() {
1308 let yaml = r#"
1309name: json-output-test
1310steps:
1311 - name: greet
1312 type: cmd
1313 run: "echo hello"
1314"#;
1315 let wf = parser::parse_str(yaml).unwrap();
1316 let opts = EngineOptions {
1317 json: true,
1318 ..Default::default()
1319 };
1320 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1321 let start = Instant::now();
1322 engine.run().await.unwrap();
1323 let out = engine.json_output("success", start.elapsed());
1324
1325 let json = serde_json::to_string(&out).unwrap();
1326 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1327
1328 assert_eq!(parsed["workflow_name"], "json-output-test");
1329 assert_eq!(parsed["status"], "success");
1330 assert_eq!(parsed["sandbox_mode"], "Disabled");
1331 assert!(parsed["steps"].is_array());
1332 assert_eq!(parsed["steps"][0]["name"], "greet");
1333 }
1334
1335 #[tokio::test]
1336 async fn should_sandbox_step_logic() {
1337 let yaml = r#"
1338name: test
1339steps:
1340 - name: s
1341 type: cmd
1342 run: "echo test"
1343"#;
1344 let wf = parser::parse_str(yaml).unwrap();
1345
1346 let engine = Engine::new(wf.clone(), "".to_string(), HashMap::new(), false, true).await;
1348 assert!(!engine.should_sandbox_step(&StepType::Cmd));
1349 assert!(!engine.should_sandbox_step(&StepType::Agent));
1350 assert!(!engine.should_sandbox_step(&StepType::Gate));
1351
1352 let opts = EngineOptions {
1354 sandbox_mode: SandboxMode::FullWorkflow,
1355 quiet: true,
1356 ..Default::default()
1357 };
1358 let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts).await;
1359 assert!(engine.should_sandbox_step(&StepType::Cmd));
1360 assert!(engine.should_sandbox_step(&StepType::Agent));
1361 assert!(!engine.should_sandbox_step(&StepType::Gate));
1362
1363 let opts = EngineOptions {
1365 sandbox_mode: SandboxMode::AgentOnly,
1366 quiet: true,
1367 ..Default::default()
1368 };
1369 let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts).await;
1370 assert!(!engine.should_sandbox_step(&StepType::Cmd));
1371 assert!(engine.should_sandbox_step(&StepType::Agent));
1372 assert!(!engine.should_sandbox_step(&StepType::Gate));
1373 }
1374
1375 #[tokio::test]
1376 async fn dry_run_does_not_panic() {
1377 let yaml = r#"
1378name: dry-run-test
1379scopes:
1380 lint_fix:
1381 steps:
1382 - name: lint
1383 type: cmd
1384 run: "npm run lint"
1385 - name: fix_lint
1386 type: agent
1387 prompt: "Fix lint errors"
1388steps:
1389 - name: setup
1390 type: cmd
1391 run: "echo setup"
1392 - name: validate
1393 type: gate
1394 condition: "{{ steps.setup.exit_code == 0 }}"
1395 on_pass: continue
1396 on_fail: fail
1397 - name: lint_gate
1398 type: repeat
1399 scope: lint_fix
1400 max_iterations: 3
1401"#;
1402 let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1403 let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1404 engine.dry_run();
1405 }
1406
1407 #[tokio::test]
1408 async fn dry_run_all_step_types() {
1409 let yaml = r#"
1410name: all-types
1411steps:
1412 - name: c
1413 type: cmd
1414 run: "ls"
1415 - name: g
1416 type: gate
1417 condition: "{{ true }}"
1418 on_pass: continue
1419 - name: p
1420 type: parallel
1421 steps:
1422 - name: p1
1423 type: cmd
1424 run: "echo p1"
1425"#;
1426 let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1427 let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1428 engine.dry_run();
1429 }
1430
1431 #[test]
1432 fn truncate_helper() {
1433 assert_eq!(truncate("hello", 10), "hello");
1434 assert_eq!(truncate("hello world", 5), "hello…");
1435 }
1436
1437 #[tokio::test]
1438 async fn resume_fails_when_no_state_file() {
1439 let yaml = r#"
1440name: no-state-workflow-xyz-unique
1441steps:
1442 - name: step1
1443 type: cmd
1444 run: "echo 1"
1445"#;
1446 let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1447 let opts = EngineOptions {
1448 resume_from: Some("step1".to_string()),
1449 quiet: true,
1450 ..Default::default()
1451 };
1452 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1453 let err = engine.run().await.unwrap_err();
1454 assert!(
1455 err.to_string().contains("No state file found"),
1456 "Expected 'No state file found' but got: {err}"
1457 );
1458 }
1459
1460 #[tokio::test]
1461 async fn resume_fails_for_unknown_step() {
1462 let workflow_name = "test-resume-unknown-step";
1463 let state = WorkflowState::new(workflow_name);
1464 let tmp_path = format!("/tmp/minion-{workflow_name}-20991231235959.state.json");
1465 let path = PathBuf::from(&tmp_path);
1466 state.save(&path).unwrap();
1467
1468 let yaml = format!(
1469 r#"
1470name: {workflow_name}
1471steps:
1472 - name: step1
1473 type: cmd
1474 run: "echo 1"
1475"#
1476 );
1477 let wf = crate::workflow::parser::parse_str(&yaml).unwrap();
1478 let opts = EngineOptions {
1479 resume_from: Some("nonexistent_step".to_string()),
1480 quiet: true,
1481 ..Default::default()
1482 };
1483 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1484 let err = engine.run().await.unwrap_err();
1485 assert!(
1486 err.to_string().contains("not found in workflow"),
1487 "Expected 'not found in workflow' but got: {err}"
1488 );
1489
1490 let _ = std::fs::remove_file(&path);
1491 }
1492
1493 #[tokio::test]
1494 async fn safe_accessor_returns_empty_for_missing_step() {
1495 let yaml = r#"
1496name: test-safe-accessor
1497steps:
1498 - name: use_missing
1499 type: cmd
1500 run: "echo '{{ missing.output? }}'"
1501"#;
1502 let wf = parser::parse_str(yaml).unwrap();
1503 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1504 let result = engine.run().await.unwrap();
1505 assert_eq!(result.text().trim(), "");
1507 }
1508
1509 #[tokio::test]
1510 async fn safe_accessor_returns_value_when_present() {
1511 let yaml = r#"
1512name: test-safe-accessor-present
1513steps:
1514 - name: produce
1515 type: cmd
1516 run: "echo hello"
1517 - name: consume
1518 type: cmd
1519 run: "echo '{{ produce.output? }}'"
1520"#;
1521 let wf = parser::parse_str(yaml).unwrap();
1522 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1523 let result = engine.run().await.unwrap();
1524 assert!(result.text().contains("hello"));
1525 }
1526
1527 #[tokio::test]
1528 async fn strict_accessor_fails_when_step_missing() {
1529 let yaml = r#"
1530name: test-strict-accessor-fail
1531steps:
1532 - name: use_missing
1533 type: cmd
1534 run: "echo '{{ nonexistent.output! }}'"
1535"#;
1536 let wf = parser::parse_str(yaml).unwrap();
1537 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1538 let err = engine.run().await.unwrap_err();
1539 assert!(err.to_string().contains("strict access"), "{err}");
1540 }
1541
1542 #[tokio::test]
1543 async fn output_type_integer_parses_number() {
1544 let yaml = r#"
1545name: test-parse
1546steps:
1547 - name: count
1548 type: cmd
1549 run: "echo 42"
1550 output_type: integer
1551 - name: use_count
1552 type: cmd
1553 run: "echo {{ count.output }}"
1554"#;
1555 let wf = parser::parse_str(yaml).unwrap();
1556 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1557 let result = engine.run().await.unwrap();
1558 assert_eq!(result.text().trim(), "42");
1559 }
1560
1561 #[tokio::test]
1562 async fn output_type_integer_fails_on_non_number() {
1563 let yaml = r#"
1564name: test-parse-fail
1565steps:
1566 - name: count
1567 type: cmd
1568 run: "echo not_a_number"
1569 output_type: integer
1570"#;
1571 let wf = parser::parse_str(yaml).unwrap();
1572 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1573 let err = engine.run().await.unwrap_err();
1574 assert!(err.to_string().contains("integer"), "{err}");
1575 }
1576
1577 #[tokio::test]
1578 async fn output_type_json_allows_dot_access() {
1579 let yaml = r#"
1580name: test-json
1581steps:
1582 - name: scan
1583 type: cmd
1584 run: "echo '{\"count\": 5}'"
1585 output_type: json
1586 - name: use_scan
1587 type: cmd
1588 run: "echo {{ scan.output.count }}"
1589"#;
1590 let wf = parser::parse_str(yaml).unwrap();
1591 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1592 let result = engine.run().await.unwrap();
1593 assert_eq!(result.text().trim(), "5");
1594 }
1595
1596 #[tokio::test]
1597 async fn output_type_lines_allows_length_filter() {
1598 let yaml = r#"
1599name: test-lines
1600steps:
1601 - name: files
1602 type: cmd
1603 run: "printf 'a.rs\nb.rs\nc.rs'"
1604 output_type: lines
1605 - name: count_files
1606 type: cmd
1607 run: "echo {{ files.output | length }}"
1608"#;
1609 let wf = parser::parse_str(yaml).unwrap();
1610 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1611 let result = engine.run().await.unwrap();
1612 assert_eq!(result.text().trim(), "3");
1613 }
1614
1615 #[tokio::test]
1618 async fn async_step_is_spawned_and_completes() {
1619 let yaml = r#"
1620name: async-test
1621steps:
1622 - name: bg_task
1623 type: cmd
1624 run: "echo async_result"
1625 async_exec: true
1626 - name: sync_step
1627 type: cmd
1628 run: "echo sync_result"
1629"#;
1630 let wf = parser::parse_str(yaml).unwrap();
1631 let opts = EngineOptions {
1632 quiet: true,
1633 ..Default::default()
1634 };
1635 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1636 let result = engine.run().await.unwrap();
1637 assert!(result.text().contains("sync_result"));
1639 let records = engine.step_records();
1641 assert!(
1642 records.iter().any(|r| r.name == "bg_task"),
1643 "bg_task should be in records"
1644 );
1645 }
1646
1647 #[tokio::test]
1648 async fn dry_run_shows_async_lightning_indicator() {
1649 let yaml = r#"
1650name: dry-async
1651steps:
1652 - name: fast_bg
1653 type: cmd
1654 run: "echo bg"
1655 async_exec: true
1656 - name: normal
1657 type: cmd
1658 run: "echo normal"
1659"#;
1660 let wf = parser::parse_str(yaml).unwrap();
1662 let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1663 engine.dry_run();
1665 }
1666
1667 #[tokio::test]
1668 async fn should_sandbox_step_script_always_false() {
1669 let yaml = r#"
1670name: test
1671steps:
1672 - name: s
1673 type: cmd
1674 run: "echo test"
1675"#;
1676 let wf = parser::parse_str(yaml).unwrap();
1677
1678 let opts = EngineOptions {
1680 sandbox_mode: SandboxMode::FullWorkflow,
1681 quiet: true,
1682 ..Default::default()
1683 };
1684 let engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1685 assert!(!engine.should_sandbox_step(&StepType::Script));
1686 }
1687
1688 #[tokio::test]
1691 async fn multiple_async_steps_all_complete_by_workflow_end() {
1692 let yaml = r#"
1693name: multi-async
1694steps:
1695 - name: task_a
1696 type: cmd
1697 run: "echo result_a"
1698 async_exec: true
1699 - name: task_b
1700 type: cmd
1701 run: "echo result_b"
1702 async_exec: true
1703 - name: sync_done
1704 type: cmd
1705 run: "echo done"
1706"#;
1707 let wf = parser::parse_str(yaml).unwrap();
1708 let opts = EngineOptions {
1709 quiet: true,
1710 ..Default::default()
1711 };
1712 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1713 engine.run().await.unwrap();
1714
1715 let records = engine.step_records();
1716 assert!(
1717 records.iter().any(|r| r.name == "task_a"),
1718 "task_a should be recorded"
1719 );
1720 assert!(
1721 records.iter().any(|r| r.name == "task_b"),
1722 "task_b should be recorded"
1723 );
1724 assert!(
1725 records.iter().any(|r| r.name == "sync_done"),
1726 "sync_done should be recorded"
1727 );
1728 }
1729
1730 #[tokio::test]
1733 async fn engine_dispatches_script_step() {
1734 let yaml = r#"
1735name: script-dispatch
1736steps:
1737 - name: calc
1738 type: script
1739 run: |
1740 let x = 6 * 7;
1741 x.to_string()
1742"#;
1743 let wf = parser::parse_str(yaml).unwrap();
1744 let opts = EngineOptions {
1745 quiet: true,
1746 ..Default::default()
1747 };
1748 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1749 let result = engine.run().await.unwrap();
1750 assert_eq!(result.text().trim(), "42");
1751 }
1752
1753 #[tokio::test]
1754 async fn stack_context_injected_when_registry_exists() {
1755 let _lock = CWD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1756
1757 let dir = tempfile::tempdir().unwrap();
1758 let orig_dir = std::env::current_dir().unwrap();
1759
1760 let prompts_dir = dir.path().join("prompts");
1762 std::fs::create_dir_all(&prompts_dir).unwrap();
1763 let registry_yaml = r#"
1764version: 1
1765detection_order:
1766 - rust
1767stacks:
1768 rust:
1769 file_markers:
1770 - Cargo.toml
1771 tools:
1772 lint: "cargo clippy"
1773 test: "cargo test"
1774 build: "cargo build"
1775 install: "cargo fetch"
1776"#;
1777 std::fs::write(prompts_dir.join("registry.yaml"), registry_yaml).unwrap();
1778 std::fs::write(dir.path().join("Cargo.toml"), "[package]").unwrap();
1780
1781 std::env::set_current_dir(dir.path()).unwrap();
1782
1783 let yaml = "name: test\nsteps:\n - name: s\n type: cmd\n run: \"echo hi\"\n";
1784 let wf = parser::parse_str(yaml).unwrap();
1785 let engine = Engine::with_options(
1786 wf,
1787 "target".to_string(),
1788 HashMap::new(),
1789 EngineOptions {
1790 quiet: true,
1791 ..Default::default()
1792 },
1793 )
1794 .await;
1795
1796 let result = engine.context.render_template("{{ stack.name }}").unwrap();
1797 assert_eq!(result, "rust");
1798
1799 let lint = engine
1800 .context
1801 .render_template("{{ stack.tools.lint }}")
1802 .unwrap();
1803 assert_eq!(lint, "cargo clippy");
1804
1805 assert!(engine.stack_info.is_some());
1806 assert_eq!(engine.stack_info.as_ref().unwrap().name, "rust");
1807
1808 std::env::set_current_dir(orig_dir).unwrap();
1809 }
1810
1811 #[tokio::test]
1812 async fn stack_context_skipped_when_no_registry() {
1813 let _lock = CWD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1814
1815 let yaml = "name: test\nsteps:\n - name: s\n type: cmd\n run: \"echo hi\"\n";
1816 let wf = parser::parse_str(yaml).unwrap();
1817 let dir = tempfile::tempdir().unwrap();
1819 let orig_dir = std::env::current_dir().unwrap();
1820 std::env::set_current_dir(dir.path()).unwrap();
1821
1822 let engine = Engine::with_options(
1823 wf,
1824 "target".to_string(),
1825 HashMap::new(),
1826 EngineOptions {
1827 quiet: true,
1828 ..Default::default()
1829 },
1830 )
1831 .await;
1832 assert!(engine.stack_info.is_none());
1834 assert!(engine.context.get_var("stack").is_none());
1836
1837 std::env::set_current_dir(orig_dir).unwrap();
1838 }
1839}