Skip to main content

minion_engine/engine/
mod.rs

1pub mod context;
2pub mod state;
3mod template;
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use anyhow::{bail, Result};
11use colored::Colorize;
12use regex::Regex;
13use serde::Serialize;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16
17use crate::cli::display;
18use crate::config::{ConfigManager, StepConfig};
19use crate::control_flow::ControlFlow;
20use crate::error::StepError;
21use crate::events::subscribers::{FileSubscriber, WebhookSubscriber};
22use crate::events::types::Event;
23use crate::events::EventBus;
24use crate::plugins::loader::PluginLoader;
25use crate::plugins::registry::PluginRegistry;
26use crate::prompts::{
27    detector::{StackDetector, StackInfo},
28    registry::Registry,
29};
30use crate::sandbox::config::SandboxConfig;
31use crate::sandbox::docker::DockerSandbox;
32use crate::sandbox::SandboxMode;
33use crate::steps::*;
34use crate::steps::{
35    agent::AgentExecutor, call::CallExecutor, chat::ChatExecutor, cmd::CmdExecutor,
36    gate::GateExecutor, map::MapExecutor, parallel::ParallelExecutor, repeat::RepeatExecutor,
37    script::ScriptExecutor, template_step::TemplateStepExecutor,
38};
39use crate::workflow::schema::{OutputType, StepDef, StepType, WorkflowDef};
40use context::Context;
41use state::WorkflowState;
42
43/// Options for configuring the Engine
44#[derive(Debug, Default)]
45pub struct EngineOptions {
46    pub verbose: bool,
47    pub quiet: bool,
48    /// Suppress display and emit JSON summary at end
49    pub json: bool,
50    /// Skip execution and show step tree
51    pub dry_run: bool,
52    /// Resume from this step name (requires a state file)
53    pub resume_from: Option<String>,
54    /// Sandbox mode resolved from CLI + config
55    pub sandbox_mode: SandboxMode,
56}
57
58/// Per-step execution record collected for JSON output
59#[derive(Debug, Clone, Serialize)]
60pub struct StepRecord {
61    pub name: String,
62    pub step_type: String,
63    pub status: String,
64    pub duration_secs: f64,
65    pub output_summary: String,
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub input_tokens: Option<u64>,
68    #[serde(skip_serializing_if = "Option::is_none")]
69    pub output_tokens: Option<u64>,
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub cost_usd: Option<f64>,
72    /// Whether this step ran inside the Docker sandbox
73    #[serde(skip_serializing_if = "std::ops::Not::not")]
74    pub sandboxed: bool,
75}
76
77/// Full workflow JSON output (--json mode)
78#[derive(Debug, Serialize)]
79pub struct WorkflowJsonOutput {
80    pub workflow_name: String,
81    pub status: String,
82    pub sandbox_mode: String,
83    pub steps: Vec<StepRecord>,
84    pub total_duration_secs: f64,
85    pub total_tokens: u64,
86    pub total_cost_usd: f64,
87}
88
89pub struct Engine {
90    pub workflow: WorkflowDef,
91    pub context: Context,
92    config_manager: ConfigManager,
93    pub verbose: bool,
94    pub quiet: bool,
95    pub json: bool,
96    pub dry_run: bool,
97    resume_from: Option<String>,
98    sandbox_mode: SandboxMode,
99    /// Shared Docker sandbox instance (created during run(), destroyed on completion)
100    sandbox: SharedSandbox,
101    step_records: Vec<StepRecord>,
102    state: Option<WorkflowState>,
103    state_file: Option<PathBuf>,
104    /// Pending async step futures — keyed by step name
105    pending_futures: HashMap<String, JoinHandle<Result<StepOutput, StepError>>>,
106    /// Plugin registry for dynamically-loaded step types
107    plugin_registry: Arc<Mutex<PluginRegistry>>,
108    /// Event bus for lifecycle events
109    pub event_bus: EventBus,
110    /// Detected stack info (populated at init if prompts/registry.yaml exists)
111    pub stack_info: Option<StackInfo>,
112}
113
114impl Engine {
115    pub async fn new(
116        workflow: WorkflowDef,
117        target: String,
118        vars: HashMap<String, serde_json::Value>,
119        verbose: bool,
120        quiet: bool,
121    ) -> Self {
122        let options = EngineOptions {
123            verbose,
124            quiet,
125            ..Default::default()
126        };
127        Self::with_options(workflow, target, vars, options).await
128    }
129
130    pub async fn with_options(
131        workflow: WorkflowDef,
132        target: String,
133        vars: HashMap<String, serde_json::Value>,
134        options: EngineOptions,
135    ) -> Self {
136        let mut context = Context::new(target, vars.clone());
137        // Wrap --var parameters into an "args" object so templates can use {{ args.mode }}
138        let args_obj: serde_json::Map<String, serde_json::Value> = vars.into_iter().collect();
139        context.insert_var("args", serde_json::Value::Object(args_obj));
140        let config_manager = ConfigManager::new(workflow.config.clone());
141        // JSON mode implies quiet (no decorative output)
142        let quiet = options.quiet || options.json;
143
144        // ── Load plugins from workflow config ─────────────────────────────────
145        let mut registry = PluginRegistry::new();
146        for plugin_cfg in &workflow.config.plugins {
147            match PluginLoader::load_plugin(&plugin_cfg.path) {
148                Ok(plugin) => {
149                    tracing::info!(name = %plugin_cfg.name, path = %plugin_cfg.path, "Loaded plugin");
150                    registry.register(plugin);
151                }
152                Err(e) => {
153                    tracing::warn!(
154                        name = %plugin_cfg.name,
155                        path = %plugin_cfg.path,
156                        error = %e,
157                        "Failed to load plugin"
158                    );
159                }
160            }
161        }
162
163        // ── Wire up event subscribers from workflow config ─────────────────────
164        let mut event_bus = EventBus::new();
165        if let Some(ref events_cfg) = workflow.config.events {
166            if let Some(ref webhook_url) = events_cfg.webhook {
167                event_bus.add_subscriber(Box::new(WebhookSubscriber::new(webhook_url.clone())));
168                tracing::info!(url = %webhook_url, "Registered webhook event subscriber");
169            }
170            if let Some(ref file_path) = events_cfg.file {
171                event_bus.add_subscriber(Box::new(FileSubscriber::new(file_path.clone())));
172                tracing::info!(path = %file_path, "Registered file event subscriber");
173            }
174        }
175
176        // ── Detect stack if registry exists ──────────────────────────────────
177        let stack_info = detect_stack_if_registry_exists().await;
178        if let Some(ref info) = stack_info {
179            let stack_val = serde_json::json!({
180                "name": info.name,
181                "parent": info.parent_chain.first().cloned().unwrap_or_else(|| "_default".to_string()),
182                "tools": {
183                    "lint": info.tools.get("lint").cloned().unwrap_or_default(),
184                    "test": info.tools.get("test").cloned().unwrap_or_default(),
185                    "build": info.tools.get("build").cloned().unwrap_or_default(),
186                    "install": info.tools.get("install").cloned().unwrap_or_default(),
187                }
188            });
189            context.insert_var("stack", stack_val);
190            context.stack_info = Some(info.clone());
191        }
192        // Set prompts_dir from workflow config or default
193        if let Some(ref pd) = workflow.prompts_dir {
194            context.prompts_dir = std::path::PathBuf::from(pd);
195        }
196
197        Self {
198            workflow,
199            context,
200            config_manager,
201            verbose: options.verbose,
202            quiet,
203            json: options.json,
204            dry_run: options.dry_run,
205            resume_from: options.resume_from,
206            sandbox_mode: options.sandbox_mode,
207            sandbox: None,
208            step_records: Vec::new(),
209            state: None,
210            state_file: None,
211            pending_futures: HashMap::new(),
212            plugin_registry: Arc::new(Mutex::new(registry)),
213            event_bus,
214            stack_info,
215        }
216    }
217
218    /// Return collected step records (for JSON output or testing)
219    pub fn step_records(&self) -> &[StepRecord] {
220        &self.step_records
221    }
222
223    /// Build the JSON summary after execution
224    pub fn json_output(&self, status: &str, total_duration: Duration) -> WorkflowJsonOutput {
225        let total_tokens: u64 = self
226            .step_records
227            .iter()
228            .map(|r| r.input_tokens.unwrap_or(0) + r.output_tokens.unwrap_or(0))
229            .sum();
230        let total_cost: f64 = self.step_records.iter().filter_map(|r| r.cost_usd).sum();
231
232        WorkflowJsonOutput {
233            workflow_name: self.workflow.name.clone(),
234            status: status.to_string(),
235            sandbox_mode: format!("{:?}", self.sandbox_mode),
236            steps: self.step_records.clone(),
237            total_duration_secs: total_duration.as_secs_f64(),
238            total_tokens,
239            total_cost_usd: total_cost,
240        }
241    }
242
243    // ── Sandbox Lifecycle ────────────────────────────────────────────────────
244
245    /// Create and start the Docker sandbox container.
246    /// Copies the current working directory into the container as /workspace.
247    async fn sandbox_up(&mut self) -> Result<()> {
248        let sandbox_config = SandboxConfig::from_global_config(&self.workflow.config.global);
249        let workspace = std::env::current_dir()
250            .map(|p| p.to_string_lossy().to_string())
251            .unwrap_or_else(|_| ".".to_string());
252
253        let mut docker = DockerSandbox::new(sandbox_config, &workspace);
254
255        if !self.quiet {
256            println!("  {} Creating Docker sandbox container…", "🐳".cyan());
257        }
258
259        let t0 = Instant::now();
260        docker.create().await?;
261        let create_ms = t0.elapsed().as_millis();
262
263        let t1 = Instant::now();
264        docker.copy_workspace(&workspace).await?;
265        let copy_ms = t1.elapsed().as_millis();
266
267        // Configure Git safe.directory inside the container so that
268        // Git/gh commands work on the copied workspace (avoids
269        // "dubious ownership" errors when host UID != container UID).
270        // Also set user.name/email defaults for any git operations.
271        let t2 = Instant::now();
272        let _ = docker
273            .run_command(
274                "git config --global --add safe.directory /workspace \
275                 && git config --global user.name 'Minion Engine' \
276                 && git config --global user.email 'minion@localhost' \
277                 && if [ -n \"$GH_TOKEN\" ]; then \
278                      git config --global credential.helper '!f() { echo \"password=$GH_TOKEN\"; }; f'; \
279                      git config --global credential.https://github.com.username x-access-token; \
280                    fi",
281            )
282            .await;
283        let git_ms = t2.elapsed().as_millis();
284
285        let total_ms = t0.elapsed().as_millis();
286
287        if !self.quiet {
288            println!(
289                "  {} Sandbox ready — container {:.1}s, copy {:.1}s, git {:.1}s (total {:.1}s)",
290                "🔒".green(),
291                create_ms as f64 / 1000.0,
292                copy_ms as f64 / 1000.0,
293                git_ms as f64 / 1000.0,
294                total_ms as f64 / 1000.0,
295            );
296        }
297
298        tracing::info!(
299            create_ms,
300            copy_ms,
301            git_ms,
302            total_ms,
303            "Sandbox setup complete"
304        );
305
306        self.sandbox = Some(Arc::new(Mutex::new(docker)));
307        Ok(())
308    }
309
310    /// Copy results from sandbox back to host, then destroy the container.
311    async fn sandbox_down(&mut self) -> Result<()> {
312        if let Some(sb) = self.sandbox.take() {
313            let mut docker = sb.lock().await;
314
315            let workspace = std::env::current_dir()
316                .map(|p| p.to_string_lossy().to_string())
317                .unwrap_or_else(|_| ".".to_string());
318
319            if !self.quiet {
320                println!("  {} Copying results from sandbox…", "📦".cyan());
321            }
322
323            let t0 = Instant::now();
324            docker.copy_results(&workspace).await?;
325            let copy_back_ms = t0.elapsed().as_millis();
326
327            let t1 = Instant::now();
328            docker.destroy().await?;
329            let destroy_ms = t1.elapsed().as_millis();
330
331            if !self.quiet {
332                println!(
333                    "  {} Sandbox destroyed — copy-back {:.1}s, destroy {:.1}s",
334                    "🗑️ ".dimmed(),
335                    copy_back_ms as f64 / 1000.0,
336                    destroy_ms as f64 / 1000.0,
337                );
338            }
339
340            tracing::info!(copy_back_ms, destroy_ms, "Sandbox teardown complete");
341        }
342        Ok(())
343    }
344
345    /// Determine whether a step should run inside the sandbox based on sandbox_mode
346    fn should_sandbox_step(&self, step_type: &StepType) -> bool {
347        // Script steps run embedded (no external process), never sandboxed
348        if *step_type == StepType::Script {
349            return false;
350        }
351        match self.sandbox_mode {
352            SandboxMode::Disabled => false,
353            SandboxMode::FullWorkflow | SandboxMode::Devbox => {
354                // ALL executable steps run inside the sandbox
355                matches!(step_type, StepType::Cmd | StepType::Agent)
356            }
357            SandboxMode::AgentOnly => {
358                // Only agent steps run inside the sandbox; cmd steps run on host
359                matches!(step_type, StepType::Agent)
360            }
361        }
362    }
363
364    // ── Main Run Loop ────────────────────────────────────────────────────────
365
366    pub async fn run(&mut self) -> Result<StepOutput> {
367        // ── State / Resume setup ──────────────────────────────────────────────
368        let state_file = WorkflowState::state_file_path(&self.workflow.name);
369        self.state_file = Some(state_file.clone());
370
371        let mut loaded_state: Option<WorkflowState> = None;
372        if let Some(ref resume_step) = self.resume_from.clone() {
373            match WorkflowState::find_latest(&self.workflow.name) {
374                Some(path) => match WorkflowState::load(&path) {
375                    Ok(s) => {
376                        let exists = self.workflow.steps.iter().any(|s| &s.name == resume_step);
377                        if !exists {
378                            bail!(
379                                "Resume step '{}' not found in workflow '{}'. \
380                                     Available steps: {}",
381                                resume_step,
382                                self.workflow.name,
383                                self.workflow
384                                    .steps
385                                    .iter()
386                                    .map(|s| s.name.as_str())
387                                    .collect::<Vec<_>>()
388                                    .join(", ")
389                            );
390                        }
391                        if !self.quiet {
392                            println!(
393                                "  {} Resuming from step '{}' (state: {})",
394                                "↺".cyan(),
395                                resume_step,
396                                path.display()
397                            );
398                        }
399                        loaded_state = Some(s);
400                    }
401                    Err(e) => bail!("Failed to load state file {}: {e}", path.display()),
402                },
403                None => {
404                    bail!(
405                        "No state file found for workflow '{}'. \
406                         Cannot resume. Run the workflow without --resume first.",
407                        self.workflow.name
408                    );
409                }
410            }
411        }
412
413        // ── Initialize persisted state for this run ───────────────────────────
414        let mut current_state = WorkflowState::new(&self.workflow.name);
415
416        // ── Display ───────────────────────────────────────────────────────────
417        if !self.quiet {
418            display::workflow_start(&self.workflow.name);
419            if self.sandbox_mode != SandboxMode::Disabled {
420                println!("  {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
421            }
422        }
423
424        // ── Sandbox: Create container BEFORE step execution ───────────────────
425        if self.sandbox_mode != SandboxMode::Disabled {
426            self.sandbox_up().await?;
427        }
428
429        // ── Event: WorkflowStarted ────────────────────────────────────────────
430        self.event_bus
431            .emit(Event::WorkflowStarted {
432                timestamp: chrono::Utc::now(),
433            })
434            .await;
435
436        let start = Instant::now();
437        let steps = self.workflow.steps.clone();
438        let mut last_output = StepOutput::Empty;
439        let mut step_count = 0;
440
441        let resume_from = self.resume_from.clone();
442        let mut resuming = resume_from.is_some();
443
444        // ── Execute steps ─────────────────────────────────────────────────────
445        let run_result: Result<(), anyhow::Error> = async {
446            for step_def in &steps {
447                // ── Resume: skip steps before the resume point ────────────────
448                if resuming {
449                    let is_resume_point = resume_from.as_deref() == Some(&step_def.name);
450                    if !is_resume_point {
451                        if let Some(ref ls) = loaded_state {
452                            if let Some(output) = ls.steps.get(&step_def.name) {
453                                self.context.store(&step_def.name, output.clone());
454                                if !self.quiet {
455                                    println!(
456                                        "  {} {} {}",
457                                        "⏭".yellow(),
458                                        step_def.name,
459                                        "(skipped — loaded from state)".dimmed()
460                                    );
461                                }
462                                self.step_records.push(StepRecord {
463                                    name: step_def.name.clone(),
464                                    step_type: step_def.step_type.to_string(),
465                                    status: "skipped_resume".to_string(),
466                                    duration_secs: 0.0,
467                                    output_summary: truncate(output.text(), 100),
468                                    input_tokens: None,
469                                    output_tokens: None,
470                                    cost_usd: None,
471                                    sandboxed: false,
472                                });
473                            }
474                        }
475                        continue;
476                    }
477                    resuming = false;
478                }
479
480                // ── Async step: spawn and register in pending_futures ─────────
481                if step_def.async_exec == Some(true) {
482                    let handle = self.spawn_async_step(step_def);
483                    self.pending_futures.insert(step_def.name.clone(), handle);
484                    if !self.quiet {
485                        println!(
486                            "  {} {} {} {}",
487                            "⚡".yellow(),
488                            step_def.name,
489                            format!("[{}]", step_def.step_type).cyan(),
490                            "(async — spawned)".dimmed()
491                        );
492                    }
493                    step_count += 1;
494                    continue;
495                }
496
497                // ── Auto-await: resolve pending deps before execute ────────────
498                self.await_pending_deps(step_def).await?;
499
500                match self.execute_step(step_def).await {
501                    Ok(output) => {
502                        current_state
503                            .steps
504                            .insert(step_def.name.clone(), output.clone());
505                        if let Some(ref p) = self.state_file {
506                            let _ = current_state.save(p);
507                        }
508                        last_output = output;
509                        step_count += 1;
510                    }
511                    Err(StepError::ControlFlow(ControlFlow::Skip { message })) => {
512                        self.context.store(&step_def.name, StepOutput::Empty);
513                        if !self.quiet {
514                            let pb = display::step_start(
515                                &step_def.name,
516                                &step_def.step_type.to_string(),
517                            );
518                            display::step_skip(&pb, &step_def.name, &message);
519                        }
520                        self.step_records.push(StepRecord {
521                            name: step_def.name.clone(),
522                            step_type: step_def.step_type.to_string(),
523                            status: "skipped".to_string(),
524                            duration_secs: 0.0,
525                            output_summary: message.clone(),
526                            input_tokens: None,
527                            output_tokens: None,
528                            cost_usd: None,
529                            sandboxed: false,
530                        });
531                    }
532                    Err(StepError::ControlFlow(ControlFlow::Fail { message })) => {
533                        if !self.quiet {
534                            display::workflow_failed(&step_def.name, &message);
535                        }
536                        bail!("Step '{}' failed: {}", step_def.name, message);
537                    }
538                    Err(StepError::ControlFlow(ControlFlow::Break { .. })) => {
539                        break;
540                    }
541                    Err(e) => {
542                        if !self.quiet {
543                            display::workflow_failed(&step_def.name, &e.to_string());
544                        }
545                        return Err(e.into());
546                    }
547                }
548            }
549            Ok(())
550        }
551        .await;
552
553        // ── Story 3.3: Await all remaining pending async futures ──────────────
554        let remaining: Vec<(String, JoinHandle<Result<StepOutput, StepError>>)> =
555            self.pending_futures.drain().collect();
556        for (name, handle) in remaining {
557            let step_type = self
558                .workflow
559                .steps
560                .iter()
561                .find(|s| s.name == name)
562                .map(|s| s.step_type.to_string())
563                .unwrap_or_else(|| "async".to_string());
564            match handle.await {
565                Ok(Ok(output)) => {
566                    self.context.store(&name, output.clone());
567                    self.step_records.push(StepRecord {
568                        name: name.clone(),
569                        step_type: step_type.clone(),
570                        status: "ok".to_string(),
571                        duration_secs: 0.0,
572                        output_summary: truncate(output.text(), 100),
573                        input_tokens: None,
574                        output_tokens: None,
575                        cost_usd: None,
576                        sandboxed: false,
577                    });
578                }
579                Ok(Err(e)) => {
580                    self.step_records.push(StepRecord {
581                        name: name.clone(),
582                        step_type: step_type.clone(),
583                        status: "failed".to_string(),
584                        duration_secs: 0.0,
585                        output_summary: e.to_string(),
586                        input_tokens: None,
587                        output_tokens: None,
588                        cost_usd: None,
589                        sandboxed: false,
590                    });
591                    if !self.quiet {
592                        eprintln!("  {} Async step '{}' failed: {}", "✗".red(), name, e);
593                    }
594                }
595                Err(e) => {
596                    let msg = format!("Async step '{}' panicked: {e}", name);
597                    self.step_records.push(StepRecord {
598                        name: name.clone(),
599                        step_type,
600                        status: "failed".to_string(),
601                        duration_secs: 0.0,
602                        output_summary: msg.clone(),
603                        input_tokens: None,
604                        output_tokens: None,
605                        cost_usd: None,
606                        sandboxed: false,
607                    });
608                    if !self.quiet {
609                        eprintln!("  {} {}", "✗".red(), msg);
610                    }
611                }
612            }
613        }
614
615        // ── Sandbox: Destroy container AFTER all steps (always, even on error) ─
616        if self.sandbox_mode != SandboxMode::Disabled {
617            if let Err(e) = self.sandbox_down().await {
618                if !self.quiet {
619                    eprintln!("  {} Sandbox cleanup warning: {e}", "⚠".yellow());
620                }
621            }
622        }
623
624        // ── Event: WorkflowCompleted ──────────────────────────────────────────
625        self.event_bus
626            .emit(Event::WorkflowCompleted {
627                duration_ms: start.elapsed().as_millis() as u64,
628                timestamp: chrono::Utc::now(),
629            })
630            .await;
631
632        // Propagate any error from the step loop
633        run_result?;
634
635        if !self.quiet {
636            display::workflow_done(start.elapsed(), step_count);
637        }
638
639        self.state = Some(current_state);
640        Ok(last_output)
641    }
642
643    pub async fn execute_step(&mut self, step_def: &StepDef) -> Result<StepOutput, StepError> {
644        let config = self.resolve_config(step_def);
645        let use_sandbox = self.should_sandbox_step(&step_def.step_type);
646
647        let pb = if !self.quiet {
648            let label = if use_sandbox {
649                format!("{} 🐳", step_def.step_type)
650            } else {
651                step_def.step_type.to_string()
652            };
653            Some(display::step_start(&step_def.name, &label))
654        } else {
655            None
656        };
657
658        let start = Instant::now();
659
660        // ── Event: StepStarted ────────────────────────────────────────────────
661        self.event_bus
662            .emit(Event::StepStarted {
663                step_name: step_def.name.clone(),
664                step_type: step_def.step_type.to_string(),
665                timestamp: chrono::Utc::now(),
666            })
667            .await;
668
669        tracing::debug!(
670            step = %step_def.name,
671            step_type = %step_def.step_type,
672            sandboxed = use_sandbox,
673            "Executing step"
674        );
675
676        // Choose sandbox-aware execution when sandbox is active for this step
677        let sandbox_ref = if use_sandbox { &self.sandbox } else { &None };
678
679        let result = match step_def.step_type {
680            StepType::Cmd => {
681                CmdExecutor
682                    .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
683                    .await
684            }
685            StepType::Agent => {
686                AgentExecutor
687                    .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
688                    .await
689            }
690            StepType::Gate => GateExecutor.execute(step_def, &config, &self.context).await,
691            StepType::Repeat => {
692                RepeatExecutor::new(&self.workflow.scopes)
693                    .execute(step_def, &config, &self.context)
694                    .await
695            }
696            StepType::Chat => ChatExecutor.execute(step_def, &config, &self.context).await,
697            StepType::Map => {
698                MapExecutor::new(&self.workflow.scopes)
699                    .execute(step_def, &config, &self.context)
700                    .await
701            }
702            StepType::Parallel => {
703                ParallelExecutor::new(&self.workflow.scopes)
704                    .execute(step_def, &config, &self.context)
705                    .await
706            }
707            StepType::Call => {
708                CallExecutor::new(&self.workflow.scopes)
709                    .execute(step_def, &config, &self.context)
710                    .await
711            }
712            StepType::Template => {
713                let prompts_dir = self.workflow.prompts_dir.as_deref();
714                TemplateStepExecutor::new(prompts_dir)
715                    .execute(step_def, &config, &self.context)
716                    .await
717            }
718            StepType::Script => {
719                ScriptExecutor
720                    .execute(step_def, &config, &self.context)
721                    .await
722            } // Note: all StepType variants are covered above.
723              // Plugin dispatch will be added via a dedicated StepType::Plugin variant.
724        };
725
726        let elapsed = start.elapsed();
727        let duration_ms = elapsed.as_millis() as u64;
728
729        // ── Output Parsing Section ────────────────────────────────────────────
730        // Parse step output according to output_type (if declared)
731        let result = match result {
732            Ok(output) => parse_step_output(output, step_def),
733            err => err,
734        };
735
736        match &result {
737            Ok(output) => {
738                tracing::info!(
739                    step = %step_def.name,
740                    step_type = %step_def.step_type,
741                    duration_ms = elapsed.as_millis(),
742                    sandboxed = use_sandbox,
743                    status = "ok",
744                    "Step completed"
745                );
746                self.context.store(&step_def.name, output.clone());
747                // Store parsed value separately if present
748                if let Some(parsed) = extract_parsed_value(output, step_def) {
749                    self.context.store_parsed(&step_def.name, parsed);
750                }
751
752                let (it, ot, cost) = token_stats(output);
753                self.step_records.push(StepRecord {
754                    name: step_def.name.clone(),
755                    step_type: step_def.step_type.to_string(),
756                    status: "ok".to_string(),
757                    duration_secs: elapsed.as_secs_f64(),
758                    output_summary: truncate(output.text(), 100),
759                    input_tokens: it,
760                    output_tokens: ot,
761                    cost_usd: cost,
762                    sandboxed: use_sandbox,
763                });
764
765                if let Some(pb) = &pb {
766                    display::step_ok(pb, &step_def.name, elapsed);
767                }
768            }
769            Err(StepError::ControlFlow(cf)) => {
770                let msg = match cf {
771                    ControlFlow::Skip { message } => format!("skipped: {message}"),
772                    ControlFlow::Break { message, .. } => format!("break: {message}"),
773                    ControlFlow::Fail { message } => format!("failed: {message}"),
774                    ControlFlow::Next { message } => format!("next: {message}"),
775                };
776                tracing::info!(
777                    step = %step_def.name,
778                    step_type = %step_def.step_type,
779                    duration_ms = elapsed.as_millis(),
780                    status = "control_flow",
781                    message = %msg,
782                    "Step control flow"
783                );
784                if let Some(pb) = &pb {
785                    display::step_skip(pb, &step_def.name, &msg);
786                }
787            }
788            Err(e) => {
789                tracing::warn!(
790                    step = %step_def.name,
791                    step_type = %step_def.step_type,
792                    duration_ms = elapsed.as_millis(),
793                    status = "error",
794                    error = %e,
795                    "Step failed"
796                );
797                self.step_records.push(StepRecord {
798                    name: step_def.name.clone(),
799                    step_type: step_def.step_type.to_string(),
800                    status: "failed".to_string(),
801                    duration_secs: elapsed.as_secs_f64(),
802                    output_summary: e.to_string(),
803                    input_tokens: None,
804                    output_tokens: None,
805                    cost_usd: None,
806                    sandboxed: use_sandbox,
807                });
808                if let Some(pb) = &pb {
809                    display::step_fail(pb, &step_def.name, &e.to_string());
810                }
811            }
812        }
813
814        // ── Event: StepCompleted / StepFailed ─────────────────────────────────
815        match &result {
816            Ok(_) => {
817                self.event_bus
818                    .emit(Event::StepCompleted {
819                        step_name: step_def.name.clone(),
820                        step_type: step_def.step_type.to_string(),
821                        duration_ms,
822                        timestamp: chrono::Utc::now(),
823                    })
824                    .await;
825            }
826            Err(e) if !matches!(e, StepError::ControlFlow(_)) => {
827                self.event_bus
828                    .emit(Event::StepFailed {
829                        step_name: step_def.name.clone(),
830                        step_type: step_def.step_type.to_string(),
831                        error: e.to_string(),
832                        duration_ms,
833                        timestamp: chrono::Utc::now(),
834                    })
835                    .await;
836            }
837            _ => {}
838        }
839
840        result
841    }
842
843    /// Dry-run: walk all steps and print a visual tree without executing anything.
844    pub fn dry_run(&self) {
845        use colored::Colorize;
846
847        println!(
848            "{} {} (dry-run)",
849            "▶".cyan().bold(),
850            self.workflow.name.bold()
851        );
852        if self.sandbox_mode != SandboxMode::Disabled {
853            println!("  {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
854        }
855        println!();
856
857        let steps = &self.workflow.steps;
858        let total = steps.len();
859        for (i, step) in steps.iter().enumerate() {
860            let is_last = i + 1 == total;
861            let branch = if is_last { "└──" } else { "├──" };
862            let config = self.resolve_config(step);
863
864            let sandbox_indicator = if self.should_sandbox_step(&step.step_type) {
865                " 🐳"
866            } else {
867                ""
868            };
869
870            let async_indicator = if step.async_exec == Some(true) {
871                " ⚡"
872            } else {
873                ""
874            };
875
876            println!(
877                "{} {} {}{}{}",
878                branch.dimmed(),
879                step.name.bold(),
880                format!("[{}]", step.step_type).cyan(),
881                sandbox_indicator,
882                async_indicator
883            );
884
885            let indent = if is_last { "    " } else { "│   " };
886            self.print_step_details(step, &config, indent);
887
888            if !is_last {
889                println!("│");
890            }
891        }
892    }
893
894    fn print_step_details(&self, step: &StepDef, config: &StepConfig, indent: &str) {
895        use colored::Colorize;
896
897        match step.step_type {
898            StepType::Cmd => {
899                if let Some(ref run) = step.run {
900                    let preview = truncate(run, 80);
901                    println!("{}  run: {}", indent, preview.dimmed());
902                }
903            }
904            StepType::Agent | StepType::Chat => {
905                if let Some(ref prompt) = step.prompt {
906                    let preview = truncate(&prompt.replace('\n', " "), 80);
907                    println!("{}  prompt: {}", indent, preview.dimmed());
908                }
909                if let Some(model) = config.get_str("model") {
910                    println!("{}  model: {}", indent, model.dimmed());
911                }
912            }
913            StepType::Gate => {
914                if let Some(ref cond) = step.condition {
915                    println!("{}  condition: {}", indent, cond.dimmed());
916                }
917                println!(
918                    "{}  on_pass: {} / on_fail: {}",
919                    indent,
920                    step.on_pass.as_deref().unwrap_or("continue").dimmed(),
921                    step.on_fail.as_deref().unwrap_or("continue").dimmed()
922                );
923            }
924            StepType::Repeat => {
925                let scope_name = step.scope.as_deref().unwrap_or("<none>");
926                let max_iter = step.max_iterations.unwrap_or(1);
927                println!("{}  scope: {}", indent, scope_name.dimmed());
928                println!(
929                    "{}  max_iterations: {}",
930                    indent,
931                    max_iter.to_string().dimmed()
932                );
933                self.print_scope_steps(scope_name, indent);
934            }
935            StepType::Map => {
936                let scope_name = step.scope.as_deref().unwrap_or("<none>");
937                let items = step.items.as_deref().unwrap_or("<none>");
938                println!("{}  items: {}", indent, items.dimmed());
939                println!("{}  scope: {}", indent, scope_name.dimmed());
940                if let Some(p) = step.parallel {
941                    println!("{}  parallel: {}", indent, p.to_string().dimmed());
942                }
943                self.print_scope_steps(scope_name, indent);
944            }
945            StepType::Call => {
946                let scope_name = step.scope.as_deref().unwrap_or("<none>");
947                println!("{}  scope: {}", indent, scope_name.dimmed());
948                self.print_scope_steps(scope_name, indent);
949            }
950            StepType::Parallel => {
951                if let Some(ref sub_steps) = step.steps {
952                    println!("{}  parallel steps:", indent);
953                    for sub in sub_steps {
954                        println!(
955                            "{}    • {} [{}]",
956                            indent,
957                            sub.name.bold(),
958                            sub.step_type.to_string().cyan()
959                        );
960                    }
961                }
962            }
963            StepType::Template => {
964                if let Some(ref run) = step.run {
965                    println!("{}  template: {}", indent, run.dimmed());
966                }
967            }
968            StepType::Script => {
969                if let Some(ref run) = step.run {
970                    let preview = truncate(&run.replace('\n', " "), 80);
971                    println!("{}  script: {}", indent, preview.dimmed());
972                }
973            }
974        }
975
976        if let Some(t) = config.get_str("timeout") {
977            println!("{}  timeout: {}", indent, t.dimmed());
978        }
979    }
980
981    fn print_scope_steps(&self, scope_name: &str, indent: &str) {
982        use colored::Colorize;
983        if let Some(scope) = self.workflow.scopes.get(scope_name) {
984            println!("{}  scope steps:", indent);
985            for step in &scope.steps {
986                println!(
987                    "{}    • {} [{}]",
988                    indent,
989                    step.name.bold(),
990                    step.step_type.to_string().cyan()
991                );
992            }
993        }
994    }
995
996    fn resolve_config(&self, step_def: &StepDef) -> StepConfig {
997        self.config_manager
998            .resolve(&step_def.name, &step_def.step_type, &step_def.config)
999    }
1000
1001    // ── Async Step Support ───────────────────────────────────────────────────
1002
1003    /// Spawn an async step as a tokio task. Returns a JoinHandle for later awaiting.
1004    /// Creates a minimal context (target only) for the spawned task since the
1005    /// executor templates are rendered inside the task.
1006    fn spawn_async_step(&self, step_def: &StepDef) -> JoinHandle<Result<StepOutput, StepError>> {
1007        let step = step_def.clone();
1008        let config = self.resolve_config(step_def);
1009        let target = self
1010            .context
1011            .get_var("target")
1012            .and_then(|v| v.as_str())
1013            .unwrap_or("")
1014            .to_string();
1015
1016        tokio::spawn(async move {
1017            let ctx = Context::new(target, HashMap::new());
1018            match step.step_type {
1019                StepType::Cmd => CmdExecutor.execute(&step, &config, &ctx).await,
1020                StepType::Agent => AgentExecutor.execute(&step, &config, &ctx).await,
1021                StepType::Script => ScriptExecutor.execute(&step, &config, &ctx).await,
1022                _ => Err(StepError::Fail(format!(
1023                    "Async execution not supported for step type '{}'",
1024                    step.step_type
1025                ))),
1026            }
1027        })
1028    }
1029
1030    /// Scan the step's template fields for references to other steps.
1031    /// If any referenced step is in pending_futures, await it and store result in context.
1032    async fn await_pending_deps(&mut self, step_def: &StepDef) -> Result<(), StepError> {
1033        let pattern = Regex::new(r"steps\.(\w+)\.").unwrap();
1034
1035        // Collect all template fields that might reference step outputs
1036        let mut templates: Vec<String> = Vec::new();
1037        if let Some(ref run) = step_def.run {
1038            templates.push(run.clone());
1039        }
1040        if let Some(ref prompt) = step_def.prompt {
1041            templates.push(prompt.clone());
1042        }
1043        if let Some(ref condition) = step_def.condition {
1044            templates.push(condition.clone());
1045        }
1046
1047        // Find all step names referenced in templates
1048        let mut deps: Vec<String> = Vec::new();
1049        for tmpl in &templates {
1050            for cap in pattern.captures_iter(tmpl) {
1051                let name = cap[1].to_string();
1052                if self.pending_futures.contains_key(&name) && !deps.contains(&name) {
1053                    deps.push(name);
1054                }
1055            }
1056        }
1057
1058        // Await each dependency
1059        for name in deps {
1060            self.await_pending_step(&name).await?;
1061        }
1062
1063        Ok(())
1064    }
1065
1066    /// Await a single named async step, storing its output in context.
1067    async fn await_pending_step(&mut self, name: &str) -> Result<(), StepError> {
1068        if let Some(handle) = self.pending_futures.remove(name) {
1069            match handle.await {
1070                Ok(Ok(output)) => {
1071                    self.context.store(name, output.clone());
1072                    self.step_records.push(StepRecord {
1073                        name: name.to_string(),
1074                        step_type: "async".to_string(),
1075                        status: "ok".to_string(),
1076                        duration_secs: 0.0,
1077                        output_summary: truncate(output.text(), 100),
1078                        input_tokens: None,
1079                        output_tokens: None,
1080                        cost_usd: None,
1081                        sandboxed: false,
1082                    });
1083                }
1084                Ok(Err(e)) => {
1085                    return Err(StepError::Fail(format!(
1086                        "Async step '{}' failed: {e}",
1087                        name
1088                    )));
1089                }
1090                Err(e) => {
1091                    return Err(StepError::Fail(format!(
1092                        "Async step '{}' panicked: {e}",
1093                        name
1094                    )));
1095                }
1096            }
1097        }
1098        Ok(())
1099    }
1100}
1101
1102/// Extract token stats from a step output (for JSON records)
1103fn token_stats(output: &StepOutput) -> (Option<u64>, Option<u64>, Option<f64>) {
1104    match output {
1105        StepOutput::Agent(o) => (
1106            Some(o.stats.input_tokens),
1107            Some(o.stats.output_tokens),
1108            Some(o.stats.cost_usd),
1109        ),
1110        StepOutput::Chat(o) => (Some(o.input_tokens), Some(o.output_tokens), None),
1111        _ => (None, None, None),
1112    }
1113}
1114
1115/// Parse the raw step output according to the step's declared output_type.
1116/// Returns the output unchanged if no output_type is declared or it is Text.
1117fn parse_step_output(output: StepOutput, step_def: &StepDef) -> Result<StepOutput, StepError> {
1118    let output_type = match &step_def.output_type {
1119        Some(t) => t,
1120        None => return Ok(output),
1121    };
1122
1123    if *output_type == OutputType::Text {
1124        return Ok(output);
1125    }
1126
1127    let text = output.text().trim().to_string();
1128
1129    match output_type {
1130        OutputType::Integer => {
1131            text.parse::<i64>()
1132                .map_err(|_| StepError::Fail(format!("Failed to parse '{}' as integer", text)))?;
1133        }
1134        OutputType::Json => {
1135            serde_json::from_str::<serde_json::Value>(&text)
1136                .map_err(|e| StepError::Fail(format!("Failed to parse output as JSON: {e}")))?;
1137        }
1138        OutputType::Boolean => match text.to_lowercase().as_str() {
1139            "true" | "1" | "yes" | "false" | "0" | "no" => {}
1140            _ => {
1141                return Err(StepError::Fail(format!(
1142                    "Failed to parse '{}' as boolean",
1143                    text
1144                )));
1145            }
1146        },
1147        OutputType::Lines | OutputType::Text => {}
1148    }
1149
1150    Ok(output)
1151}
1152
1153/// Extract a ParsedValue from the step output based on output_type.
1154/// Returns None if no output_type or it is Text.
1155fn extract_parsed_value(output: &StepOutput, step_def: &StepDef) -> Option<ParsedValue> {
1156    let output_type = step_def.output_type.as_ref()?;
1157
1158    let text = output.text().trim().to_string();
1159
1160    let parsed = match output_type {
1161        OutputType::Text => ParsedValue::Text(text),
1162        OutputType::Integer => ParsedValue::Integer(text.parse::<i64>().ok()?),
1163        OutputType::Json => {
1164            let val = serde_json::from_str::<serde_json::Value>(&text).ok()?;
1165            ParsedValue::Json(val)
1166        }
1167        OutputType::Lines => {
1168            let lines: Vec<String> = text
1169                .lines()
1170                .filter(|l| !l.is_empty())
1171                .map(|l| l.to_string())
1172                .collect();
1173            ParsedValue::Lines(lines)
1174        }
1175        OutputType::Boolean => {
1176            let b = match text.to_lowercase().as_str() {
1177                "true" | "1" | "yes" => true,
1178                _ => false,
1179            };
1180            ParsedValue::Boolean(b)
1181        }
1182    };
1183
1184    Some(parsed)
1185}
1186
/// Truncate a string to at most `max` chars, appending "…" if anything was cut.
///
/// The cut is made on a `char` boundary, so multi-byte UTF-8 text (accents,
/// emoji) never causes a slicing panic. Strings of `max` chars or fewer are
/// returned unchanged.
fn truncate(s: &str, max: usize) -> String {
    // The char at index `max` exists only when the string is longer than `max`
    // chars; its byte offset is exactly where the kept prefix ends. This walks
    // at most `max + 1` chars instead of counting the whole string first.
    match s.char_indices().nth(max) {
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_string(),
    }
}
1198
1199/// Detect the project stack from `prompts/registry.yaml` if it exists.
1200/// Returns `None` silently if there is no registry or detection fails.
1201async fn detect_stack_if_registry_exists() -> Option<StackInfo> {
1202    let registry_path = std::path::Path::new("prompts/registry.yaml");
1203    if !registry_path.exists() {
1204        return None;
1205    }
1206    match Registry::from_file(registry_path).await {
1207        Ok(registry) => {
1208            let workspace = std::path::Path::new(".");
1209            match StackDetector::detect(&registry, workspace).await {
1210                Ok(info) => {
1211                    tracing::info!(stack = %info.name, "Detected project stack");
1212                    Some(info)
1213                }
1214                Err(e) => {
1215                    tracing::debug!("Stack detection failed: {e}");
1216                    None
1217                }
1218            }
1219        }
1220        Err(e) => {
1221            tracing::warn!("Failed to parse prompts/registry.yaml: {e}");
1222            None
1223        }
1224    }
1225}
1226
1227#[cfg(test)]
1228mod tests {
1229    use super::*;
1230    use crate::workflow::parser;
1231
1232    /// Global mutex to serialize tests that change the process working directory.
1233    static CWD_LOCK: std::sync::LazyLock<std::sync::Mutex<()>> =
1234        std::sync::LazyLock::new(|| std::sync::Mutex::new(()));
1235
1236    #[tokio::test]
1237    async fn engine_runs_sequential_cmd_steps() {
1238        let yaml = r#"
1239name: test
1240steps:
1241  - name: step1
1242    type: cmd
1243    run: "echo first"
1244  - name: step2
1245    type: cmd
1246    run: "echo second"
1247"#;
1248        let wf = parser::parse_str(yaml).unwrap();
1249        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1250        let result = engine.run().await.unwrap();
1251        assert_eq!(result.text().trim(), "second");
1252        assert!(engine.context.get_step("step1").is_some());
1253        assert_eq!(
1254            engine.context.get_step("step1").unwrap().text().trim(),
1255            "first"
1256        );
1257    }
1258
1259    #[tokio::test]
1260    async fn engine_exposes_step_output_to_next_step() {
1261        let yaml = r#"
1262name: test
1263steps:
1264  - name: produce
1265    type: cmd
1266    run: "echo hello_world"
1267  - name: consume
1268    type: cmd
1269    run: "echo {{ steps.produce.stdout }}"
1270"#;
1271        let wf = parser::parse_str(yaml).unwrap();
1272        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1273        let result = engine.run().await.unwrap();
1274        assert!(result.text().contains("hello_world"));
1275    }
1276
1277    #[tokio::test]
1278    async fn engine_collects_step_records_in_json_mode() {
1279        let yaml = r#"
1280name: json-test
1281steps:
1282  - name: alpha
1283    type: cmd
1284    run: "echo alpha"
1285  - name: beta
1286    type: cmd
1287    run: "echo beta"
1288"#;
1289        let wf = parser::parse_str(yaml).unwrap();
1290        let opts = EngineOptions {
1291            json: true,
1292            ..Default::default()
1293        };
1294        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1295        engine.run().await.unwrap();
1296
1297        let records = engine.step_records();
1298        assert_eq!(records.len(), 2);
1299        assert_eq!(records[0].name, "alpha");
1300        assert_eq!(records[0].status, "ok");
1301        assert!(!records[0].sandboxed);
1302        assert_eq!(records[1].name, "beta");
1303        assert_eq!(records[1].status, "ok");
1304    }
1305
1306    #[tokio::test]
1307    async fn json_output_includes_sandbox_mode() {
1308        let yaml = r#"
1309name: json-output-test
1310steps:
1311  - name: greet
1312    type: cmd
1313    run: "echo hello"
1314"#;
1315        let wf = parser::parse_str(yaml).unwrap();
1316        let opts = EngineOptions {
1317            json: true,
1318            ..Default::default()
1319        };
1320        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1321        let start = Instant::now();
1322        engine.run().await.unwrap();
1323        let out = engine.json_output("success", start.elapsed());
1324
1325        let json = serde_json::to_string(&out).unwrap();
1326        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1327
1328        assert_eq!(parsed["workflow_name"], "json-output-test");
1329        assert_eq!(parsed["status"], "success");
1330        assert_eq!(parsed["sandbox_mode"], "Disabled");
1331        assert!(parsed["steps"].is_array());
1332        assert_eq!(parsed["steps"][0]["name"], "greet");
1333    }
1334
1335    #[tokio::test]
1336    async fn should_sandbox_step_logic() {
1337        let yaml = r#"
1338name: test
1339steps:
1340  - name: s
1341    type: cmd
1342    run: "echo test"
1343"#;
1344        let wf = parser::parse_str(yaml).unwrap();
1345
1346        // Disabled mode → nothing sandboxed
1347        let engine = Engine::new(wf.clone(), "".to_string(), HashMap::new(), false, true).await;
1348        assert!(!engine.should_sandbox_step(&StepType::Cmd));
1349        assert!(!engine.should_sandbox_step(&StepType::Agent));
1350        assert!(!engine.should_sandbox_step(&StepType::Gate));
1351
1352        // FullWorkflow mode → cmd + agent sandboxed
1353        let opts = EngineOptions {
1354            sandbox_mode: SandboxMode::FullWorkflow,
1355            quiet: true,
1356            ..Default::default()
1357        };
1358        let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts).await;
1359        assert!(engine.should_sandbox_step(&StepType::Cmd));
1360        assert!(engine.should_sandbox_step(&StepType::Agent));
1361        assert!(!engine.should_sandbox_step(&StepType::Gate));
1362
1363        // AgentOnly mode → only agent sandboxed
1364        let opts = EngineOptions {
1365            sandbox_mode: SandboxMode::AgentOnly,
1366            quiet: true,
1367            ..Default::default()
1368        };
1369        let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts).await;
1370        assert!(!engine.should_sandbox_step(&StepType::Cmd));
1371        assert!(engine.should_sandbox_step(&StepType::Agent));
1372        assert!(!engine.should_sandbox_step(&StepType::Gate));
1373    }
1374
1375    #[tokio::test]
1376    async fn dry_run_does_not_panic() {
1377        let yaml = r#"
1378name: dry-run-test
1379scopes:
1380  lint_fix:
1381    steps:
1382      - name: lint
1383        type: cmd
1384        run: "npm run lint"
1385      - name: fix_lint
1386        type: agent
1387        prompt: "Fix lint errors"
1388steps:
1389  - name: setup
1390    type: cmd
1391    run: "echo setup"
1392  - name: validate
1393    type: gate
1394    condition: "{{ steps.setup.exit_code == 0 }}"
1395    on_pass: continue
1396    on_fail: fail
1397  - name: lint_gate
1398    type: repeat
1399    scope: lint_fix
1400    max_iterations: 3
1401"#;
1402        let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1403        let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1404        engine.dry_run();
1405    }
1406
1407    #[tokio::test]
1408    async fn dry_run_all_step_types() {
1409        let yaml = r#"
1410name: all-types
1411steps:
1412  - name: c
1413    type: cmd
1414    run: "ls"
1415  - name: g
1416    type: gate
1417    condition: "{{ true }}"
1418    on_pass: continue
1419  - name: p
1420    type: parallel
1421    steps:
1422      - name: p1
1423        type: cmd
1424        run: "echo p1"
1425"#;
1426        let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1427        let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1428        engine.dry_run();
1429    }
1430
1431    #[test]
1432    fn truncate_helper() {
1433        assert_eq!(truncate("hello", 10), "hello");
1434        assert_eq!(truncate("hello world", 5), "hello…");
1435    }
1436
1437    #[tokio::test]
1438    async fn resume_fails_when_no_state_file() {
1439        let yaml = r#"
1440name: no-state-workflow-xyz-unique
1441steps:
1442  - name: step1
1443    type: cmd
1444    run: "echo 1"
1445"#;
1446        let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1447        let opts = EngineOptions {
1448            resume_from: Some("step1".to_string()),
1449            quiet: true,
1450            ..Default::default()
1451        };
1452        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1453        let err = engine.run().await.unwrap_err();
1454        assert!(
1455            err.to_string().contains("No state file found"),
1456            "Expected 'No state file found' but got: {err}"
1457        );
1458    }
1459
1460    #[tokio::test]
1461    async fn resume_fails_for_unknown_step() {
1462        let workflow_name = "test-resume-unknown-step";
1463        let state = WorkflowState::new(workflow_name);
1464        let tmp_path = format!("/tmp/minion-{workflow_name}-20991231235959.state.json");
1465        let path = PathBuf::from(&tmp_path);
1466        state.save(&path).unwrap();
1467
1468        let yaml = format!(
1469            r#"
1470name: {workflow_name}
1471steps:
1472  - name: step1
1473    type: cmd
1474    run: "echo 1"
1475"#
1476        );
1477        let wf = crate::workflow::parser::parse_str(&yaml).unwrap();
1478        let opts = EngineOptions {
1479            resume_from: Some("nonexistent_step".to_string()),
1480            quiet: true,
1481            ..Default::default()
1482        };
1483        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1484        let err = engine.run().await.unwrap_err();
1485        assert!(
1486            err.to_string().contains("not found in workflow"),
1487            "Expected 'not found in workflow' but got: {err}"
1488        );
1489
1490        let _ = std::fs::remove_file(&path);
1491    }
1492
1493    #[tokio::test]
1494    async fn safe_accessor_returns_empty_for_missing_step() {
1495        let yaml = r#"
1496name: test-safe-accessor
1497steps:
1498  - name: use_missing
1499    type: cmd
1500    run: "echo '{{ missing.output? }}'"
1501"#;
1502        let wf = parser::parse_str(yaml).unwrap();
1503        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1504        let result = engine.run().await.unwrap();
1505        // safe accessor returns empty string when step doesn't exist
1506        assert_eq!(result.text().trim(), "");
1507    }
1508
1509    #[tokio::test]
1510    async fn safe_accessor_returns_value_when_present() {
1511        let yaml = r#"
1512name: test-safe-accessor-present
1513steps:
1514  - name: produce
1515    type: cmd
1516    run: "echo hello"
1517  - name: consume
1518    type: cmd
1519    run: "echo '{{ produce.output? }}'"
1520"#;
1521        let wf = parser::parse_str(yaml).unwrap();
1522        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1523        let result = engine.run().await.unwrap();
1524        assert!(result.text().contains("hello"));
1525    }
1526
1527    #[tokio::test]
1528    async fn strict_accessor_fails_when_step_missing() {
1529        let yaml = r#"
1530name: test-strict-accessor-fail
1531steps:
1532  - name: use_missing
1533    type: cmd
1534    run: "echo '{{ nonexistent.output! }}'"
1535"#;
1536        let wf = parser::parse_str(yaml).unwrap();
1537        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1538        let err = engine.run().await.unwrap_err();
1539        assert!(err.to_string().contains("strict access"), "{err}");
1540    }
1541
1542    #[tokio::test]
1543    async fn output_type_integer_parses_number() {
1544        let yaml = r#"
1545name: test-parse
1546steps:
1547  - name: count
1548    type: cmd
1549    run: "echo 42"
1550    output_type: integer
1551  - name: use_count
1552    type: cmd
1553    run: "echo {{ count.output }}"
1554"#;
1555        let wf = parser::parse_str(yaml).unwrap();
1556        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1557        let result = engine.run().await.unwrap();
1558        assert_eq!(result.text().trim(), "42");
1559    }
1560
1561    #[tokio::test]
1562    async fn output_type_integer_fails_on_non_number() {
1563        let yaml = r#"
1564name: test-parse-fail
1565steps:
1566  - name: count
1567    type: cmd
1568    run: "echo not_a_number"
1569    output_type: integer
1570"#;
1571        let wf = parser::parse_str(yaml).unwrap();
1572        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1573        let err = engine.run().await.unwrap_err();
1574        assert!(err.to_string().contains("integer"), "{err}");
1575    }
1576
1577    #[tokio::test]
1578    async fn output_type_json_allows_dot_access() {
1579        let yaml = r#"
1580name: test-json
1581steps:
1582  - name: scan
1583    type: cmd
1584    run: "echo '{\"count\": 5}'"
1585    output_type: json
1586  - name: use_scan
1587    type: cmd
1588    run: "echo {{ scan.output.count }}"
1589"#;
1590        let wf = parser::parse_str(yaml).unwrap();
1591        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1592        let result = engine.run().await.unwrap();
1593        assert_eq!(result.text().trim(), "5");
1594    }
1595
1596    #[tokio::test]
1597    async fn output_type_lines_allows_length_filter() {
1598        let yaml = r#"
1599name: test-lines
1600steps:
1601  - name: files
1602    type: cmd
1603    run: "printf 'a.rs\nb.rs\nc.rs'"
1604    output_type: lines
1605  - name: count_files
1606    type: cmd
1607    run: "echo {{ files.output | length }}"
1608"#;
1609        let wf = parser::parse_str(yaml).unwrap();
1610        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1611        let result = engine.run().await.unwrap();
1612        assert_eq!(result.text().trim(), "3");
1613    }
1614
1615    // ── Story 3.1: Async flag and pending futures ────────────────────────────
1616
1617    #[tokio::test]
1618    async fn async_step_is_spawned_and_completes() {
1619        let yaml = r#"
1620name: async-test
1621steps:
1622  - name: bg_task
1623    type: cmd
1624    run: "echo async_result"
1625    async_exec: true
1626  - name: sync_step
1627    type: cmd
1628    run: "echo sync_result"
1629"#;
1630        let wf = parser::parse_str(yaml).unwrap();
1631        let opts = EngineOptions {
1632            quiet: true,
1633            ..Default::default()
1634        };
1635        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1636        let result = engine.run().await.unwrap();
1637        // sync_step is the last synchronous step
1638        assert!(result.text().contains("sync_result"));
1639        // bg_task should be recorded after join_all
1640        let records = engine.step_records();
1641        assert!(
1642            records.iter().any(|r| r.name == "bg_task"),
1643            "bg_task should be in records"
1644        );
1645    }
1646
1647    #[tokio::test]
1648    async fn dry_run_shows_async_lightning_indicator() {
1649        let yaml = r#"
1650name: dry-async
1651steps:
1652  - name: fast_bg
1653    type: cmd
1654    run: "echo bg"
1655    async_exec: true
1656  - name: normal
1657    type: cmd
1658    run: "echo normal"
1659"#;
1660        // dry_run should not panic and the async step should have ⚡ in output
1661        let wf = parser::parse_str(yaml).unwrap();
1662        let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1663        // Just verify it doesn't panic
1664        engine.dry_run();
1665    }
1666
1667    #[tokio::test]
1668    async fn should_sandbox_step_script_always_false() {
1669        let yaml = r#"
1670name: test
1671steps:
1672  - name: s
1673    type: cmd
1674    run: "echo test"
1675"#;
1676        let wf = parser::parse_str(yaml).unwrap();
1677
1678        // Script steps never run in sandbox, regardless of mode
1679        let opts = EngineOptions {
1680            sandbox_mode: SandboxMode::FullWorkflow,
1681            quiet: true,
1682            ..Default::default()
1683        };
1684        let engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1685        assert!(!engine.should_sandbox_step(&StepType::Script));
1686    }
1687
1688    // ── Story 3.3: Await all remaining async futures at workflow end ─────────
1689
1690    #[tokio::test]
1691    async fn multiple_async_steps_all_complete_by_workflow_end() {
1692        let yaml = r#"
1693name: multi-async
1694steps:
1695  - name: task_a
1696    type: cmd
1697    run: "echo result_a"
1698    async_exec: true
1699  - name: task_b
1700    type: cmd
1701    run: "echo result_b"
1702    async_exec: true
1703  - name: sync_done
1704    type: cmd
1705    run: "echo done"
1706"#;
1707        let wf = parser::parse_str(yaml).unwrap();
1708        let opts = EngineOptions {
1709            quiet: true,
1710            ..Default::default()
1711        };
1712        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1713        engine.run().await.unwrap();
1714
1715        let records = engine.step_records();
1716        assert!(
1717            records.iter().any(|r| r.name == "task_a"),
1718            "task_a should be recorded"
1719        );
1720        assert!(
1721            records.iter().any(|r| r.name == "task_b"),
1722            "task_b should be recorded"
1723        );
1724        assert!(
1725            records.iter().any(|r| r.name == "sync_done"),
1726            "sync_done should be recorded"
1727        );
1728    }
1729
1730    // ── Story 4.3: Script step dispatch ─────────────────────────────────────
1731
1732    #[tokio::test]
1733    async fn engine_dispatches_script_step() {
1734        let yaml = r#"
1735name: script-dispatch
1736steps:
1737  - name: calc
1738    type: script
1739    run: |
1740      let x = 6 * 7;
1741      x.to_string()
1742"#;
1743        let wf = parser::parse_str(yaml).unwrap();
1744        let opts = EngineOptions {
1745            quiet: true,
1746            ..Default::default()
1747        };
1748        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1749        let result = engine.run().await.unwrap();
1750        assert_eq!(result.text().trim(), "42");
1751    }
1752
1753    #[tokio::test]
1754    async fn stack_context_injected_when_registry_exists() {
1755        let _lock = CWD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1756
1757        let dir = tempfile::tempdir().unwrap();
1758        let orig_dir = std::env::current_dir().unwrap();
1759
1760        // Create prompts/registry.yaml and a marker file
1761        let prompts_dir = dir.path().join("prompts");
1762        std::fs::create_dir_all(&prompts_dir).unwrap();
1763        let registry_yaml = r#"
1764version: 1
1765detection_order:
1766  - rust
1767stacks:
1768  rust:
1769    file_markers:
1770      - Cargo.toml
1771    tools:
1772      lint: "cargo clippy"
1773      test: "cargo test"
1774      build: "cargo build"
1775      install: "cargo fetch"
1776"#;
1777        std::fs::write(prompts_dir.join("registry.yaml"), registry_yaml).unwrap();
1778        // Create the Cargo.toml marker
1779        std::fs::write(dir.path().join("Cargo.toml"), "[package]").unwrap();
1780
1781        std::env::set_current_dir(dir.path()).unwrap();
1782
1783        let yaml = "name: test\nsteps:\n  - name: s\n    type: cmd\n    run: \"echo hi\"\n";
1784        let wf = parser::parse_str(yaml).unwrap();
1785        let engine = Engine::with_options(
1786            wf,
1787            "target".to_string(),
1788            HashMap::new(),
1789            EngineOptions {
1790                quiet: true,
1791                ..Default::default()
1792            },
1793        )
1794        .await;
1795
1796        let result = engine.context.render_template("{{ stack.name }}").unwrap();
1797        assert_eq!(result, "rust");
1798
1799        let lint = engine
1800            .context
1801            .render_template("{{ stack.tools.lint }}")
1802            .unwrap();
1803        assert_eq!(lint, "cargo clippy");
1804
1805        assert!(engine.stack_info.is_some());
1806        assert_eq!(engine.stack_info.as_ref().unwrap().name, "rust");
1807
1808        std::env::set_current_dir(orig_dir).unwrap();
1809    }
1810
1811    #[tokio::test]
1812    async fn stack_context_skipped_when_no_registry() {
1813        let _lock = CWD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1814
1815        let yaml = "name: test\nsteps:\n  - name: s\n    type: cmd\n    run: \"echo hi\"\n";
1816        let wf = parser::parse_str(yaml).unwrap();
1817        // Run from a tempdir without any registry.yaml
1818        let dir = tempfile::tempdir().unwrap();
1819        let orig_dir = std::env::current_dir().unwrap();
1820        std::env::set_current_dir(dir.path()).unwrap();
1821
1822        let engine = Engine::with_options(
1823            wf,
1824            "target".to_string(),
1825            HashMap::new(),
1826            EngineOptions {
1827                quiet: true,
1828                ..Default::default()
1829            },
1830        )
1831        .await;
1832        // Should not crash, stack_info is None
1833        assert!(engine.stack_info.is_none());
1834        // stack variable not in context
1835        assert!(engine.context.get_var("stack").is_none());
1836
1837        std::env::set_current_dir(orig_dir).unwrap();
1838    }
1839}