// minion_engine/engine/mod.rs

1pub mod context;
2pub mod state;
3mod template;
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use anyhow::{bail, Result};
11use colored::Colorize;
12use regex::Regex;
13use serde::Serialize;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16
17use crate::cli::display;
18use crate::config::{ConfigManager, StepConfig};
19use crate::control_flow::ControlFlow;
20use crate::error::StepError;
21use crate::events::subscribers::{FileSubscriber, WebhookSubscriber};
22use crate::events::types::Event;
23use crate::events::EventBus;
24use crate::plugins::registry::PluginRegistry;
25use crate::sandbox::config::SandboxConfig;
26use crate::sandbox::docker::DockerSandbox;
27use crate::sandbox::SandboxMode;
28use crate::steps::*;
29use crate::steps::{
30    agent::AgentExecutor, call::CallExecutor, chat::ChatExecutor, cmd::CmdExecutor,
31    gate::GateExecutor, map::MapExecutor, parallel::ParallelExecutor,
32    repeat::RepeatExecutor, script::ScriptExecutor, template_step::TemplateStepExecutor,
33};
34use crate::plugins::loader::PluginLoader;
35use crate::workflow::schema::{OutputType, StepDef, StepType, WorkflowDef};
36use context::Context;
37use state::WorkflowState;
38
39/// Options for configuring the Engine
40#[derive(Debug, Default)]
41pub struct EngineOptions {
42    pub verbose: bool,
43    pub quiet: bool,
44    /// Suppress display and emit JSON summary at end
45    pub json: bool,
46    /// Skip execution and show step tree
47    pub dry_run: bool,
48    /// Resume from this step name (requires a state file)
49    pub resume_from: Option<String>,
50    /// Sandbox mode resolved from CLI + config
51    pub sandbox_mode: SandboxMode,
52}
53
54/// Per-step execution record collected for JSON output
55#[derive(Debug, Clone, Serialize)]
56pub struct StepRecord {
57    pub name: String,
58    pub step_type: String,
59    pub status: String,
60    pub duration_secs: f64,
61    pub output_summary: String,
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub input_tokens: Option<u64>,
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub output_tokens: Option<u64>,
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub cost_usd: Option<f64>,
68    /// Whether this step ran inside the Docker sandbox
69    #[serde(skip_serializing_if = "std::ops::Not::not")]
70    pub sandboxed: bool,
71}
72
73/// Full workflow JSON output (--json mode)
74#[derive(Debug, Serialize)]
75pub struct WorkflowJsonOutput {
76    pub workflow_name: String,
77    pub status: String,
78    pub sandbox_mode: String,
79    pub steps: Vec<StepRecord>,
80    pub total_duration_secs: f64,
81    pub total_tokens: u64,
82    pub total_cost_usd: f64,
83}
84
85pub struct Engine {
86    pub workflow: WorkflowDef,
87    pub context: Context,
88    config_manager: ConfigManager,
89    pub verbose: bool,
90    pub quiet: bool,
91    pub json: bool,
92    pub dry_run: bool,
93    resume_from: Option<String>,
94    sandbox_mode: SandboxMode,
95    /// Shared Docker sandbox instance (created during run(), destroyed on completion)
96    sandbox: SharedSandbox,
97    step_records: Vec<StepRecord>,
98    state: Option<WorkflowState>,
99    state_file: Option<PathBuf>,
100    /// Pending async step futures — keyed by step name
101    pending_futures: HashMap<String, JoinHandle<Result<StepOutput, StepError>>>,
102    /// Plugin registry for dynamically-loaded step types
103    plugin_registry: Arc<Mutex<PluginRegistry>>,
104    /// Event bus for lifecycle events
105    pub event_bus: EventBus,
106}
107
108impl Engine {
109    pub fn new(
110        workflow: WorkflowDef,
111        target: String,
112        vars: HashMap<String, serde_json::Value>,
113        verbose: bool,
114        quiet: bool,
115    ) -> Self {
116        let options = EngineOptions {
117            verbose,
118            quiet,
119            ..Default::default()
120        };
121        Self::with_options(workflow, target, vars, options)
122    }
123
124    pub fn with_options(
125        workflow: WorkflowDef,
126        target: String,
127        vars: HashMap<String, serde_json::Value>,
128        options: EngineOptions,
129    ) -> Self {
130        let context = Context::new(target, vars);
131        let config_manager = ConfigManager::new(workflow.config.clone());
132        // JSON mode implies quiet (no decorative output)
133        let quiet = options.quiet || options.json;
134
135        // ── Load plugins from workflow config ─────────────────────────────────
136        let mut registry = PluginRegistry::new();
137        for plugin_cfg in &workflow.config.plugins {
138            match PluginLoader::load_plugin(&plugin_cfg.path) {
139                Ok(plugin) => {
140                    tracing::info!(name = %plugin_cfg.name, path = %plugin_cfg.path, "Loaded plugin");
141                    registry.register(plugin);
142                }
143                Err(e) => {
144                    tracing::warn!(
145                        name = %plugin_cfg.name,
146                        path = %plugin_cfg.path,
147                        error = %e,
148                        "Failed to load plugin"
149                    );
150                }
151            }
152        }
153
154        // ── Wire up event subscribers from workflow config ─────────────────────
155        let mut event_bus = EventBus::new();
156        if let Some(ref events_cfg) = workflow.config.events {
157            if let Some(ref webhook_url) = events_cfg.webhook {
158                event_bus.add_subscriber(Box::new(WebhookSubscriber::new(webhook_url.clone())));
159                tracing::info!(url = %webhook_url, "Registered webhook event subscriber");
160            }
161            if let Some(ref file_path) = events_cfg.file {
162                event_bus.add_subscriber(Box::new(FileSubscriber::new(file_path.clone())));
163                tracing::info!(path = %file_path, "Registered file event subscriber");
164            }
165        }
166
167        Self {
168            workflow,
169            context,
170            config_manager,
171            verbose: options.verbose,
172            quiet,
173            json: options.json,
174            dry_run: options.dry_run,
175            resume_from: options.resume_from,
176            sandbox_mode: options.sandbox_mode,
177            sandbox: None,
178            step_records: Vec::new(),
179            state: None,
180            state_file: None,
181            pending_futures: HashMap::new(),
182            plugin_registry: Arc::new(Mutex::new(registry)),
183            event_bus,
184        }
185    }
186
187    /// Return collected step records (for JSON output or testing)
188    pub fn step_records(&self) -> &[StepRecord] {
189        &self.step_records
190    }
191
192    /// Build the JSON summary after execution
193    pub fn json_output(&self, status: &str, total_duration: Duration) -> WorkflowJsonOutput {
194        let total_tokens: u64 = self
195            .step_records
196            .iter()
197            .map(|r| r.input_tokens.unwrap_or(0) + r.output_tokens.unwrap_or(0))
198            .sum();
199        let total_cost: f64 = self
200            .step_records
201            .iter()
202            .filter_map(|r| r.cost_usd)
203            .sum();
204
205        WorkflowJsonOutput {
206            workflow_name: self.workflow.name.clone(),
207            status: status.to_string(),
208            sandbox_mode: format!("{:?}", self.sandbox_mode),
209            steps: self.step_records.clone(),
210            total_duration_secs: total_duration.as_secs_f64(),
211            total_tokens,
212            total_cost_usd: total_cost,
213        }
214    }
215
216    // ── Sandbox Lifecycle ────────────────────────────────────────────────────
217
218    /// Create and start the Docker sandbox container.
219    /// Copies the current working directory into the container as /workspace.
220    async fn sandbox_up(&mut self) -> Result<()> {
221        let sandbox_config = SandboxConfig::from_global_config(&self.workflow.config.global);
222        let workspace = std::env::current_dir()
223            .map(|p| p.to_string_lossy().to_string())
224            .unwrap_or_else(|_| ".".to_string());
225
226        let mut docker = DockerSandbox::new(sandbox_config, &workspace);
227
228        if !self.quiet {
229            println!("  {} Creating Docker sandbox container…", "🐳".cyan());
230        }
231
232        let t0 = Instant::now();
233        docker.create().await?;
234        let create_ms = t0.elapsed().as_millis();
235
236        let t1 = Instant::now();
237        docker.copy_workspace(&workspace).await?;
238        let copy_ms = t1.elapsed().as_millis();
239
240        // Configure Git safe.directory inside the container so that
241        // Git/gh commands work on the copied workspace (avoids
242        // "dubious ownership" errors when host UID != container UID).
243        // Also set user.name/email defaults for any git operations.
244        let t2 = Instant::now();
245        let _ = docker
246            .run_command(
247                "git config --global --add safe.directory /workspace \
248                 && git config --global user.name 'Minion Engine' \
249                 && git config --global user.email 'minion@localhost'",
250            )
251            .await;
252        let git_ms = t2.elapsed().as_millis();
253
254        let total_ms = t0.elapsed().as_millis();
255
256        if !self.quiet {
257            println!(
258                "  {} Sandbox ready — container {:.1}s, copy {:.1}s, git {:.1}s (total {:.1}s)",
259                "🔒".green(),
260                create_ms as f64 / 1000.0,
261                copy_ms as f64 / 1000.0,
262                git_ms as f64 / 1000.0,
263                total_ms as f64 / 1000.0,
264            );
265        }
266
267        tracing::info!(
268            create_ms,
269            copy_ms,
270            git_ms,
271            total_ms,
272            "Sandbox setup complete"
273        );
274
275        self.sandbox = Some(Arc::new(Mutex::new(docker)));
276        Ok(())
277    }
278
279    /// Copy results from sandbox back to host, then destroy the container.
280    async fn sandbox_down(&mut self) -> Result<()> {
281        if let Some(sb) = self.sandbox.take() {
282            let mut docker = sb.lock().await;
283
284            let workspace = std::env::current_dir()
285                .map(|p| p.to_string_lossy().to_string())
286                .unwrap_or_else(|_| ".".to_string());
287
288            if !self.quiet {
289                println!("  {} Copying results from sandbox…", "📦".cyan());
290            }
291
292            let t0 = Instant::now();
293            docker.copy_results(&workspace).await?;
294            let copy_back_ms = t0.elapsed().as_millis();
295
296            let t1 = Instant::now();
297            docker.destroy().await?;
298            let destroy_ms = t1.elapsed().as_millis();
299
300            if !self.quiet {
301                println!(
302                    "  {} Sandbox destroyed — copy-back {:.1}s, destroy {:.1}s",
303                    "🗑️ ".dimmed(),
304                    copy_back_ms as f64 / 1000.0,
305                    destroy_ms as f64 / 1000.0,
306                );
307            }
308
309            tracing::info!(copy_back_ms, destroy_ms, "Sandbox teardown complete");
310        }
311        Ok(())
312    }
313
314    /// Determine whether a step should run inside the sandbox based on sandbox_mode
315    fn should_sandbox_step(&self, step_type: &StepType) -> bool {
316        // Script steps run embedded (no external process), never sandboxed
317        if *step_type == StepType::Script {
318            return false;
319        }
320        match self.sandbox_mode {
321            SandboxMode::Disabled => false,
322            SandboxMode::FullWorkflow | SandboxMode::Devbox => {
323                // ALL executable steps run inside the sandbox
324                matches!(step_type, StepType::Cmd | StepType::Agent)
325            }
326            SandboxMode::AgentOnly => {
327                // Only agent steps run inside the sandbox; cmd steps run on host
328                matches!(step_type, StepType::Agent)
329            }
330        }
331    }
332
333    // ── Main Run Loop ────────────────────────────────────────────────────────
334
335    pub async fn run(&mut self) -> Result<StepOutput> {
336        // ── State / Resume setup ──────────────────────────────────────────────
337        let state_file = WorkflowState::state_file_path(&self.workflow.name);
338        self.state_file = Some(state_file.clone());
339
340        let mut loaded_state: Option<WorkflowState> = None;
341        if let Some(ref resume_step) = self.resume_from.clone() {
342            match WorkflowState::find_latest(&self.workflow.name) {
343                Some(path) => {
344                    match WorkflowState::load(&path) {
345                        Ok(s) => {
346                            let exists = self.workflow.steps.iter().any(|s| &s.name == resume_step);
347                            if !exists {
348                                bail!(
349                                    "Resume step '{}' not found in workflow '{}'. \
350                                     Available steps: {}",
351                                    resume_step,
352                                    self.workflow.name,
353                                    self.workflow.steps.iter().map(|s| s.name.as_str()).collect::<Vec<_>>().join(", ")
354                                );
355                            }
356                            if !self.quiet {
357                                println!(
358                                    "  {} Resuming from step '{}' (state: {})",
359                                    "↺".cyan(),
360                                    resume_step,
361                                    path.display()
362                                );
363                            }
364                            loaded_state = Some(s);
365                        }
366                        Err(e) => bail!("Failed to load state file {}: {e}", path.display()),
367                    }
368                }
369                None => {
370                    bail!(
371                        "No state file found for workflow '{}'. \
372                         Cannot resume. Run the workflow without --resume first.",
373                        self.workflow.name
374                    );
375                }
376            }
377        }
378
379        // ── Initialize persisted state for this run ───────────────────────────
380        let mut current_state = WorkflowState::new(&self.workflow.name);
381
382        // ── Display ───────────────────────────────────────────────────────────
383        if !self.quiet {
384            display::workflow_start(&self.workflow.name);
385            if self.sandbox_mode != SandboxMode::Disabled {
386                println!("  {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
387            }
388        }
389
390        // ── Sandbox: Create container BEFORE step execution ───────────────────
391        if self.sandbox_mode != SandboxMode::Disabled {
392            self.sandbox_up().await?;
393        }
394
395        // ── Event: WorkflowStarted ────────────────────────────────────────────
396        self.event_bus
397            .emit(Event::WorkflowStarted {
398                timestamp: chrono::Utc::now(),
399            })
400            .await;
401
402        let start = Instant::now();
403        let steps = self.workflow.steps.clone();
404        let mut last_output = StepOutput::Empty;
405        let mut step_count = 0;
406
407        let resume_from = self.resume_from.clone();
408        let mut resuming = resume_from.is_some();
409
410        // ── Execute steps ─────────────────────────────────────────────────────
411        let run_result: Result<(), anyhow::Error> = async {
412            for step_def in &steps {
413                // ── Resume: skip steps before the resume point ────────────────
414                if resuming {
415                    let is_resume_point = resume_from.as_deref() == Some(&step_def.name);
416                    if !is_resume_point {
417                        if let Some(ref ls) = loaded_state {
418                            if let Some(output) = ls.steps.get(&step_def.name) {
419                                self.context.store(&step_def.name, output.clone());
420                                if !self.quiet {
421                                    println!(
422                                        "  {} {} {}",
423                                        "⏭".yellow(),
424                                        step_def.name,
425                                        "(skipped — loaded from state)".dimmed()
426                                    );
427                                }
428                                self.step_records.push(StepRecord {
429                                    name: step_def.name.clone(),
430                                    step_type: step_def.step_type.to_string(),
431                                    status: "skipped_resume".to_string(),
432                                    duration_secs: 0.0,
433                                    output_summary: truncate(output.text(), 100),
434                                    input_tokens: None,
435                                    output_tokens: None,
436                                    cost_usd: None,
437                                    sandboxed: false,
438                                });
439                            }
440                        }
441                        continue;
442                    }
443                    resuming = false;
444                }
445
446                // ── Async step: spawn and register in pending_futures ─────────
447                if step_def.async_exec == Some(true) {
448                    let handle = self.spawn_async_step(step_def);
449                    self.pending_futures.insert(step_def.name.clone(), handle);
450                    if !self.quiet {
451                        println!(
452                            "  {} {} {} {}",
453                            "⚡".yellow(),
454                            step_def.name,
455                            format!("[{}]", step_def.step_type).cyan(),
456                            "(async — spawned)".dimmed()
457                        );
458                    }
459                    step_count += 1;
460                    continue;
461                }
462
463                // ── Auto-await: resolve pending deps before execute ────────────
464                self.await_pending_deps(step_def).await?;
465
466                match self.execute_step(step_def).await {
467                    Ok(output) => {
468                        current_state.steps.insert(step_def.name.clone(), output.clone());
469                        if let Some(ref p) = self.state_file {
470                            let _ = current_state.save(p);
471                        }
472                        last_output = output;
473                        step_count += 1;
474                    }
475                    Err(StepError::ControlFlow(ControlFlow::Skip { message })) => {
476                        self.context.store(&step_def.name, StepOutput::Empty);
477                        if !self.quiet {
478                            let pb = display::step_start(&step_def.name, &step_def.step_type.to_string());
479                            display::step_skip(&pb, &step_def.name, &message);
480                        }
481                        self.step_records.push(StepRecord {
482                            name: step_def.name.clone(),
483                            step_type: step_def.step_type.to_string(),
484                            status: "skipped".to_string(),
485                            duration_secs: 0.0,
486                            output_summary: message.clone(),
487                            input_tokens: None,
488                            output_tokens: None,
489                            cost_usd: None,
490                            sandboxed: false,
491                        });
492                    }
493                    Err(StepError::ControlFlow(ControlFlow::Fail { message })) => {
494                        if !self.quiet {
495                            display::workflow_failed(&step_def.name, &message);
496                        }
497                        bail!("Step '{}' failed: {}", step_def.name, message);
498                    }
499                    Err(StepError::ControlFlow(ControlFlow::Break { .. })) => {
500                        break;
501                    }
502                    Err(e) => {
503                        if !self.quiet {
504                            display::workflow_failed(&step_def.name, &e.to_string());
505                        }
506                        return Err(e.into());
507                    }
508                }
509            }
510            Ok(())
511        }
512        .await;
513
514        // ── Story 3.3: Await all remaining pending async futures ──────────────
515        let remaining: Vec<(String, JoinHandle<Result<StepOutput, StepError>>)> =
516            self.pending_futures.drain().collect();
517        for (name, handle) in remaining {
518            let step_type = self
519                .workflow
520                .steps
521                .iter()
522                .find(|s| s.name == name)
523                .map(|s| s.step_type.to_string())
524                .unwrap_or_else(|| "async".to_string());
525            match handle.await {
526                Ok(Ok(output)) => {
527                    self.context.store(&name, output.clone());
528                    self.step_records.push(StepRecord {
529                        name: name.clone(),
530                        step_type: step_type.clone(),
531                        status: "ok".to_string(),
532                        duration_secs: 0.0,
533                        output_summary: truncate(output.text(), 100),
534                        input_tokens: None,
535                        output_tokens: None,
536                        cost_usd: None,
537                        sandboxed: false,
538                    });
539                }
540                Ok(Err(e)) => {
541                    self.step_records.push(StepRecord {
542                        name: name.clone(),
543                        step_type: step_type.clone(),
544                        status: "failed".to_string(),
545                        duration_secs: 0.0,
546                        output_summary: e.to_string(),
547                        input_tokens: None,
548                        output_tokens: None,
549                        cost_usd: None,
550                        sandboxed: false,
551                    });
552                    if !self.quiet {
553                        eprintln!("  {} Async step '{}' failed: {}", "✗".red(), name, e);
554                    }
555                }
556                Err(e) => {
557                    let msg = format!("Async step '{}' panicked: {e}", name);
558                    self.step_records.push(StepRecord {
559                        name: name.clone(),
560                        step_type,
561                        status: "failed".to_string(),
562                        duration_secs: 0.0,
563                        output_summary: msg.clone(),
564                        input_tokens: None,
565                        output_tokens: None,
566                        cost_usd: None,
567                        sandboxed: false,
568                    });
569                    if !self.quiet {
570                        eprintln!("  {} {}", "✗".red(), msg);
571                    }
572                }
573            }
574        }
575
576        // ── Sandbox: Destroy container AFTER all steps (always, even on error) ─
577        if self.sandbox_mode != SandboxMode::Disabled {
578            if let Err(e) = self.sandbox_down().await {
579                if !self.quiet {
580                    eprintln!("  {} Sandbox cleanup warning: {e}", "⚠".yellow());
581                }
582            }
583        }
584
585        // ── Event: WorkflowCompleted ──────────────────────────────────────────
586        self.event_bus
587            .emit(Event::WorkflowCompleted {
588                duration_ms: start.elapsed().as_millis() as u64,
589                timestamp: chrono::Utc::now(),
590            })
591            .await;
592
593        // Propagate any error from the step loop
594        run_result?;
595
596        if !self.quiet {
597            display::workflow_done(start.elapsed(), step_count);
598        }
599
600        self.state = Some(current_state);
601        Ok(last_output)
602    }
603
604    pub async fn execute_step(&mut self, step_def: &StepDef) -> Result<StepOutput, StepError> {
605        let config = self.resolve_config(step_def);
606        let use_sandbox = self.should_sandbox_step(&step_def.step_type);
607
608        let pb = if !self.quiet {
609            let label = if use_sandbox {
610                format!("{} 🐳", step_def.step_type)
611            } else {
612                step_def.step_type.to_string()
613            };
614            Some(display::step_start(&step_def.name, &label))
615        } else {
616            None
617        };
618
619        let start = Instant::now();
620
621        // ── Event: StepStarted ────────────────────────────────────────────────
622        self.event_bus
623            .emit(Event::StepStarted {
624                step_name: step_def.name.clone(),
625                step_type: step_def.step_type.to_string(),
626                timestamp: chrono::Utc::now(),
627            })
628            .await;
629
630        tracing::debug!(
631            step = %step_def.name,
632            step_type = %step_def.step_type,
633            sandboxed = use_sandbox,
634            "Executing step"
635        );
636
637        // Choose sandbox-aware execution when sandbox is active for this step
638        let sandbox_ref = if use_sandbox { &self.sandbox } else { &None };
639
640        let result = match step_def.step_type {
641            StepType::Cmd => {
642                CmdExecutor
643                    .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
644                    .await
645            }
646            StepType::Agent => {
647                AgentExecutor
648                    .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
649                    .await
650            }
651            StepType::Gate => GateExecutor.execute(step_def, &config, &self.context).await,
652            StepType::Repeat => {
653                RepeatExecutor::new(&self.workflow.scopes)
654                    .execute(step_def, &config, &self.context)
655                    .await
656            }
657            StepType::Chat => {
658                ChatExecutor.execute(step_def, &config, &self.context).await
659            }
660            StepType::Map => {
661                MapExecutor::new(&self.workflow.scopes)
662                    .execute(step_def, &config, &self.context)
663                    .await
664            }
665            StepType::Parallel => {
666                ParallelExecutor::new(&self.workflow.scopes)
667                    .execute(step_def, &config, &self.context)
668                    .await
669            }
670            StepType::Call => {
671                CallExecutor::new(&self.workflow.scopes)
672                    .execute(step_def, &config, &self.context)
673                    .await
674            }
675            StepType::Template => {
676                let prompts_dir = self.workflow.prompts_dir.as_deref();
677                TemplateStepExecutor::new(prompts_dir)
678                    .execute(step_def, &config, &self.context)
679                    .await
680            }
681            StepType::Script => {
682                ScriptExecutor.execute(step_def, &config, &self.context).await
683            }
684            // Note: all StepType variants are covered above.
685            // Plugin dispatch will be added via a dedicated StepType::Plugin variant.
686        };
687
688        let elapsed = start.elapsed();
689        let duration_ms = elapsed.as_millis() as u64;
690
691        // ── Output Parsing Section ────────────────────────────────────────────
692        // Parse step output according to output_type (if declared)
693        let result = match result {
694            Ok(output) => parse_step_output(output, step_def),
695            err => err,
696        };
697
698        match &result {
699            Ok(output) => {
700                tracing::info!(
701                    step = %step_def.name,
702                    step_type = %step_def.step_type,
703                    duration_ms = elapsed.as_millis(),
704                    sandboxed = use_sandbox,
705                    status = "ok",
706                    "Step completed"
707                );
708                self.context.store(&step_def.name, output.clone());
709                // Store parsed value separately if present
710                if let Some(parsed) = extract_parsed_value(output, step_def) {
711                    self.context.store_parsed(&step_def.name, parsed);
712                }
713
714                let (it, ot, cost) = token_stats(output);
715                self.step_records.push(StepRecord {
716                    name: step_def.name.clone(),
717                    step_type: step_def.step_type.to_string(),
718                    status: "ok".to_string(),
719                    duration_secs: elapsed.as_secs_f64(),
720                    output_summary: truncate(output.text(), 100),
721                    input_tokens: it,
722                    output_tokens: ot,
723                    cost_usd: cost,
724                    sandboxed: use_sandbox,
725                });
726
727                if let Some(pb) = &pb {
728                    display::step_ok(pb, &step_def.name, elapsed);
729                }
730            }
731            Err(StepError::ControlFlow(cf)) => {
732                let msg = match cf {
733                    ControlFlow::Skip { message } => format!("skipped: {message}"),
734                    ControlFlow::Break { message, .. } => format!("break: {message}"),
735                    ControlFlow::Fail { message } => format!("failed: {message}"),
736                    ControlFlow::Next { message } => format!("next: {message}"),
737                };
738                tracing::info!(
739                    step = %step_def.name,
740                    step_type = %step_def.step_type,
741                    duration_ms = elapsed.as_millis(),
742                    status = "control_flow",
743                    message = %msg,
744                    "Step control flow"
745                );
746                if let Some(pb) = &pb {
747                    display::step_skip(pb, &step_def.name, &msg);
748                }
749            }
750            Err(e) => {
751                tracing::warn!(
752                    step = %step_def.name,
753                    step_type = %step_def.step_type,
754                    duration_ms = elapsed.as_millis(),
755                    status = "error",
756                    error = %e,
757                    "Step failed"
758                );
759                self.step_records.push(StepRecord {
760                    name: step_def.name.clone(),
761                    step_type: step_def.step_type.to_string(),
762                    status: "failed".to_string(),
763                    duration_secs: elapsed.as_secs_f64(),
764                    output_summary: e.to_string(),
765                    input_tokens: None,
766                    output_tokens: None,
767                    cost_usd: None,
768                    sandboxed: use_sandbox,
769                });
770                if let Some(pb) = &pb {
771                    display::step_fail(pb, &step_def.name, &e.to_string());
772                }
773            }
774        }
775
776        // ── Event: StepCompleted / StepFailed ─────────────────────────────────
777        match &result {
778            Ok(_) => {
779                self.event_bus
780                    .emit(Event::StepCompleted {
781                        step_name: step_def.name.clone(),
782                        step_type: step_def.step_type.to_string(),
783                        duration_ms,
784                        timestamp: chrono::Utc::now(),
785                    })
786                    .await;
787            }
788            Err(e) if !matches!(e, StepError::ControlFlow(_)) => {
789                self.event_bus
790                    .emit(Event::StepFailed {
791                        step_name: step_def.name.clone(),
792                        step_type: step_def.step_type.to_string(),
793                        error: e.to_string(),
794                        duration_ms,
795                        timestamp: chrono::Utc::now(),
796                    })
797                    .await;
798            }
799            _ => {}
800        }
801
802        result
803    }
804
805    /// Dry-run: walk all steps and print a visual tree without executing anything.
806    pub fn dry_run(&self) {
807        use colored::Colorize;
808
809        println!("{} {} (dry-run)", "▶".cyan().bold(), self.workflow.name.bold());
810        if self.sandbox_mode != SandboxMode::Disabled {
811            println!("  {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
812        }
813        println!();
814
815        let steps = &self.workflow.steps;
816        let total = steps.len();
817        for (i, step) in steps.iter().enumerate() {
818            let is_last = i + 1 == total;
819            let branch = if is_last { "└──" } else { "├──" };
820            let config = self.resolve_config(step);
821
822            let sandbox_indicator = if self.should_sandbox_step(&step.step_type) {
823                " 🐳"
824            } else {
825                ""
826            };
827
828            let async_indicator = if step.async_exec == Some(true) { " ⚡" } else { "" };
829
830            println!(
831                "{} {} {}{}{}",
832                branch.dimmed(),
833                step.name.bold(),
834                format!("[{}]", step.step_type).cyan(),
835                sandbox_indicator,
836                async_indicator
837            );
838
839            let indent = if is_last { "    " } else { "│   " };
840            self.print_step_details(step, &config, indent);
841
842            if !is_last {
843                println!("│");
844            }
845        }
846    }
847
848    fn print_step_details(&self, step: &StepDef, config: &StepConfig, indent: &str) {
849        use colored::Colorize;
850
851        match step.step_type {
852            StepType::Cmd => {
853                if let Some(ref run) = step.run {
854                    let preview = truncate(run, 80);
855                    println!("{}  run: {}", indent, preview.dimmed());
856                }
857            }
858            StepType::Agent | StepType::Chat => {
859                if let Some(ref prompt) = step.prompt {
860                    let preview = truncate(&prompt.replace('\n', " "), 80);
861                    println!("{}  prompt: {}", indent, preview.dimmed());
862                }
863                if let Some(model) = config.get_str("model") {
864                    println!("{}  model: {}", indent, model.dimmed());
865                }
866            }
867            StepType::Gate => {
868                if let Some(ref cond) = step.condition {
869                    println!("{}  condition: {}", indent, cond.dimmed());
870                }
871                println!(
872                    "{}  on_pass: {} / on_fail: {}",
873                    indent,
874                    step.on_pass.as_deref().unwrap_or("continue").dimmed(),
875                    step.on_fail.as_deref().unwrap_or("continue").dimmed()
876                );
877            }
878            StepType::Repeat => {
879                let scope_name = step.scope.as_deref().unwrap_or("<none>");
880                let max_iter = step.max_iterations.unwrap_or(1);
881                println!("{}  scope: {}", indent, scope_name.dimmed());
882                println!("{}  max_iterations: {}", indent, max_iter.to_string().dimmed());
883                self.print_scope_steps(scope_name, indent);
884            }
885            StepType::Map => {
886                let scope_name = step.scope.as_deref().unwrap_or("<none>");
887                let items = step.items.as_deref().unwrap_or("<none>");
888                println!("{}  items: {}", indent, items.dimmed());
889                println!("{}  scope: {}", indent, scope_name.dimmed());
890                if let Some(p) = step.parallel {
891                    println!("{}  parallel: {}", indent, p.to_string().dimmed());
892                }
893                self.print_scope_steps(scope_name, indent);
894            }
895            StepType::Call => {
896                let scope_name = step.scope.as_deref().unwrap_or("<none>");
897                println!("{}  scope: {}", indent, scope_name.dimmed());
898                self.print_scope_steps(scope_name, indent);
899            }
900            StepType::Parallel => {
901                if let Some(ref sub_steps) = step.steps {
902                    println!("{}  parallel steps:", indent);
903                    for sub in sub_steps {
904                        println!(
905                            "{}    • {} [{}]",
906                            indent,
907                            sub.name.bold(),
908                            sub.step_type.to_string().cyan()
909                        );
910                    }
911                }
912            }
913            StepType::Template => {
914                if let Some(ref run) = step.run {
915                    println!("{}  template: {}", indent, run.dimmed());
916                }
917            }
918        StepType::Script => {
919                if let Some(ref run) = step.run {
920                    let preview = truncate(&run.replace('\n', " "), 80);
921                    println!("{}  script: {}", indent, preview.dimmed());
922                }
923            }
924        }
925
926        if let Some(t) = config.get_str("timeout") {
927            println!("{}  timeout: {}", indent, t.dimmed());
928        }
929    }
930
931    fn print_scope_steps(&self, scope_name: &str, indent: &str) {
932        use colored::Colorize;
933        if let Some(scope) = self.workflow.scopes.get(scope_name) {
934            println!("{}  scope steps:", indent);
935            for step in &scope.steps {
936                println!(
937                    "{}    • {} [{}]",
938                    indent,
939                    step.name.bold(),
940                    step.step_type.to_string().cyan()
941                );
942            }
943        }
944    }
945
946    fn resolve_config(&self, step_def: &StepDef) -> StepConfig {
947        self.config_manager
948            .resolve(&step_def.name, &step_def.step_type, &step_def.config)
949    }
950
951    // ── Async Step Support ───────────────────────────────────────────────────
952
953    /// Spawn an async step as a tokio task. Returns a JoinHandle for later awaiting.
954    /// Creates a minimal context (target only) for the spawned task since the
955    /// executor templates are rendered inside the task.
956    fn spawn_async_step(
957        &self,
958        step_def: &StepDef,
959    ) -> JoinHandle<Result<StepOutput, StepError>> {
960        let step = step_def.clone();
961        let config = self.resolve_config(step_def);
962        let target = self
963            .context
964            .get_var("target")
965            .and_then(|v| v.as_str())
966            .unwrap_or("")
967            .to_string();
968
969        tokio::spawn(async move {
970            let ctx = Context::new(target, HashMap::new());
971            match step.step_type {
972                StepType::Cmd => CmdExecutor.execute(&step, &config, &ctx).await,
973                StepType::Agent => AgentExecutor.execute(&step, &config, &ctx).await,
974                StepType::Script => ScriptExecutor.execute(&step, &config, &ctx).await,
975                _ => Err(StepError::Fail(format!(
976                    "Async execution not supported for step type '{}'",
977                    step.step_type
978                ))),
979            }
980        })
981    }
982
983    /// Scan the step's template fields for references to other steps.
984    /// If any referenced step is in pending_futures, await it and store result in context.
985    async fn await_pending_deps(&mut self, step_def: &StepDef) -> Result<(), StepError> {
986        let pattern = Regex::new(r"steps\.(\w+)\.").unwrap();
987
988        // Collect all template fields that might reference step outputs
989        let mut templates: Vec<String> = Vec::new();
990        if let Some(ref run) = step_def.run {
991            templates.push(run.clone());
992        }
993        if let Some(ref prompt) = step_def.prompt {
994            templates.push(prompt.clone());
995        }
996        if let Some(ref condition) = step_def.condition {
997            templates.push(condition.clone());
998        }
999
1000        // Find all step names referenced in templates
1001        let mut deps: Vec<String> = Vec::new();
1002        for tmpl in &templates {
1003            for cap in pattern.captures_iter(tmpl) {
1004                let name = cap[1].to_string();
1005                if self.pending_futures.contains_key(&name) && !deps.contains(&name) {
1006                    deps.push(name);
1007                }
1008            }
1009        }
1010
1011        // Await each dependency
1012        for name in deps {
1013            self.await_pending_step(&name).await?;
1014        }
1015
1016        Ok(())
1017    }
1018
1019    /// Await a single named async step, storing its output in context.
1020    async fn await_pending_step(&mut self, name: &str) -> Result<(), StepError> {
1021        if let Some(handle) = self.pending_futures.remove(name) {
1022            match handle.await {
1023                Ok(Ok(output)) => {
1024                    self.context.store(name, output.clone());
1025                    self.step_records.push(StepRecord {
1026                        name: name.to_string(),
1027                        step_type: "async".to_string(),
1028                        status: "ok".to_string(),
1029                        duration_secs: 0.0,
1030                        output_summary: truncate(output.text(), 100),
1031                        input_tokens: None,
1032                        output_tokens: None,
1033                        cost_usd: None,
1034                        sandboxed: false,
1035                    });
1036                }
1037                Ok(Err(e)) => {
1038                    return Err(StepError::Fail(format!(
1039                        "Async step '{}' failed: {e}",
1040                        name
1041                    )));
1042                }
1043                Err(e) => {
1044                    return Err(StepError::Fail(format!(
1045                        "Async step '{}' panicked: {e}",
1046                        name
1047                    )));
1048                }
1049            }
1050        }
1051        Ok(())
1052    }
1053}
1054
1055/// Extract token stats from a step output (for JSON records)
1056fn token_stats(output: &StepOutput) -> (Option<u64>, Option<u64>, Option<f64>) {
1057    match output {
1058        StepOutput::Agent(o) => (
1059            Some(o.stats.input_tokens),
1060            Some(o.stats.output_tokens),
1061            Some(o.stats.cost_usd),
1062        ),
1063        StepOutput::Chat(o) => (Some(o.input_tokens), Some(o.output_tokens), None),
1064        _ => (None, None, None),
1065    }
1066}
1067
1068/// Parse the raw step output according to the step's declared output_type.
1069/// Returns the output unchanged if no output_type is declared or it is Text.
1070fn parse_step_output(output: StepOutput, step_def: &StepDef) -> Result<StepOutput, StepError> {
1071    let output_type = match &step_def.output_type {
1072        Some(t) => t,
1073        None => return Ok(output),
1074    };
1075
1076    if *output_type == OutputType::Text {
1077        return Ok(output);
1078    }
1079
1080    let text = output.text().trim().to_string();
1081
1082    match output_type {
1083        OutputType::Integer => {
1084            text.parse::<i64>()
1085                .map_err(|_| StepError::Fail(format!("Failed to parse '{}' as integer", text)))?;
1086        }
1087        OutputType::Json => {
1088            serde_json::from_str::<serde_json::Value>(&text)
1089                .map_err(|e| StepError::Fail(format!("Failed to parse output as JSON: {e}")))?;
1090        }
1091        OutputType::Boolean => {
1092            match text.to_lowercase().as_str() {
1093                "true" | "1" | "yes" | "false" | "0" | "no" => {}
1094                _ => {
1095                    return Err(StepError::Fail(format!(
1096                        "Failed to parse '{}' as boolean",
1097                        text
1098                    )));
1099                }
1100            }
1101        }
1102        OutputType::Lines | OutputType::Text => {}
1103    }
1104
1105    Ok(output)
1106}
1107
1108/// Extract a ParsedValue from the step output based on output_type.
1109/// Returns None if no output_type or it is Text.
1110fn extract_parsed_value(output: &StepOutput, step_def: &StepDef) -> Option<ParsedValue> {
1111    let output_type = step_def.output_type.as_ref()?;
1112
1113    let text = output.text().trim().to_string();
1114
1115    let parsed = match output_type {
1116        OutputType::Text => ParsedValue::Text(text),
1117        OutputType::Integer => ParsedValue::Integer(text.parse::<i64>().ok()?),
1118        OutputType::Json => {
1119            let val = serde_json::from_str::<serde_json::Value>(&text).ok()?;
1120            ParsedValue::Json(val)
1121        }
1122        OutputType::Lines => {
1123            let lines: Vec<String> = text
1124                .lines()
1125                .filter(|l| !l.is_empty())
1126                .map(|l| l.to_string())
1127                .collect();
1128            ParsedValue::Lines(lines)
1129        }
1130        OutputType::Boolean => {
1131            let b = match text.to_lowercase().as_str() {
1132                "true" | "1" | "yes" => true,
1133                _ => false,
1134            };
1135            ParsedValue::Boolean(b)
1136        }
1137    };
1138
1139    Some(parsed)
1140}
1141
/// Shorten `s` to its first `max` characters, appending "…" when anything was cut.
///
/// Strings of `max` characters or fewer are returned unchanged. The cut position is
/// taken from `char_indices`, so it always lands on a character boundary and never
/// splits a multi-byte UTF-8 sequence (e.g. an emoji).
fn truncate(s: &str, max: usize) -> String {
    match s.char_indices().nth(max) {
        // A character exists at position `max`, so `s` is longer than `max` chars
        // and `cut` is the byte offset where the kept prefix ends.
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_string(),
    }
}
1153
1154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow::parser;

    /// Two cmd steps run in order: the last output is returned and the first one is
    /// retrievable from the context afterwards.
    #[tokio::test]
    async fn engine_runs_sequential_cmd_steps() {
        let yaml = r#"
name: test
steps:
  - name: step1
    type: cmd
    run: "echo first"
  - name: step2
    type: cmd
    run: "echo second"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        let result = engine.run().await.unwrap();
        assert_eq!(result.text().trim(), "second");
        assert!(engine.context.get_step("step1").is_some());
        assert_eq!(
            engine.context.get_step("step1").unwrap().text().trim(),
            "first"
        );
    }

    /// A later step can interpolate an earlier step's stdout via `steps.<name>.stdout`.
    #[tokio::test]
    async fn engine_exposes_step_output_to_next_step() {
        let yaml = r#"
name: test
steps:
  - name: produce
    type: cmd
    run: "echo hello_world"
  - name: consume
    type: cmd
    run: "echo {{ steps.produce.stdout }}"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        let result = engine.run().await.unwrap();
        assert!(result.text().contains("hello_world"));
    }

    /// In JSON mode every executed step yields one `StepRecord`, in execution order.
    #[tokio::test]
    async fn engine_collects_step_records_in_json_mode() {
        let yaml = r#"
name: json-test
steps:
  - name: alpha
    type: cmd
    run: "echo alpha"
  - name: beta
    type: cmd
    run: "echo beta"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let opts = EngineOptions {
            json: true,
            ..Default::default()
        };
        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
        engine.run().await.unwrap();

        let records = engine.step_records();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].name, "alpha");
        assert_eq!(records[0].status, "ok");
        assert!(!records[0].sandboxed);
        assert_eq!(records[1].name, "beta");
        assert_eq!(records[1].status, "ok");
    }

    /// The serialized JSON summary carries the workflow name, status, sandbox mode
    /// and the list of steps.
    #[tokio::test]
    async fn json_output_includes_sandbox_mode() {
        let yaml = r#"
name: json-output-test
steps:
  - name: greet
    type: cmd
    run: "echo hello"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let opts = EngineOptions {
            json: true,
            ..Default::default()
        };
        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
        let start = Instant::now();
        engine.run().await.unwrap();
        let out = engine.json_output("success", start.elapsed());

        let json = serde_json::to_string(&out).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed["workflow_name"], "json-output-test");
        assert_eq!(parsed["status"], "success");
        assert_eq!(parsed["sandbox_mode"], "Disabled");
        assert!(parsed["steps"].is_array());
        assert_eq!(parsed["steps"][0]["name"], "greet");
    }

    /// `should_sandbox_step` follows the sandbox mode: nothing when disabled, cmd and
    /// agent under `FullWorkflow`, agent only under `AgentOnly`; gates never.
    #[test]
    fn should_sandbox_step_logic() {
        let yaml = r#"
name: test
steps:
  - name: s
    type: cmd
    run: "echo test"
"#;
        let wf = parser::parse_str(yaml).unwrap();

        // Disabled mode → nothing sandboxed
        let engine = Engine::new(wf.clone(), "".to_string(), HashMap::new(), false, true);
        assert!(!engine.should_sandbox_step(&StepType::Cmd));
        assert!(!engine.should_sandbox_step(&StepType::Agent));
        assert!(!engine.should_sandbox_step(&StepType::Gate));

        // FullWorkflow mode → cmd + agent sandboxed
        let opts = EngineOptions {
            sandbox_mode: SandboxMode::FullWorkflow,
            quiet: true,
            ..Default::default()
        };
        let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts);
        assert!(engine.should_sandbox_step(&StepType::Cmd));
        assert!(engine.should_sandbox_step(&StepType::Agent));
        assert!(!engine.should_sandbox_step(&StepType::Gate));

        // AgentOnly mode → only agent sandboxed
        let opts = EngineOptions {
            sandbox_mode: SandboxMode::AgentOnly,
            quiet: true,
            ..Default::default()
        };
        let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts);
        assert!(!engine.should_sandbox_step(&StepType::Cmd));
        assert!(engine.should_sandbox_step(&StepType::Agent));
        assert!(!engine.should_sandbox_step(&StepType::Gate));
    }

    /// Smoke test: a dry run over cmd, gate and repeat steps (with a scope) completes
    /// without panicking. The printed output is not inspected.
    #[test]
    fn dry_run_does_not_panic() {
        let yaml = r#"
name: dry-run-test
scopes:
  lint_fix:
    steps:
      - name: lint
        type: cmd
        run: "npm run lint"
      - name: fix_lint
        type: agent
        prompt: "Fix lint errors"
steps:
  - name: setup
    type: cmd
    run: "echo setup"
  - name: validate
    type: gate
    condition: "{{ steps.setup.exit_code == 0 }}"
    on_pass: continue
    on_fail: fail
  - name: lint_gate
    type: repeat
    scope: lint_fix
    max_iterations: 3
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        engine.dry_run();
    }

    /// Smoke test: a dry run over cmd, gate and parallel steps completes without
    /// panicking. Despite the name, only those three step types are exercised here.
    #[test]
    fn dry_run_all_step_types() {
        let yaml = r#"
name: all-types
steps:
  - name: c
    type: cmd
    run: "ls"
  - name: g
    type: gate
    condition: "{{ true }}"
    on_pass: continue
  - name: p
    type: parallel
    steps:
      - name: p1
        type: cmd
        run: "echo p1"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        engine.dry_run();
    }

    /// `truncate` leaves short strings alone, appends "…" when it cuts, and cuts on
    /// character boundaries rather than byte offsets.
    #[test]
    fn truncate_helper() {
        assert_eq!(truncate("hello", 10), "hello");
        assert_eq!(truncate("hello world", 5), "hello…");

        // Exactly `max` characters: nothing is cut, so no ellipsis.
        assert_eq!(truncate("hello", 5), "hello");
        // Empty input and a zero limit.
        assert_eq!(truncate("", 0), "");
        assert_eq!(truncate("abc", 0), "…");
        // Multi-byte characters count as one each and are never split.
        assert_eq!(truncate("héllo", 2), "hé…");
        assert_eq!(truncate("🙂🙂🙂", 2), "🙂🙂…");
    }

    /// Resuming a workflow that has no saved state fails with a
    /// "No state file found" error.
    #[tokio::test]
    async fn resume_fails_when_no_state_file() {
        let yaml = r#"
name: no-state-workflow-xyz-unique
steps:
  - name: step1
    type: cmd
    run: "echo 1"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let opts = EngineOptions {
            resume_from: Some("step1".to_string()),
            quiet: true,
            ..Default::default()
        };
        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
        let err = engine.run().await.unwrap_err();
        assert!(
            err.to_string().contains("No state file found"),
            "Expected 'No state file found' but got: {err}"
        );
    }

    /// With a state file present, resuming from a step name the workflow does not
    /// contain fails with a "not found in workflow" error.
    #[tokio::test]
    async fn resume_fails_for_unknown_step() {
        let workflow_name = "test-resume-unknown-step";
        let state = WorkflowState::new(workflow_name);
        // NOTE(review): presumably the engine discovers state files by this /tmp naming
        // scheme — confirm against the state lookup code before changing the path.
        let tmp_path = format!("/tmp/minion-{workflow_name}-20991231235959.state.json");
        let path = PathBuf::from(&tmp_path);
        state.save(&path).unwrap();

        let yaml = format!(
            r#"
name: {workflow_name}
steps:
  - name: step1
    type: cmd
    run: "echo 1"
"#
        );
        let wf = parser::parse_str(&yaml).unwrap();
        let opts = EngineOptions {
            resume_from: Some("nonexistent_step".to_string()),
            quiet: true,
            ..Default::default()
        };
        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
        let result = engine.run().await;

        // Clean up before asserting so a failing assertion cannot leave the seeded
        // state file behind for later test runs.
        let _ = std::fs::remove_file(&path);

        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("not found in workflow"),
            "Expected 'not found in workflow' but got: {err}"
        );
    }

    /// The safe accessor (`name.output?`) renders as an empty string when the
    /// referenced step does not exist.
    #[tokio::test]
    async fn safe_accessor_returns_empty_for_missing_step() {
        let yaml = r#"
name: test-safe-accessor
steps:
  - name: use_missing
    type: cmd
    run: "echo '{{ missing.output? }}'"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        let result = engine.run().await.unwrap();
        // safe accessor returns empty string when step doesn't exist
        assert_eq!(result.text().trim(), "");
    }

    /// The safe accessor renders the step's output when the step has run.
    #[tokio::test]
    async fn safe_accessor_returns_value_when_present() {
        let yaml = r#"
name: test-safe-accessor-present
steps:
  - name: produce
    type: cmd
    run: "echo hello"
  - name: consume
    type: cmd
    run: "echo '{{ produce.output? }}'"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        let result = engine.run().await.unwrap();
        assert!(result.text().contains("hello"));
    }

    /// The strict accessor (`name.output!`) makes the run fail with a "strict access"
    /// error when the referenced step does not exist.
    #[tokio::test]
    async fn strict_accessor_fails_when_step_missing() {
        let yaml = r#"
name: test-strict-accessor-fail
steps:
  - name: use_missing
    type: cmd
    run: "echo '{{ nonexistent.output! }}'"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        let err = engine.run().await.unwrap_err();
        assert!(err.to_string().contains("strict access"), "{err}");
    }

    /// `output_type: integer` accepts numeric output and exposes it to later steps.
    #[tokio::test]
    async fn output_type_integer_parses_number() {
        let yaml = r#"
name: test-parse
steps:
  - name: count
    type: cmd
    run: "echo 42"
    output_type: integer
  - name: use_count
    type: cmd
    run: "echo {{ count.output }}"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        let result = engine.run().await.unwrap();
        assert_eq!(result.text().trim(), "42");
    }

    /// `output_type: integer` fails the run when the output is not a number.
    #[tokio::test]
    async fn output_type_integer_fails_on_non_number() {
        let yaml = r#"
name: test-parse-fail
steps:
  - name: count
    type: cmd
    run: "echo not_a_number"
    output_type: integer
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        let err = engine.run().await.unwrap_err();
        assert!(err.to_string().contains("integer"), "{err}");
    }

    /// `output_type: json` lets later steps reach into the parsed value with dot access.
    #[tokio::test]
    async fn output_type_json_allows_dot_access() {
        let yaml = r#"
name: test-json
steps:
  - name: scan
    type: cmd
    run: "echo '{\"count\": 5}'"
    output_type: json
  - name: use_scan
    type: cmd
    run: "echo {{ scan.output.count }}"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        let result = engine.run().await.unwrap();
        assert_eq!(result.text().trim(), "5");
    }

    /// `output_type: lines` yields a list that the `length` filter can count.
    #[tokio::test]
    async fn output_type_lines_allows_length_filter() {
        let yaml = r#"
name: test-lines
steps:
  - name: files
    type: cmd
    run: "printf 'a.rs\nb.rs\nc.rs'"
    output_type: lines
  - name: count_files
    type: cmd
    run: "echo {{ files.output | length }}"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        let result = engine.run().await.unwrap();
        assert_eq!(result.text().trim(), "3");
    }

    // ── Story 3.1: Async flag and pending futures ────────────────────────────

    /// A step flagged `async_exec: true` still ends up in the step records, while the
    /// run's result comes from the last synchronous step.
    #[tokio::test]
    async fn async_step_is_spawned_and_completes() {
        let yaml = r#"
name: async-test
steps:
  - name: bg_task
    type: cmd
    run: "echo async_result"
    async_exec: true
  - name: sync_step
    type: cmd
    run: "echo sync_result"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let opts = EngineOptions { quiet: true, ..Default::default() };
        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
        let result = engine.run().await.unwrap();
        // sync_step is the last synchronous step
        assert!(result.text().contains("sync_result"));
        // bg_task should be recorded after join_all
        let records = engine.step_records();
        assert!(records.iter().any(|r| r.name == "bg_task"), "bg_task should be in records");
    }

    /// Smoke test: a dry run over a workflow containing an async step does not panic.
    /// The ⚡ marker itself is not asserted because the output goes straight to stdout.
    #[test]
    fn dry_run_shows_async_lightning_indicator() {
        let yaml = r#"
name: dry-async
steps:
  - name: fast_bg
    type: cmd
    run: "echo bg"
    async_exec: true
  - name: normal
    type: cmd
    run: "echo normal"
"#;
        // dry_run should not panic and the async step should have ⚡ in output
        let wf = parser::parse_str(yaml).unwrap();
        let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true);
        // Just verify it doesn't panic
        engine.dry_run();
    }

    /// Script steps are not sandboxed even under `FullWorkflow` mode.
    #[test]
    fn should_sandbox_step_script_always_false() {
        let yaml = r#"
name: test
steps:
  - name: s
    type: cmd
    run: "echo test"
"#;
        let wf = parser::parse_str(yaml).unwrap();

        // Script steps never run in sandbox, regardless of mode
        let opts = EngineOptions {
            sandbox_mode: SandboxMode::FullWorkflow,
            quiet: true,
            ..Default::default()
        };
        let engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
        assert!(!engine.should_sandbox_step(&StepType::Script));
    }

    // ── Story 3.3: Await all remaining async futures at workflow end ─────────

    /// Several async steps that nothing depends on are all recorded by the time the
    /// run returns, alongside the synchronous step.
    #[tokio::test]
    async fn multiple_async_steps_all_complete_by_workflow_end() {
        let yaml = r#"
name: multi-async
steps:
  - name: task_a
    type: cmd
    run: "echo result_a"
    async_exec: true
  - name: task_b
    type: cmd
    run: "echo result_b"
    async_exec: true
  - name: sync_done
    type: cmd
    run: "echo done"
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let opts = EngineOptions { quiet: true, ..Default::default() };
        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
        engine.run().await.unwrap();

        let records = engine.step_records();
        assert!(records.iter().any(|r| r.name == "task_a"), "task_a should be recorded");
        assert!(records.iter().any(|r| r.name == "task_b"), "task_b should be recorded");
        assert!(records.iter().any(|r| r.name == "sync_done"), "sync_done should be recorded");
    }

    // ── Story 4.3: Script step dispatch ─────────────────────────────────────

    /// A `script` step is dispatched to the script executor and its result becomes
    /// the step output.
    #[tokio::test]
    async fn engine_dispatches_script_step() {
        let yaml = r#"
name: script-dispatch
steps:
  - name: calc
    type: script
    run: |
      let x = 6 * 7;
      x.to_string()
"#;
        let wf = parser::parse_str(yaml).unwrap();
        let opts = EngineOptions { quiet: true, ..Default::default() };
        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts);
        let result = engine.run().await.unwrap();
        assert_eq!(result.text().trim(), "42");
    }
}
1655}