1pub mod context;
2pub mod state;
3mod template;
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use anyhow::{bail, Result};
11use colored::Colorize;
12use regex::Regex;
13use serde::Serialize;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16
17use crate::cli::display;
18use crate::config::{ConfigManager, StepConfig};
19use crate::control_flow::ControlFlow;
20use crate::error::StepError;
21use crate::events::subscribers::{FileSubscriber, WebhookSubscriber};
22use crate::events::types::Event;
23use crate::events::EventBus;
24use crate::plugins::registry::PluginRegistry;
25use crate::sandbox::config::SandboxConfig;
26use crate::sandbox::docker::DockerSandbox;
27use crate::sandbox::SandboxMode;
28use crate::steps::*;
29use crate::steps::{
30 agent::AgentExecutor, call::CallExecutor, chat::ChatExecutor, cmd::CmdExecutor,
31 gate::GateExecutor, map::MapExecutor, parallel::ParallelExecutor,
32 repeat::RepeatExecutor, script::ScriptExecutor, template_step::TemplateStepExecutor,
33};
34use crate::plugins::loader::PluginLoader;
35use crate::workflow::schema::{OutputType, StepDef, StepType, WorkflowDef};
36use context::Context;
37use state::WorkflowState;
38
/// Options that control how an [`Engine`] is built and how it reports progress.
///
/// Every field has a default through `#[derive(Default)]`, so callers can set only
/// the fields they care about with struct-update syntax (`..Default::default()`).
#[derive(Debug, Default)]
pub struct EngineOptions {
    /// Verbose flag; copied onto the engine's `verbose` field.
    pub verbose: bool,
    /// Suppresses human-readable progress output.
    pub quiet: bool,
    /// JSON output mode. `Engine::with_options` also turns on quiet mode when this is set.
    pub json: bool,
    /// Dry-run flag; copied onto the engine's `dry_run` field.
    pub dry_run: bool,
    /// Name of the step to resume from, if the run should resume from saved state.
    pub resume_from: Option<String>,
    /// Selects which steps (if any) are executed inside the sandbox.
    pub sandbox_mode: SandboxMode,
}
53
/// Serializable summary of one processed step, collected by the engine during a run.
#[derive(Debug, Clone, Serialize)]
pub struct StepRecord {
    /// Step name as declared in the workflow.
    pub name: String,
    /// Step type rendered as a string.
    pub step_type: String,
    /// Outcome label; the engine writes values such as `"ok"`, `"failed"`,
    /// `"skipped"` and `"skipped_resume"`.
    pub status: String,
    /// Wall-clock duration of the step in seconds (`0.0` where the engine did not time it).
    pub duration_secs: f64,
    /// Short summary of the output, or the error/skip message.
    pub output_summary: String,
    /// Input token count, when the step reported one; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input_tokens: Option<u64>,
    /// Output token count, when the step reported one; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_tokens: Option<u64>,
    /// Cost in USD, when the step reported one; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost_usd: Option<f64>,
    /// Whether the step was executed with the sandbox; omitted from the serialized form when `false`.
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    pub sandboxed: bool,
}
72
/// Serializable summary of a whole workflow run, as produced by `Engine::json_output`.
#[derive(Debug, Serialize)]
pub struct WorkflowJsonOutput {
    /// Name of the workflow that was run.
    pub workflow_name: String,
    /// Overall status string supplied by the caller of `json_output`.
    pub status: String,
    /// `Debug` rendering of the engine's sandbox mode.
    pub sandbox_mode: String,
    /// Records of all steps processed during the run.
    pub steps: Vec<StepRecord>,
    /// Total run duration in seconds.
    pub total_duration_secs: f64,
    /// Sum of input and output tokens over all step records.
    pub total_tokens: u64,
    /// Sum of the per-step costs (steps without a cost contribute nothing).
    pub total_cost_usd: f64,
}
84
/// Workflow execution engine.
///
/// Owns the workflow definition, the runtime context and the per-run bookkeeping:
/// step records, the sandbox handle, pending async steps and the event bus.
pub struct Engine {
    /// Workflow definition being executed.
    pub workflow: WorkflowDef,
    /// Runtime context in which step outputs are stored.
    pub context: Context,
    /// Resolves the effective configuration of each step.
    config_manager: ConfigManager,
    /// Verbose flag taken from the engine options.
    pub verbose: bool,
    /// When set, human-readable progress output is suppressed.
    pub quiet: bool,
    /// JSON output mode flag taken from the engine options.
    pub json: bool,
    /// Dry-run flag taken from the engine options.
    pub dry_run: bool,
    /// Name of the step to resume from, if any.
    resume_from: Option<String>,
    /// Selects which steps are executed inside the sandbox.
    sandbox_mode: SandboxMode,
    /// Sandbox handle; starts as `None` and is set by `sandbox_up`.
    sandbox: SharedSandbox,
    /// One record per step processed so far.
    step_records: Vec<StepRecord>,
    /// Final workflow state; set at the end of a successful `run`.
    state: Option<WorkflowState>,
    /// Path of the state file used to persist step outputs during `run`.
    state_file: Option<PathBuf>,
    /// Join handles of spawned async steps that have not been awaited yet, keyed by step name.
    pending_futures: HashMap<String, JoinHandle<Result<StepOutput, StepError>>>,
    /// Registry holding the plugins loaded from the workflow configuration.
    plugin_registry: Arc<Mutex<PluginRegistry>>,
    /// Event bus; subscribers are registered from the workflow's `events` configuration.
    pub event_bus: EventBus,
}
107
108impl Engine {
109 pub fn new(
110 workflow: WorkflowDef,
111 target: String,
112 vars: HashMap<String, serde_json::Value>,
113 verbose: bool,
114 quiet: bool,
115 ) -> Self {
116 let options = EngineOptions {
117 verbose,
118 quiet,
119 ..Default::default()
120 };
121 Self::with_options(workflow, target, vars, options)
122 }
123
124 pub fn with_options(
125 workflow: WorkflowDef,
126 target: String,
127 vars: HashMap<String, serde_json::Value>,
128 options: EngineOptions,
129 ) -> Self {
130 let context = Context::new(target, vars);
131 let config_manager = ConfigManager::new(workflow.config.clone());
132 let quiet = options.quiet || options.json;
134
135 let mut registry = PluginRegistry::new();
137 for plugin_cfg in &workflow.config.plugins {
138 match PluginLoader::load_plugin(&plugin_cfg.path) {
139 Ok(plugin) => {
140 tracing::info!(name = %plugin_cfg.name, path = %plugin_cfg.path, "Loaded plugin");
141 registry.register(plugin);
142 }
143 Err(e) => {
144 tracing::warn!(
145 name = %plugin_cfg.name,
146 path = %plugin_cfg.path,
147 error = %e,
148 "Failed to load plugin"
149 );
150 }
151 }
152 }
153
154 let mut event_bus = EventBus::new();
156 if let Some(ref events_cfg) = workflow.config.events {
157 if let Some(ref webhook_url) = events_cfg.webhook {
158 event_bus.add_subscriber(Box::new(WebhookSubscriber::new(webhook_url.clone())));
159 tracing::info!(url = %webhook_url, "Registered webhook event subscriber");
160 }
161 if let Some(ref file_path) = events_cfg.file {
162 event_bus.add_subscriber(Box::new(FileSubscriber::new(file_path.clone())));
163 tracing::info!(path = %file_path, "Registered file event subscriber");
164 }
165 }
166
167 Self {
168 workflow,
169 context,
170 config_manager,
171 verbose: options.verbose,
172 quiet,
173 json: options.json,
174 dry_run: options.dry_run,
175 resume_from: options.resume_from,
176 sandbox_mode: options.sandbox_mode,
177 sandbox: None,
178 step_records: Vec::new(),
179 state: None,
180 state_file: None,
181 pending_futures: HashMap::new(),
182 plugin_registry: Arc::new(Mutex::new(registry)),
183 event_bus,
184 }
185 }
186
187 pub fn step_records(&self) -> &[StepRecord] {
189 &self.step_records
190 }
191
192 pub fn json_output(&self, status: &str, total_duration: Duration) -> WorkflowJsonOutput {
194 let total_tokens: u64 = self
195 .step_records
196 .iter()
197 .map(|r| r.input_tokens.unwrap_or(0) + r.output_tokens.unwrap_or(0))
198 .sum();
199 let total_cost: f64 = self
200 .step_records
201 .iter()
202 .filter_map(|r| r.cost_usd)
203 .sum();
204
205 WorkflowJsonOutput {
206 workflow_name: self.workflow.name.clone(),
207 status: status.to_string(),
208 sandbox_mode: format!("{:?}", self.sandbox_mode),
209 steps: self.step_records.clone(),
210 total_duration_secs: total_duration.as_secs_f64(),
211 total_tokens,
212 total_cost_usd: total_cost,
213 }
214 }
215
216 async fn sandbox_up(&mut self) -> Result<()> {
221 let sandbox_config = SandboxConfig::from_global_config(&self.workflow.config.global);
222 let workspace = std::env::current_dir()
223 .map(|p| p.to_string_lossy().to_string())
224 .unwrap_or_else(|_| ".".to_string());
225
226 let mut docker = DockerSandbox::new(sandbox_config, &workspace);
227
228 if !self.quiet {
229 println!(" {} Creating Docker sandbox container…", "🐳".cyan());
230 }
231
232 let t0 = Instant::now();
233 docker.create().await?;
234 let create_ms = t0.elapsed().as_millis();
235
236 let t1 = Instant::now();
237 docker.copy_workspace(&workspace).await?;
238 let copy_ms = t1.elapsed().as_millis();
239
240 let t2 = Instant::now();
245 let _ = docker
246 .run_command(
247 "git config --global --add safe.directory /workspace \
248 && git config --global user.name 'Minion Engine' \
249 && git config --global user.email 'minion@localhost'",
250 )
251 .await;
252 let git_ms = t2.elapsed().as_millis();
253
254 let total_ms = t0.elapsed().as_millis();
255
256 if !self.quiet {
257 println!(
258 " {} Sandbox ready — container {:.1}s, copy {:.1}s, git {:.1}s (total {:.1}s)",
259 "🔒".green(),
260 create_ms as f64 / 1000.0,
261 copy_ms as f64 / 1000.0,
262 git_ms as f64 / 1000.0,
263 total_ms as f64 / 1000.0,
264 );
265 }
266
267 tracing::info!(
268 create_ms,
269 copy_ms,
270 git_ms,
271 total_ms,
272 "Sandbox setup complete"
273 );
274
275 self.sandbox = Some(Arc::new(Mutex::new(docker)));
276 Ok(())
277 }
278
279 async fn sandbox_down(&mut self) -> Result<()> {
281 if let Some(sb) = self.sandbox.take() {
282 let mut docker = sb.lock().await;
283
284 let workspace = std::env::current_dir()
285 .map(|p| p.to_string_lossy().to_string())
286 .unwrap_or_else(|_| ".".to_string());
287
288 if !self.quiet {
289 println!(" {} Copying results from sandbox…", "📦".cyan());
290 }
291
292 let t0 = Instant::now();
293 docker.copy_results(&workspace).await?;
294 let copy_back_ms = t0.elapsed().as_millis();
295
296 let t1 = Instant::now();
297 docker.destroy().await?;
298 let destroy_ms = t1.elapsed().as_millis();
299
300 if !self.quiet {
301 println!(
302 " {} Sandbox destroyed — copy-back {:.1}s, destroy {:.1}s",
303 "🗑️ ".dimmed(),
304 copy_back_ms as f64 / 1000.0,
305 destroy_ms as f64 / 1000.0,
306 );
307 }
308
309 tracing::info!(copy_back_ms, destroy_ms, "Sandbox teardown complete");
310 }
311 Ok(())
312 }
313
314 fn should_sandbox_step(&self, step_type: &StepType) -> bool {
316 if *step_type == StepType::Script {
318 return false;
319 }
320 match self.sandbox_mode {
321 SandboxMode::Disabled => false,
322 SandboxMode::FullWorkflow | SandboxMode::Devbox => {
323 matches!(step_type, StepType::Cmd | StepType::Agent)
325 }
326 SandboxMode::AgentOnly => {
327 matches!(step_type, StepType::Agent)
329 }
330 }
331 }
332
333 pub async fn run(&mut self) -> Result<StepOutput> {
336 let state_file = WorkflowState::state_file_path(&self.workflow.name);
338 self.state_file = Some(state_file.clone());
339
340 let mut loaded_state: Option<WorkflowState> = None;
341 if let Some(ref resume_step) = self.resume_from.clone() {
342 match WorkflowState::find_latest(&self.workflow.name) {
343 Some(path) => {
344 match WorkflowState::load(&path) {
345 Ok(s) => {
346 let exists = self.workflow.steps.iter().any(|s| &s.name == resume_step);
347 if !exists {
348 bail!(
349 "Resume step '{}' not found in workflow '{}'. \
350 Available steps: {}",
351 resume_step,
352 self.workflow.name,
353 self.workflow.steps.iter().map(|s| s.name.as_str()).collect::<Vec<_>>().join(", ")
354 );
355 }
356 if !self.quiet {
357 println!(
358 " {} Resuming from step '{}' (state: {})",
359 "↺".cyan(),
360 resume_step,
361 path.display()
362 );
363 }
364 loaded_state = Some(s);
365 }
366 Err(e) => bail!("Failed to load state file {}: {e}", path.display()),
367 }
368 }
369 None => {
370 bail!(
371 "No state file found for workflow '{}'. \
372 Cannot resume. Run the workflow without --resume first.",
373 self.workflow.name
374 );
375 }
376 }
377 }
378
379 let mut current_state = WorkflowState::new(&self.workflow.name);
381
382 if !self.quiet {
384 display::workflow_start(&self.workflow.name);
385 if self.sandbox_mode != SandboxMode::Disabled {
386 println!(" {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
387 }
388 }
389
390 if self.sandbox_mode != SandboxMode::Disabled {
392 self.sandbox_up().await?;
393 }
394
395 self.event_bus
397 .emit(Event::WorkflowStarted {
398 timestamp: chrono::Utc::now(),
399 })
400 .await;
401
402 let start = Instant::now();
403 let steps = self.workflow.steps.clone();
404 let mut last_output = StepOutput::Empty;
405 let mut step_count = 0;
406
407 let resume_from = self.resume_from.clone();
408 let mut resuming = resume_from.is_some();
409
410 let run_result: Result<(), anyhow::Error> = async {
412 for step_def in &steps {
413 if resuming {
415 let is_resume_point = resume_from.as_deref() == Some(&step_def.name);
416 if !is_resume_point {
417 if let Some(ref ls) = loaded_state {
418 if let Some(output) = ls.steps.get(&step_def.name) {
419 self.context.store(&step_def.name, output.clone());
420 if !self.quiet {
421 println!(
422 " {} {} {}",
423 "⏭".yellow(),
424 step_def.name,
425 "(skipped — loaded from state)".dimmed()
426 );
427 }
428 self.step_records.push(StepRecord {
429 name: step_def.name.clone(),
430 step_type: step_def.step_type.to_string(),
431 status: "skipped_resume".to_string(),
432 duration_secs: 0.0,
433 output_summary: truncate(output.text(), 100),
434 input_tokens: None,
435 output_tokens: None,
436 cost_usd: None,
437 sandboxed: false,
438 });
439 }
440 }
441 continue;
442 }
443 resuming = false;
444 }
445
446 if step_def.async_exec == Some(true) {
448 let handle = self.spawn_async_step(step_def);
449 self.pending_futures.insert(step_def.name.clone(), handle);
450 if !self.quiet {
451 println!(
452 " {} {} {} {}",
453 "⚡".yellow(),
454 step_def.name,
455 format!("[{}]", step_def.step_type).cyan(),
456 "(async — spawned)".dimmed()
457 );
458 }
459 step_count += 1;
460 continue;
461 }
462
463 self.await_pending_deps(step_def).await?;
465
466 match self.execute_step(step_def).await {
467 Ok(output) => {
468 current_state.steps.insert(step_def.name.clone(), output.clone());
469 if let Some(ref p) = self.state_file {
470 let _ = current_state.save(p);
471 }
472 last_output = output;
473 step_count += 1;
474 }
475 Err(StepError::ControlFlow(ControlFlow::Skip { message })) => {
476 self.context.store(&step_def.name, StepOutput::Empty);
477 if !self.quiet {
478 let pb = display::step_start(&step_def.name, &step_def.step_type.to_string());
479 display::step_skip(&pb, &step_def.name, &message);
480 }
481 self.step_records.push(StepRecord {
482 name: step_def.name.clone(),
483 step_type: step_def.step_type.to_string(),
484 status: "skipped".to_string(),
485 duration_secs: 0.0,
486 output_summary: message.clone(),
487 input_tokens: None,
488 output_tokens: None,
489 cost_usd: None,
490 sandboxed: false,
491 });
492 }
493 Err(StepError::ControlFlow(ControlFlow::Fail { message })) => {
494 if !self.quiet {
495 display::workflow_failed(&step_def.name, &message);
496 }
497 bail!("Step '{}' failed: {}", step_def.name, message);
498 }
499 Err(StepError::ControlFlow(ControlFlow::Break { .. })) => {
500 break;
501 }
502 Err(e) => {
503 if !self.quiet {
504 display::workflow_failed(&step_def.name, &e.to_string());
505 }
506 return Err(e.into());
507 }
508 }
509 }
510 Ok(())
511 }
512 .await;
513
514 let remaining: Vec<(String, JoinHandle<Result<StepOutput, StepError>>)> =
516 self.pending_futures.drain().collect();
517 for (name, handle) in remaining {
518 let step_type = self
519 .workflow
520 .steps
521 .iter()
522 .find(|s| s.name == name)
523 .map(|s| s.step_type.to_string())
524 .unwrap_or_else(|| "async".to_string());
525 match handle.await {
526 Ok(Ok(output)) => {
527 self.context.store(&name, output.clone());
528 self.step_records.push(StepRecord {
529 name: name.clone(),
530 step_type: step_type.clone(),
531 status: "ok".to_string(),
532 duration_secs: 0.0,
533 output_summary: truncate(output.text(), 100),
534 input_tokens: None,
535 output_tokens: None,
536 cost_usd: None,
537 sandboxed: false,
538 });
539 }
540 Ok(Err(e)) => {
541 self.step_records.push(StepRecord {
542 name: name.clone(),
543 step_type: step_type.clone(),
544 status: "failed".to_string(),
545 duration_secs: 0.0,
546 output_summary: e.to_string(),
547 input_tokens: None,
548 output_tokens: None,
549 cost_usd: None,
550 sandboxed: false,
551 });
552 if !self.quiet {
553 eprintln!(" {} Async step '{}' failed: {}", "✗".red(), name, e);
554 }
555 }
556 Err(e) => {
557 let msg = format!("Async step '{}' panicked: {e}", name);
558 self.step_records.push(StepRecord {
559 name: name.clone(),
560 step_type,
561 status: "failed".to_string(),
562 duration_secs: 0.0,
563 output_summary: msg.clone(),
564 input_tokens: None,
565 output_tokens: None,
566 cost_usd: None,
567 sandboxed: false,
568 });
569 if !self.quiet {
570 eprintln!(" {} {}", "✗".red(), msg);
571 }
572 }
573 }
574 }
575
576 if self.sandbox_mode != SandboxMode::Disabled {
578 if let Err(e) = self.sandbox_down().await {
579 if !self.quiet {
580 eprintln!(" {} Sandbox cleanup warning: {e}", "⚠".yellow());
581 }
582 }
583 }
584
585 self.event_bus
587 .emit(Event::WorkflowCompleted {
588 duration_ms: start.elapsed().as_millis() as u64,
589 timestamp: chrono::Utc::now(),
590 })
591 .await;
592
593 run_result?;
595
596 if !self.quiet {
597 display::workflow_done(start.elapsed(), step_count);
598 }
599
600 self.state = Some(current_state);
601 Ok(last_output)
602 }
603
    /// Executes a single step and returns its (validated) output.
    ///
    /// In order, this method:
    /// 1. resolves the step configuration and decides whether the step is sandboxed,
    /// 2. emits `StepStarted`,
    /// 3. dispatches to the executor matching the step type (`Cmd` and `Agent`
    ///    receive the sandbox handle when sandboxing applies),
    /// 4. validates a successful output against the step's declared output type,
    /// 5. on success stores the output (and its parsed value, if any) in the
    ///    context and pushes an `"ok"` record; on a non-control-flow error pushes a
    ///    `"failed"` record; control-flow errors are logged and displayed only,
    /// 6. emits `StepCompleted` on success or `StepFailed` on a non-control-flow error.
    ///
    /// # Errors
    /// Returns the executor's error unchanged, or the validation error from
    /// `parse_step_output` when the output does not match the declared type.
    pub async fn execute_step(&mut self, step_def: &StepDef) -> Result<StepOutput, StepError> {
        let config = self.resolve_config(step_def);
        let use_sandbox = self.should_sandbox_step(&step_def.step_type);

        // Progress indicator only in non-quiet mode; sandboxed steps get a whale marker.
        let pb = if !self.quiet {
            let label = if use_sandbox {
                format!("{} 🐳", step_def.step_type)
            } else {
                step_def.step_type.to_string()
            };
            Some(display::step_start(&step_def.name, &label))
        } else {
            None
        };

        let start = Instant::now();

        self.event_bus
            .emit(Event::StepStarted {
                step_name: step_def.name.clone(),
                step_type: step_def.step_type.to_string(),
                timestamp: chrono::Utc::now(),
            })
            .await;

        tracing::debug!(
            step = %step_def.name,
            step_type = %step_def.step_type,
            sandboxed = use_sandbox,
            "Executing step"
        );

        // Executors always take a sandbox reference; passing `None` runs the step unsandboxed.
        let sandbox_ref = if use_sandbox { &self.sandbox } else { &None };

        let result = match step_def.step_type {
            StepType::Cmd => {
                CmdExecutor
                    .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
                    .await
            }
            StepType::Agent => {
                AgentExecutor
                    .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
                    .await
            }
            StepType::Gate => GateExecutor.execute(step_def, &config, &self.context).await,
            StepType::Repeat => {
                RepeatExecutor::new(&self.workflow.scopes)
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Chat => {
                ChatExecutor.execute(step_def, &config, &self.context).await
            }
            StepType::Map => {
                MapExecutor::new(&self.workflow.scopes)
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Parallel => {
                ParallelExecutor::new(&self.workflow.scopes)
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Call => {
                CallExecutor::new(&self.workflow.scopes)
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Template => {
                let prompts_dir = self.workflow.prompts_dir.as_deref();
                TemplateStepExecutor::new(prompts_dir)
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Script => {
                ScriptExecutor.execute(step_def, &config, &self.context).await
            }
        };

        let elapsed = start.elapsed();
        let duration_ms = elapsed.as_millis() as u64;

        // A successful output must still match the declared output type; a mismatch
        // turns the success into a failure.
        let result = match result {
            Ok(output) => parse_step_output(output, step_def),
            err => err,
        };

        // Bookkeeping: logging, context, step record and progress display.
        match &result {
            Ok(output) => {
                tracing::info!(
                    step = %step_def.name,
                    step_type = %step_def.step_type,
                    duration_ms = elapsed.as_millis(),
                    sandboxed = use_sandbox,
                    status = "ok",
                    "Step completed"
                );
                self.context.store(&step_def.name, output.clone());
                if let Some(parsed) = extract_parsed_value(output, step_def) {
                    self.context.store_parsed(&step_def.name, parsed);
                }

                let (it, ot, cost) = token_stats(output);
                self.step_records.push(StepRecord {
                    name: step_def.name.clone(),
                    step_type: step_def.step_type.to_string(),
                    status: "ok".to_string(),
                    duration_secs: elapsed.as_secs_f64(),
                    output_summary: truncate(output.text(), 100),
                    input_tokens: it,
                    output_tokens: ot,
                    cost_usd: cost,
                    sandboxed: use_sandbox,
                });

                if let Some(pb) = &pb {
                    display::step_ok(pb, &step_def.name, elapsed);
                }
            }
            // Control flow is not a failure: it is logged and displayed, but no record
            // is pushed here.
            Err(StepError::ControlFlow(cf)) => {
                let msg = match cf {
                    ControlFlow::Skip { message } => format!("skipped: {message}"),
                    ControlFlow::Break { message, .. } => format!("break: {message}"),
                    ControlFlow::Fail { message } => format!("failed: {message}"),
                    ControlFlow::Next { message } => format!("next: {message}"),
                };
                tracing::info!(
                    step = %step_def.name,
                    step_type = %step_def.step_type,
                    duration_ms = elapsed.as_millis(),
                    status = "control_flow",
                    message = %msg,
                    "Step control flow"
                );
                if let Some(pb) = &pb {
                    display::step_skip(pb, &step_def.name, &msg);
                }
            }
            Err(e) => {
                tracing::warn!(
                    step = %step_def.name,
                    step_type = %step_def.step_type,
                    duration_ms = elapsed.as_millis(),
                    status = "error",
                    error = %e,
                    "Step failed"
                );
                self.step_records.push(StepRecord {
                    name: step_def.name.clone(),
                    step_type: step_def.step_type.to_string(),
                    status: "failed".to_string(),
                    duration_secs: elapsed.as_secs_f64(),
                    output_summary: e.to_string(),
                    input_tokens: None,
                    output_tokens: None,
                    cost_usd: None,
                    sandboxed: use_sandbox,
                });
                if let Some(pb) = &pb {
                    display::step_fail(pb, &step_def.name, &e.to_string());
                }
            }
        }

        // Events: completed on success, failed on real errors, nothing for control flow.
        match &result {
            Ok(_) => {
                self.event_bus
                    .emit(Event::StepCompleted {
                        step_name: step_def.name.clone(),
                        step_type: step_def.step_type.to_string(),
                        duration_ms,
                        timestamp: chrono::Utc::now(),
                    })
                    .await;
            }
            Err(e) if !matches!(e, StepError::ControlFlow(_)) => {
                self.event_bus
                    .emit(Event::StepFailed {
                        step_name: step_def.name.clone(),
                        step_type: step_def.step_type.to_string(),
                        error: e.to_string(),
                        duration_ms,
                        timestamp: chrono::Utc::now(),
                    })
                    .await;
            }
            _ => {}
        }

        result
    }
804
805 pub fn dry_run(&self) {
807 use colored::Colorize;
808
809 println!("{} {} (dry-run)", "▶".cyan().bold(), self.workflow.name.bold());
810 if self.sandbox_mode != SandboxMode::Disabled {
811 println!(" {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
812 }
813 println!();
814
815 let steps = &self.workflow.steps;
816 let total = steps.len();
817 for (i, step) in steps.iter().enumerate() {
818 let is_last = i + 1 == total;
819 let branch = if is_last { "└──" } else { "├──" };
820 let config = self.resolve_config(step);
821
822 let sandbox_indicator = if self.should_sandbox_step(&step.step_type) {
823 " 🐳"
824 } else {
825 ""
826 };
827
828 let async_indicator = if step.async_exec == Some(true) { " ⚡" } else { "" };
829
830 println!(
831 "{} {} {}{}{}",
832 branch.dimmed(),
833 step.name.bold(),
834 format!("[{}]", step.step_type).cyan(),
835 sandbox_indicator,
836 async_indicator
837 );
838
839 let indent = if is_last { " " } else { "│ " };
840 self.print_step_details(step, &config, indent);
841
842 if !is_last {
843 println!("│");
844 }
845 }
846 }
847
848 fn print_step_details(&self, step: &StepDef, config: &StepConfig, indent: &str) {
849 use colored::Colorize;
850
851 match step.step_type {
852 StepType::Cmd => {
853 if let Some(ref run) = step.run {
854 let preview = truncate(run, 80);
855 println!("{} run: {}", indent, preview.dimmed());
856 }
857 }
858 StepType::Agent | StepType::Chat => {
859 if let Some(ref prompt) = step.prompt {
860 let preview = truncate(&prompt.replace('\n', " "), 80);
861 println!("{} prompt: {}", indent, preview.dimmed());
862 }
863 if let Some(model) = config.get_str("model") {
864 println!("{} model: {}", indent, model.dimmed());
865 }
866 }
867 StepType::Gate => {
868 if let Some(ref cond) = step.condition {
869 println!("{} condition: {}", indent, cond.dimmed());
870 }
871 println!(
872 "{} on_pass: {} / on_fail: {}",
873 indent,
874 step.on_pass.as_deref().unwrap_or("continue").dimmed(),
875 step.on_fail.as_deref().unwrap_or("continue").dimmed()
876 );
877 }
878 StepType::Repeat => {
879 let scope_name = step.scope.as_deref().unwrap_or("<none>");
880 let max_iter = step.max_iterations.unwrap_or(1);
881 println!("{} scope: {}", indent, scope_name.dimmed());
882 println!("{} max_iterations: {}", indent, max_iter.to_string().dimmed());
883 self.print_scope_steps(scope_name, indent);
884 }
885 StepType::Map => {
886 let scope_name = step.scope.as_deref().unwrap_or("<none>");
887 let items = step.items.as_deref().unwrap_or("<none>");
888 println!("{} items: {}", indent, items.dimmed());
889 println!("{} scope: {}", indent, scope_name.dimmed());
890 if let Some(p) = step.parallel {
891 println!("{} parallel: {}", indent, p.to_string().dimmed());
892 }
893 self.print_scope_steps(scope_name, indent);
894 }
895 StepType::Call => {
896 let scope_name = step.scope.as_deref().unwrap_or("<none>");
897 println!("{} scope: {}", indent, scope_name.dimmed());
898 self.print_scope_steps(scope_name, indent);
899 }
900 StepType::Parallel => {
901 if let Some(ref sub_steps) = step.steps {
902 println!("{} parallel steps:", indent);
903 for sub in sub_steps {
904 println!(
905 "{} • {} [{}]",
906 indent,
907 sub.name.bold(),
908 sub.step_type.to_string().cyan()
909 );
910 }
911 }
912 }
913 StepType::Template => {
914 if let Some(ref run) = step.run {
915 println!("{} template: {}", indent, run.dimmed());
916 }
917 }
918 StepType::Script => {
919 if let Some(ref run) = step.run {
920 let preview = truncate(&run.replace('\n', " "), 80);
921 println!("{} script: {}", indent, preview.dimmed());
922 }
923 }
924 }
925
926 if let Some(t) = config.get_str("timeout") {
927 println!("{} timeout: {}", indent, t.dimmed());
928 }
929 }
930
931 fn print_scope_steps(&self, scope_name: &str, indent: &str) {
932 use colored::Colorize;
933 if let Some(scope) = self.workflow.scopes.get(scope_name) {
934 println!("{} scope steps:", indent);
935 for step in &scope.steps {
936 println!(
937 "{} • {} [{}]",
938 indent,
939 step.name.bold(),
940 step.step_type.to_string().cyan()
941 );
942 }
943 }
944 }
945
946 fn resolve_config(&self, step_def: &StepDef) -> StepConfig {
947 self.config_manager
948 .resolve(&step_def.name, &step_def.step_type, &step_def.config)
949 }
950
951 fn spawn_async_step(
957 &self,
958 step_def: &StepDef,
959 ) -> JoinHandle<Result<StepOutput, StepError>> {
960 let step = step_def.clone();
961 let config = self.resolve_config(step_def);
962 let target = self
963 .context
964 .get_var("target")
965 .and_then(|v| v.as_str())
966 .unwrap_or("")
967 .to_string();
968
969 tokio::spawn(async move {
970 let ctx = Context::new(target, HashMap::new());
971 match step.step_type {
972 StepType::Cmd => CmdExecutor.execute(&step, &config, &ctx).await,
973 StepType::Agent => AgentExecutor.execute(&step, &config, &ctx).await,
974 StepType::Script => ScriptExecutor.execute(&step, &config, &ctx).await,
975 _ => Err(StepError::Fail(format!(
976 "Async execution not supported for step type '{}'",
977 step.step_type
978 ))),
979 }
980 })
981 }
982
983 async fn await_pending_deps(&mut self, step_def: &StepDef) -> Result<(), StepError> {
986 let pattern = Regex::new(r"steps\.(\w+)\.").unwrap();
987
988 let mut templates: Vec<String> = Vec::new();
990 if let Some(ref run) = step_def.run {
991 templates.push(run.clone());
992 }
993 if let Some(ref prompt) = step_def.prompt {
994 templates.push(prompt.clone());
995 }
996 if let Some(ref condition) = step_def.condition {
997 templates.push(condition.clone());
998 }
999
1000 let mut deps: Vec<String> = Vec::new();
1002 for tmpl in &templates {
1003 for cap in pattern.captures_iter(tmpl) {
1004 let name = cap[1].to_string();
1005 if self.pending_futures.contains_key(&name) && !deps.contains(&name) {
1006 deps.push(name);
1007 }
1008 }
1009 }
1010
1011 for name in deps {
1013 self.await_pending_step(&name).await?;
1014 }
1015
1016 Ok(())
1017 }
1018
1019 async fn await_pending_step(&mut self, name: &str) -> Result<(), StepError> {
1021 if let Some(handle) = self.pending_futures.remove(name) {
1022 match handle.await {
1023 Ok(Ok(output)) => {
1024 self.context.store(name, output.clone());
1025 self.step_records.push(StepRecord {
1026 name: name.to_string(),
1027 step_type: "async".to_string(),
1028 status: "ok".to_string(),
1029 duration_secs: 0.0,
1030 output_summary: truncate(output.text(), 100),
1031 input_tokens: None,
1032 output_tokens: None,
1033 cost_usd: None,
1034 sandboxed: false,
1035 });
1036 }
1037 Ok(Err(e)) => {
1038 return Err(StepError::Fail(format!(
1039 "Async step '{}' failed: {e}",
1040 name
1041 )));
1042 }
1043 Err(e) => {
1044 return Err(StepError::Fail(format!(
1045 "Async step '{}' panicked: {e}",
1046 name
1047 )));
1048 }
1049 }
1050 }
1051 Ok(())
1052 }
1053}
1054
1055fn token_stats(output: &StepOutput) -> (Option<u64>, Option<u64>, Option<f64>) {
1057 match output {
1058 StepOutput::Agent(o) => (
1059 Some(o.stats.input_tokens),
1060 Some(o.stats.output_tokens),
1061 Some(o.stats.cost_usd),
1062 ),
1063 StepOutput::Chat(o) => (Some(o.input_tokens), Some(o.output_tokens), None),
1064 _ => (None, None, None),
1065 }
1066}
1067
1068fn parse_step_output(output: StepOutput, step_def: &StepDef) -> Result<StepOutput, StepError> {
1071 let output_type = match &step_def.output_type {
1072 Some(t) => t,
1073 None => return Ok(output),
1074 };
1075
1076 if *output_type == OutputType::Text {
1077 return Ok(output);
1078 }
1079
1080 let text = output.text().trim().to_string();
1081
1082 match output_type {
1083 OutputType::Integer => {
1084 text.parse::<i64>()
1085 .map_err(|_| StepError::Fail(format!("Failed to parse '{}' as integer", text)))?;
1086 }
1087 OutputType::Json => {
1088 serde_json::from_str::<serde_json::Value>(&text)
1089 .map_err(|e| StepError::Fail(format!("Failed to parse output as JSON: {e}")))?;
1090 }
1091 OutputType::Boolean => {
1092 match text.to_lowercase().as_str() {
1093 "true" | "1" | "yes" | "false" | "0" | "no" => {}
1094 _ => {
1095 return Err(StepError::Fail(format!(
1096 "Failed to parse '{}' as boolean",
1097 text
1098 )));
1099 }
1100 }
1101 }
1102 OutputType::Lines | OutputType::Text => {}
1103 }
1104
1105 Ok(output)
1106}
1107
1108fn extract_parsed_value(output: &StepOutput, step_def: &StepDef) -> Option<ParsedValue> {
1111 let output_type = step_def.output_type.as_ref()?;
1112
1113 let text = output.text().trim().to_string();
1114
1115 let parsed = match output_type {
1116 OutputType::Text => ParsedValue::Text(text),
1117 OutputType::Integer => ParsedValue::Integer(text.parse::<i64>().ok()?),
1118 OutputType::Json => {
1119 let val = serde_json::from_str::<serde_json::Value>(&text).ok()?;
1120 ParsedValue::Json(val)
1121 }
1122 OutputType::Lines => {
1123 let lines: Vec<String> = text
1124 .lines()
1125 .filter(|l| !l.is_empty())
1126 .map(|l| l.to_string())
1127 .collect();
1128 ParsedValue::Lines(lines)
1129 }
1130 OutputType::Boolean => {
1131 let b = match text.to_lowercase().as_str() {
1132 "true" | "1" | "yes" => true,
1133 _ => false,
1134 };
1135 ParsedValue::Boolean(b)
1136 }
1137 };
1138
1139 Some(parsed)
1140}
1141
/// Shortens `s` to at most `max` characters, appending `…` when something was cut.
///
/// Length is counted in `char`s, not bytes, so the cut always falls on a character
/// boundary. Strings with `max` characters or fewer are returned unchanged.
fn truncate(s: &str, max: usize) -> String {
    // The character at index `max` exists only when the string is longer than
    // `max` characters; its byte offset is exactly where the string must be cut.
    match s.char_indices().nth(max) {
        None => s.to_string(),
        Some((cut, _)) => format!("{}…", &s[..cut]),
    }
}
1153
1154
1155#[cfg(test)]
1156mod tests {
1157 use super::*;
1158 use crate::workflow::parser;
1159
1160 #[tokio::test]
1161 async fn engine_runs_sequential_cmd_steps() {
1162 let yaml = r#"
1163name: test
1164steps:
1165 - name: step1
1166 type: cmd
1167 run: "echo first"
1168 - name: step2
1169 type: cmd
1170 run: "echo second"
1171"#;
1172 let wf = parser::parse_str(yaml).unwrap();
1173 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
1174 let result = engine.run().await.unwrap();
1175 assert_eq!(result.text().trim(), "second");
1176 assert!(engine.context.get_step("step1").is_some());
1177 assert_eq!(
1178 engine.context.get_step("step1").unwrap().text().trim(),
1179 "first"
1180 );
1181 }
1182
1183 #[tokio::test]
1184 async fn engine_exposes_step_output_to_next_step() {
1185 let yaml = r#"
1186name: test
1187steps:
1188 - name: produce
1189 type: cmd
1190 run: "echo hello_world"
1191 - name: consume
1192 type: cmd
1193 run: "echo {{ steps.produce.stdout }}"
1194"#;
1195 let wf = parser::parse_str(yaml).unwrap();
1196 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
1197 let result = engine.run().await.unwrap();
1198 assert!(result.text().contains("hello_world"));
1199 }
1200
1201 #[tokio::test]
1202 async fn engine_collects_step_records_in_json_mode() {
1203 let yaml = r#"
1204name: json-test
1205steps:
1206 - name: alpha
1207 type: cmd
1208 run: "echo alpha"
1209 - name: beta
1210 type: cmd
1211 run: "echo beta"
1212"#;
1213 let wf = parser::parse_str(yaml).unwrap();
1214 let opts = EngineOptions {
1215 json: true,
1216 ..Default::default()
1217 };
1218 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
1219 engine.run().await.unwrap();
1220
1221 let records = engine.step_records();
1222 assert_eq!(records.len(), 2);
1223 assert_eq!(records[0].name, "alpha");
1224 assert_eq!(records[0].status, "ok");
1225 assert!(!records[0].sandboxed);
1226 assert_eq!(records[1].name, "beta");
1227 assert_eq!(records[1].status, "ok");
1228 }
1229
1230 #[tokio::test]
1231 async fn json_output_includes_sandbox_mode() {
1232 let yaml = r#"
1233name: json-output-test
1234steps:
1235 - name: greet
1236 type: cmd
1237 run: "echo hello"
1238"#;
1239 let wf = parser::parse_str(yaml).unwrap();
1240 let opts = EngineOptions {
1241 json: true,
1242 ..Default::default()
1243 };
1244 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
1245 let start = Instant::now();
1246 engine.run().await.unwrap();
1247 let out = engine.json_output("success", start.elapsed());
1248
1249 let json = serde_json::to_string(&out).unwrap();
1250 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1251
1252 assert_eq!(parsed["workflow_name"], "json-output-test");
1253 assert_eq!(parsed["status"], "success");
1254 assert_eq!(parsed["sandbox_mode"], "Disabled");
1255 assert!(parsed["steps"].is_array());
1256 assert_eq!(parsed["steps"][0]["name"], "greet");
1257 }
1258
/// Checks `should_sandbox_step` for `Cmd`, `Agent` and `Gate` steps under an
/// engine built by `Engine::new` and under the `FullWorkflow` and `AgentOnly`
/// sandbox modes.
#[test]
fn should_sandbox_step_logic() {
    let source = r#"
name: test
steps:
  - name: s
    type: cmd
    run: "echo test"
"#;
    let workflow = parser::parse_str(source).unwrap();

    // Engine built by `Engine::new`: no step type is sandboxed.
    let plain = Engine::new(workflow.clone(), String::new(), HashMap::new(), false, true);
    assert!(!plain.should_sandbox_step(&StepType::Cmd));
    assert!(!plain.should_sandbox_step(&StepType::Agent));
    assert!(!plain.should_sandbox_step(&StepType::Gate));

    // FullWorkflow: cmd and agent steps are sandboxed, gate steps are not.
    let full_opts = EngineOptions {
        sandbox_mode: SandboxMode::FullWorkflow,
        quiet: true,
        ..EngineOptions::default()
    };
    let full = Engine::with_options(workflow.clone(), String::new(), HashMap::new(), full_opts);
    assert!(full.should_sandbox_step(&StepType::Cmd));
    assert!(full.should_sandbox_step(&StepType::Agent));
    assert!(!full.should_sandbox_step(&StepType::Gate));

    // AgentOnly: only agent steps are sandboxed.
    let agent_opts = EngineOptions {
        sandbox_mode: SandboxMode::AgentOnly,
        quiet: true,
        ..EngineOptions::default()
    };
    // Last use of the parsed workflow, so it is moved rather than cloned.
    let agent_only = Engine::with_options(workflow, String::new(), HashMap::new(), agent_opts);
    assert!(!agent_only.should_sandbox_step(&StepType::Cmd));
    assert!(agent_only.should_sandbox_step(&StepType::Agent));
    assert!(!agent_only.should_sandbox_step(&StepType::Gate));
}
1298
/// Smoke test: `dry_run` on a workflow that has a scope, a gate and a repeat
/// step must return without panicking. Nothing about the output is asserted.
#[test]
fn dry_run_does_not_panic() {
    let source = r#"
name: dry-run-test
scopes:
  lint_fix:
    steps:
      - name: lint
        type: cmd
        run: "npm run lint"
      - name: fix_lint
        type: agent
        prompt: "Fix lint errors"
steps:
  - name: setup
    type: cmd
    run: "echo setup"
  - name: validate
    type: gate
    condition: "{{ steps.setup.exit_code == 0 }}"
    on_pass: continue
    on_fail: fail
  - name: lint_gate
    type: repeat
    scope: lint_fix
    max_iterations: 3
"#;
    let workflow = crate::workflow::parser::parse_str(source).unwrap();
    let engine = Engine::new(workflow, String::new(), HashMap::new(), false, true);
    engine.dry_run();
}
1330
/// Smoke test: `dry_run` handles a workflow mixing `cmd`, `gate` and
/// `parallel` steps (the latter with a nested step) without panicking.
#[test]
fn dry_run_all_step_types() {
    let source = r#"
name: all-types
steps:
  - name: c
    type: cmd
    run: "ls"
  - name: g
    type: gate
    condition: "{{ true }}"
    on_pass: continue
  - name: p
    type: parallel
    steps:
      - name: p1
        type: cmd
        run: "echo p1"
"#;
    let workflow = crate::workflow::parser::parse_str(source).unwrap();
    let engine = Engine::new(workflow, String::new(), HashMap::new(), false, true);
    engine.dry_run();
}
1354
/// `truncate` leaves short input unchanged and appends `…` after cutting
/// longer input down to the given limit.
#[test]
fn truncate_helper() {
    // (input, limit, expected)
    let cases = [("hello", 10, "hello"), ("hello world", 5, "hello…")];
    for (input, limit, expected) in cases {
        assert_eq!(truncate(input, limit), expected);
    }
}
1360
1361 #[tokio::test]
1362 async fn resume_fails_when_no_state_file() {
1363 let yaml = r#"
1364name: no-state-workflow-xyz-unique
1365steps:
1366 - name: step1
1367 type: cmd
1368 run: "echo 1"
1369"#;
1370 let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1371 let opts = EngineOptions {
1372 resume_from: Some("step1".to_string()),
1373 quiet: true,
1374 ..Default::default()
1375 };
1376 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
1377 let err = engine.run().await.unwrap_err();
1378 assert!(
1379 err.to_string().contains("No state file found"),
1380 "Expected 'No state file found' but got: {err}"
1381 );
1382 }
1383
1384 #[tokio::test]
1385 async fn resume_fails_for_unknown_step() {
1386 let workflow_name = "test-resume-unknown-step";
1387 let state = WorkflowState::new(workflow_name);
1388 let tmp_path = format!("/tmp/minion-{workflow_name}-20991231235959.state.json");
1389 let path = PathBuf::from(&tmp_path);
1390 state.save(&path).unwrap();
1391
1392 let yaml = format!(
1393 r#"
1394name: {workflow_name}
1395steps:
1396 - name: step1
1397 type: cmd
1398 run: "echo 1"
1399"#
1400 );
1401 let wf = crate::workflow::parser::parse_str(&yaml).unwrap();
1402 let opts = EngineOptions {
1403 resume_from: Some("nonexistent_step".to_string()),
1404 quiet: true,
1405 ..Default::default()
1406 };
1407 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
1408 let err = engine.run().await.unwrap_err();
1409 assert!(
1410 err.to_string().contains("not found in workflow"),
1411 "Expected 'not found in workflow' but got: {err}"
1412 );
1413
1414 let _ = std::fs::remove_file(&path);
1415 }
1416
1417 #[tokio::test]
1418 async fn safe_accessor_returns_empty_for_missing_step() {
1419 let yaml = r#"
1420name: test-safe-accessor
1421steps:
1422 - name: use_missing
1423 type: cmd
1424 run: "echo '{{ missing.output? }}'"
1425"#;
1426 let wf = parser::parse_str(yaml).unwrap();
1427 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
1428 let result = engine.run().await.unwrap();
1429 assert_eq!(result.text().trim(), "");
1431 }
1432
1433 #[tokio::test]
1434 async fn safe_accessor_returns_value_when_present() {
1435 let yaml = r#"
1436name: test-safe-accessor-present
1437steps:
1438 - name: produce
1439 type: cmd
1440 run: "echo hello"
1441 - name: consume
1442 type: cmd
1443 run: "echo '{{ produce.output? }}'"
1444"#;
1445 let wf = parser::parse_str(yaml).unwrap();
1446 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
1447 let result = engine.run().await.unwrap();
1448 assert!(result.text().contains("hello"));
1449 }
1450
1451 #[tokio::test]
1452 async fn strict_accessor_fails_when_step_missing() {
1453 let yaml = r#"
1454name: test-strict-accessor-fail
1455steps:
1456 - name: use_missing
1457 type: cmd
1458 run: "echo '{{ nonexistent.output! }}'"
1459"#;
1460 let wf = parser::parse_str(yaml).unwrap();
1461 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
1462 let err = engine.run().await.unwrap_err();
1463 assert!(err.to_string().contains("strict access"), "{err}");
1464 }
1465
1466 #[tokio::test]
1467 async fn output_type_integer_parses_number() {
1468 let yaml = r#"
1469name: test-parse
1470steps:
1471 - name: count
1472 type: cmd
1473 run: "echo 42"
1474 output_type: integer
1475 - name: use_count
1476 type: cmd
1477 run: "echo {{ count.output }}"
1478"#;
1479 let wf = parser::parse_str(yaml).unwrap();
1480 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
1481 let result = engine.run().await.unwrap();
1482 assert_eq!(result.text().trim(), "42");
1483 }
1484
1485 #[tokio::test]
1486 async fn output_type_integer_fails_on_non_number() {
1487 let yaml = r#"
1488name: test-parse-fail
1489steps:
1490 - name: count
1491 type: cmd
1492 run: "echo not_a_number"
1493 output_type: integer
1494"#;
1495 let wf = parser::parse_str(yaml).unwrap();
1496 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
1497 let err = engine.run().await.unwrap_err();
1498 assert!(err.to_string().contains("integer"), "{err}");
1499 }
1500
1501 #[tokio::test]
1502 async fn output_type_json_allows_dot_access() {
1503 let yaml = r#"
1504name: test-json
1505steps:
1506 - name: scan
1507 type: cmd
1508 run: "echo '{\"count\": 5}'"
1509 output_type: json
1510 - name: use_scan
1511 type: cmd
1512 run: "echo {{ scan.output.count }}"
1513"#;
1514 let wf = parser::parse_str(yaml).unwrap();
1515 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
1516 let result = engine.run().await.unwrap();
1517 assert_eq!(result.text().trim(), "5");
1518 }
1519
1520 #[tokio::test]
1521 async fn output_type_lines_allows_length_filter() {
1522 let yaml = r#"
1523name: test-lines
1524steps:
1525 - name: files
1526 type: cmd
1527 run: "printf 'a.rs\nb.rs\nc.rs'"
1528 output_type: lines
1529 - name: count_files
1530 type: cmd
1531 run: "echo {{ files.output | length }}"
1532"#;
1533 let wf = parser::parse_str(yaml).unwrap();
1534 let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
1535 let result = engine.run().await.unwrap();
1536 assert_eq!(result.text().trim(), "3");
1537 }
1538
1539 #[tokio::test]
1542 async fn async_step_is_spawned_and_completes() {
1543 let yaml = r#"
1544name: async-test
1545steps:
1546 - name: bg_task
1547 type: cmd
1548 run: "echo async_result"
1549 async_exec: true
1550 - name: sync_step
1551 type: cmd
1552 run: "echo sync_result"
1553"#;
1554 let wf = parser::parse_str(yaml).unwrap();
1555 let opts = EngineOptions { quiet: true, ..Default::default() };
1556 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
1557 let result = engine.run().await.unwrap();
1558 assert!(result.text().contains("sync_result"));
1560 let records = engine.step_records();
1562 assert!(records.iter().any(|r| r.name == "bg_task"), "bg_task should be in records");
1563 }
1564
/// Smoke test: `dry_run` on a workflow containing an `async_exec` step must
/// not panic. The printed output is not captured or inspected here.
#[test]
fn dry_run_shows_async_lightning_indicator() {
    let source = r#"
name: dry-async
steps:
  - name: fast_bg
    type: cmd
    run: "echo bg"
    async_exec: true
  - name: normal
    type: cmd
    run: "echo normal"
"#;
    let workflow = parser::parse_str(source).unwrap();
    let engine = Engine::new(workflow, String::new(), HashMap::new(), false, true);
    engine.dry_run();
}
1584
/// Even under `SandboxMode::FullWorkflow`, `should_sandbox_step` reports
/// `false` for `StepType::Script`.
#[test]
fn should_sandbox_step_script_always_false() {
    let source = r#"
name: test
steps:
  - name: s
    type: cmd
    run: "echo test"
"#;
    let workflow = parser::parse_str(source).unwrap();

    let options = EngineOptions {
        sandbox_mode: SandboxMode::FullWorkflow,
        quiet: true,
        ..EngineOptions::default()
    };
    let engine = Engine::with_options(workflow, String::new(), HashMap::new(), options);

    let sandboxed = engine.should_sandbox_step(&StepType::Script);
    assert!(!sandboxed);
}
1605
1606 #[tokio::test]
1609 async fn multiple_async_steps_all_complete_by_workflow_end() {
1610 let yaml = r#"
1611name: multi-async
1612steps:
1613 - name: task_a
1614 type: cmd
1615 run: "echo result_a"
1616 async_exec: true
1617 - name: task_b
1618 type: cmd
1619 run: "echo result_b"
1620 async_exec: true
1621 - name: sync_done
1622 type: cmd
1623 run: "echo done"
1624"#;
1625 let wf = parser::parse_str(yaml).unwrap();
1626 let opts = EngineOptions { quiet: true, ..Default::default() };
1627 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
1628 engine.run().await.unwrap();
1629
1630 let records = engine.step_records();
1631 assert!(records.iter().any(|r| r.name == "task_a"), "task_a should be recorded");
1632 assert!(records.iter().any(|r| r.name == "task_b"), "task_b should be recorded");
1633 assert!(records.iter().any(|r| r.name == "sync_done"), "sync_done should be recorded");
1634 }
1635
1636 #[tokio::test]
1639 async fn engine_dispatches_script_step() {
1640 let yaml = r#"
1641name: script-dispatch
1642steps:
1643 - name: calc
1644 type: script
1645 run: |
1646 let x = 6 * 7;
1647 x.to_string()
1648"#;
1649 let wf = parser::parse_str(yaml).unwrap();
1650 let opts = EngineOptions { quiet: true, ..Default::default() };
1651 let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
1652 let result = engine.run().await.unwrap();
1653 assert_eq!(result.text().trim(), "42");
1654 }
1655}