// minion_engine/engine/mod.rs

1pub mod context;
2pub mod state;
3mod template;
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use anyhow::{bail, Result};
11use colored::Colorize;
12use regex::Regex;
13use serde::Serialize;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16
17use crate::cli::display;
18use crate::config::{ConfigManager, StepConfig};
19use crate::control_flow::ControlFlow;
20use crate::error::StepError;
21use crate::events::subscribers::{FileSubscriber, WebhookSubscriber};
22use crate::events::types::Event;
23use crate::events::EventBus;
24use crate::plugins::loader::PluginLoader;
25use crate::plugins::registry::PluginRegistry;
26use crate::prompts::{
27    detector::{StackDetector, StackInfo},
28    registry::Registry,
29};
30use crate::sandbox::config::SandboxConfig;
31use crate::sandbox::docker::DockerSandbox;
32use crate::sandbox::proxy::ApiProxy;
33use crate::sandbox::SandboxMode;
34use crate::steps::*;
35use crate::steps::{
36    agent::AgentExecutor, call::CallExecutor, chat::ChatExecutor, cmd::CmdExecutor,
37    gate::GateExecutor, map::MapExecutor, parallel::ParallelExecutor, repeat::RepeatExecutor,
38    script::ScriptExecutor, template_step::TemplateStepExecutor,
39};
40use crate::workflow::schema::{OutputType, StepDef, StepType, WorkflowDef};
41use context::Context;
42use state::WorkflowState;
43
/// Options for configuring the [`Engine`].
///
/// Consumed by [`Engine::with_options`]; [`Engine::new`] fills in only `verbose` and
/// `quiet` and leaves everything else at its `Default`.
#[derive(Debug, Default)]
pub struct EngineOptions {
    /// Verbose flag, copied into `Engine::verbose`.
    pub verbose: bool,
    /// Suppress decorative console output. `json` forces this on as well.
    pub quiet: bool,
    /// Suppress display and emit JSON summary at end
    pub json: bool,
    /// Skip execution and show step tree
    pub dry_run: bool,
    /// Resume from this step name (requires a state file)
    pub resume_from: Option<String>,
    /// Sandbox mode resolved from CLI + config
    pub sandbox_mode: SandboxMode,
    /// GitHub repo (OWNER/REPO) to clone inside Docker instead of copying CWD
    pub repo: Option<String>,
}
60
/// Per-step execution record collected for JSON output.
///
/// Records are appended in the order steps finish, are skipped, or are restored during
/// `Engine::run`, and are serialized as the `steps` array of [`WorkflowJsonOutput`].
/// NOTE: field order here is the key order of the serialized JSON.
#[derive(Debug, Clone, Serialize)]
pub struct StepRecord {
    /// Step name as declared in the workflow.
    pub name: String,
    /// Step type rendered via its `Display` implementation.
    pub step_type: String,
    /// Outcome label. This module sets `"ok"`, `"failed"`, `"skipped"` and
    /// `"skipped_resume"`; other code paths may use further values (not visible here).
    pub status: String,
    /// Wall-clock duration in seconds. Records for skipped, resume-restored and
    /// drained async steps are written with `0.0`.
    pub duration_secs: f64,
    /// Short summary: the step output passed through `truncate(…, 100)`, or the
    /// skip/error message.
    pub output_summary: String,
    /// Input tokens consumed, if reported; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input_tokens: Option<u64>,
    /// Output tokens produced, if reported; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_tokens: Option<u64>,
    /// Cost in USD, if reported; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost_usd: Option<f64>,
    /// Whether this step ran inside the Docker sandbox (omitted from JSON when `false`).
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    pub sandboxed: bool,
}
79
/// Full workflow JSON output (`--json` mode), built by `Engine::json_output`.
///
/// NOTE: field order here is the key order of the serialized JSON.
#[derive(Debug, Serialize)]
pub struct WorkflowJsonOutput {
    /// Name of the executed workflow.
    pub workflow_name: String,
    /// Overall status string, as supplied by the caller of `json_output`.
    pub status: String,
    /// The engine's sandbox mode, formatted with `{:?}`.
    pub sandbox_mode: String,
    /// All collected step records, in recording order.
    pub steps: Vec<StepRecord>,
    /// Total wall-clock duration in seconds.
    pub total_duration_secs: f64,
    /// Sum of input + output tokens over all steps (missing counts count as 0).
    pub total_tokens: u64,
    /// Sum of `cost_usd` over the steps that reported a cost.
    pub total_cost_usd: f64,
}
91
/// Workflow execution engine.
///
/// Runs the steps of a [`WorkflowDef`] in order while managing the template context,
/// an optional Docker sandbox, resume state, async steps, plugins and lifecycle events.
// NOTE(review): the reason for allowing dead_code is not visible in this chunk —
// confirm which fields/methods are actually unused and narrow the allow if possible.
#[allow(dead_code)]
pub struct Engine {
    /// The workflow definition being executed.
    pub workflow: WorkflowDef,
    /// Template/variable context: holds `target`, the `--var` values, the `args`
    /// object, the detected `stack`, and step outputs stored during the run.
    pub context: Context,
    /// Configuration manager built from `workflow.config`.
    config_manager: Arc<ConfigManager>,
    /// Verbose flag from [`EngineOptions::verbose`].
    pub verbose: bool,
    /// Suppress decorative output; also `true` whenever `json` is set.
    pub quiet: bool,
    /// `--json` mode: emit a JSON summary instead of decorative output.
    pub json: bool,
    /// Skip execution and show the step tree.
    pub dry_run: bool,
    /// Step to resume from; earlier steps are restored from the latest saved state.
    resume_from: Option<String>,
    /// Which steps (if any) run inside the Docker sandbox.
    sandbox_mode: SandboxMode,
    /// Shared Docker sandbox instance (created during run(), destroyed on completion)
    sandbox: SharedSandbox,
    /// Per-step records collected during `run`, exposed via `step_records`/`json_output`.
    step_records: Vec<StepRecord>,
    /// Persisted state of the last successful `run` (set when `run` completes).
    state: Option<WorkflowState>,
    /// Path the per-step state is saved to during `run`.
    state_file: Option<PathBuf>,
    /// Pending async step futures — keyed by step name
    pending_futures: HashMap<String, JoinHandle<Result<StepOutput, StepError>>>,
    /// Plugin registry for dynamically-loaded step types
    plugin_registry: Arc<Mutex<PluginRegistry>>,
    /// Event bus for lifecycle events
    pub event_bus: EventBus,
    /// Detected stack info (populated at init if prompts/registry.yaml exists)
    pub stack_info: Option<StackInfo>,
    /// GitHub repo (OWNER/REPO) — when set, sandbox clones this repo instead of copying CWD
    repo: Option<String>,
    /// Secure API proxy — holds secrets on the host, container only gets the proxy URL
    api_proxy: Option<ApiProxy>,
}
121
122#[allow(dead_code)]
123impl Engine {
124    pub async fn new(
125        workflow: WorkflowDef,
126        target: String,
127        vars: HashMap<String, serde_json::Value>,
128        verbose: bool,
129        quiet: bool,
130    ) -> Self {
131        let options = EngineOptions {
132            verbose,
133            quiet,
134            ..Default::default()
135        };
136        Self::with_options(workflow, target, vars, options).await
137    }
138
139    pub async fn with_options(
140        workflow: WorkflowDef,
141        target: String,
142        vars: HashMap<String, serde_json::Value>,
143        options: EngineOptions,
144    ) -> Self {
145        let mut context = Context::new(target, vars.clone());
146        // Wrap --var parameters into an "args" object so templates can use {{ args.mode }}
147        let args_obj: serde_json::Map<String, serde_json::Value> = vars.into_iter().collect();
148        context.insert_var("args", serde_json::Value::Object(args_obj));
149        let config_manager = Arc::new(ConfigManager::new(workflow.config.clone()));
150        // JSON mode implies quiet (no decorative output)
151        let quiet = options.quiet || options.json;
152
153        // ── Load plugins from workflow config ─────────────────────────────────
154        let mut registry = PluginRegistry::new();
155        for plugin_cfg in &workflow.config.plugins {
156            match PluginLoader::load_plugin(&plugin_cfg.path) {
157                Ok(plugin) => {
158                    tracing::info!(name = %plugin_cfg.name, path = %plugin_cfg.path, "Loaded plugin");
159                    registry.register(plugin);
160                }
161                Err(e) => {
162                    tracing::warn!(
163                        name = %plugin_cfg.name,
164                        path = %plugin_cfg.path,
165                        error = %e,
166                        "Failed to load plugin"
167                    );
168                }
169            }
170        }
171
172        // ── Wire up event subscribers from workflow config ─────────────────────
173        let mut event_bus = EventBus::new();
174        if let Some(ref events_cfg) = workflow.config.events {
175            if let Some(ref webhook_url) = events_cfg.webhook {
176                event_bus.add_subscriber(Box::new(WebhookSubscriber::new(webhook_url.clone())));
177                tracing::info!(url = %webhook_url, "Registered webhook event subscriber");
178            }
179            if let Some(ref file_path) = events_cfg.file {
180                event_bus.add_subscriber(Box::new(FileSubscriber::new(file_path.clone())));
181                tracing::info!(path = %file_path, "Registered file event subscriber");
182            }
183        }
184
185        // ── Detect stack if registry exists ──────────────────────────────────
186        let stack_info = detect_stack_if_registry_exists().await;
187        if let Some(ref info) = stack_info {
188            let stack_val = serde_json::json!({
189                "name": info.name,
190                "parent": info.parent_chain.first().cloned().unwrap_or_else(|| "_default".to_string()),
191                "tools": {
192                    "lint": info.tools.get("lint").cloned().unwrap_or_default(),
193                    "test": info.tools.get("test").cloned().unwrap_or_default(),
194                    "build": info.tools.get("build").cloned().unwrap_or_default(),
195                    "install": info.tools.get("install").cloned().unwrap_or_default(),
196                }
197            });
198            context.insert_var("stack", stack_val);
199            context.stack_info = Some(info.clone());
200        }
201        // Set prompts_dir from workflow config or default
202        if let Some(ref pd) = workflow.prompts_dir {
203            context.prompts_dir = std::path::PathBuf::from(pd);
204        }
205
206        Self {
207            workflow,
208            context,
209            config_manager,
210            verbose: options.verbose,
211            quiet,
212            json: options.json,
213            dry_run: options.dry_run,
214            resume_from: options.resume_from,
215            sandbox_mode: options.sandbox_mode,
216            sandbox: None,
217            step_records: Vec::new(),
218            state: None,
219            state_file: None,
220            pending_futures: HashMap::new(),
221            plugin_registry: Arc::new(Mutex::new(registry)),
222            event_bus,
223            stack_info,
224            repo: options.repo,
225            api_proxy: None,
226        }
227    }
228
229    /// Return collected step records (for JSON output or testing)
230    pub fn step_records(&self) -> &[StepRecord] {
231        &self.step_records
232    }
233
234    /// Build the JSON summary after execution
235    pub fn json_output(&self, status: &str, total_duration: Duration) -> WorkflowJsonOutput {
236        let total_tokens: u64 = self
237            .step_records
238            .iter()
239            .map(|r| r.input_tokens.unwrap_or(0) + r.output_tokens.unwrap_or(0))
240            .sum();
241        let total_cost: f64 = self.step_records.iter().filter_map(|r| r.cost_usd).sum();
242
243        WorkflowJsonOutput {
244            workflow_name: self.workflow.name.clone(),
245            status: status.to_string(),
246            sandbox_mode: format!("{:?}", self.sandbox_mode),
247            steps: self.step_records.clone(),
248            total_duration_secs: total_duration.as_secs_f64(),
249            total_tokens,
250            total_cost_usd: total_cost,
251        }
252    }
253
254    // ── Sandbox Lifecycle ────────────────────────────────────────────────────
255
256    /// Create and start the Docker sandbox container.
257    /// Copies the current working directory into the container as /workspace.
258    async fn sandbox_up(&mut self) -> Result<()> {
259        let sandbox_config = SandboxConfig::from_global_config(&self.workflow.config.global);
260        let workspace = std::env::current_dir()
261            .map(|p| p.to_string_lossy().to_string())
262            .unwrap_or_else(|_| ".".to_string());
263
264        let is_repo_mode = self.repo.is_some();
265
266        // In repo mode, use a minimal temp dir as workspace (the real repo
267        // will be cloned inside the container). Otherwise, use the host CWD.
268        let effective_workspace = if is_repo_mode {
269            let tmp = std::env::temp_dir().join("minion-repo-workspace");
270            std::fs::create_dir_all(&tmp).ok();
271            // Initialize a bare git repo so the sandbox doesn't complain
272            let _ = std::process::Command::new("git")
273                .args(["init", "--bare"])
274                .current_dir(&tmp)
275                .stdout(std::process::Stdio::null())
276                .stderr(std::process::Stdio::null())
277                .status();
278            tmp.to_string_lossy().to_string()
279        } else {
280            workspace.clone()
281        };
282
283        let mut docker = DockerSandbox::new(sandbox_config, &effective_workspace);
284
285        // ── Start API proxy to keep secrets on the host ─────────────
286        if let Ok(api_key) = std::env::var("ANTHROPIC_API_KEY") {
287            match ApiProxy::start(api_key).await {
288                Ok(proxy) => {
289                    docker.set_proxy(proxy.port());
290                    if !self.quiet {
291                        println!(
292                            "  {} API proxy started on port {} — secrets stay on host",
293                            "🔐".green(),
294                            proxy.port()
295                        );
296                    }
297                    self.api_proxy = Some(proxy);
298                }
299                Err(e) => {
300                    tracing::warn!(error = %e, "Failed to start API proxy — falling back to env vars");
301                }
302            }
303        }
304
305        if !self.quiet {
306            if let Some(ref repo) = self.repo {
307                println!("  {} Creating Docker sandbox (repo: {})…", "🐳".cyan(), repo);
308            } else {
309                println!("  {} Creating Docker sandbox container…", "🐳".cyan());
310            }
311        }
312
313        let t0 = Instant::now();
314        docker.create().await?;
315        let create_ms = t0.elapsed().as_millis();
316
317        let t1 = Instant::now();
318        if is_repo_mode {
319            // Clone the GitHub repo inside the container instead of copying CWD
320            let repo = self.repo.as_ref().unwrap();
321            let clone_output = docker
322                .run_command(&format!(
323                    "cd /workspace && rm -rf * .* 2>/dev/null; \
324                     git clone --depth=50 https://x-access-token:$GH_TOKEN@github.com/{repo}.git ."
325                ))
326                .await?;
327
328            if clone_output.exit_code != 0 {
329                bail!(
330                    "Failed to clone repo '{}' inside sandbox: {}",
331                    repo,
332                    clone_output.stderr
333                );
334            }
335            tracing::info!(repo = %repo, "Cloned repository inside sandbox");
336        } else {
337            docker.copy_workspace(&workspace).await?;
338        }
339        let copy_ms = t1.elapsed().as_millis();
340
341        // Configure Git safe.directory inside the container so that
342        // Git/gh commands work on the copied workspace (avoids
343        // "dubious ownership" errors when host UID != container UID).
344        // Also set user.name/email defaults for any git operations.
345        let t2 = Instant::now();
346        let _ = docker
347            .run_command(
348                "git config --global --add safe.directory /workspace \
349                 && git config --global user.name 'Minion Engine' \
350                 && git config --global user.email 'minion@localhost' \
351                 && if [ -n \"$GH_TOKEN\" ]; then \
352                      git config --global credential.helper '!f() { echo \"password=$GH_TOKEN\"; }; f'; \
353                      git config --global credential.https://github.com.username x-access-token; \
354                    fi",
355            )
356            .await;
357
358        // Configure the minion user for agent steps (Claude CLI runs as non-root).
359        // The `if id minion` guard ensures backward compatibility with older images.
360        let _ = docker
361            .run_command(
362                "if id minion >/dev/null 2>&1; then \
363                   chown -R minion:minion /workspace 2>/dev/null; \
364                   su - minion -c 'git config --global --add safe.directory /workspace \
365                     && git config --global user.name \"Minion Engine\" \
366                     && git config --global user.email \"minion@localhost\"'; \
367                   if [ -n \"$GH_TOKEN\" ]; then \
368                     su - minion -c \"git config --global credential.helper '!f() { echo password=\\$GH_TOKEN; }; f' \
369                       && git config --global credential.https://github.com.username x-access-token\"; \
370                   fi; \
371                 fi",
372            )
373            .await;
374        let git_ms = t2.elapsed().as_millis();
375
376        let total_ms = t0.elapsed().as_millis();
377
378        if !self.quiet {
379            let mode_label = if is_repo_mode { "clone" } else { "copy" };
380            println!(
381                "  {} Sandbox ready — container {:.1}s, {} {:.1}s, git {:.1}s (total {:.1}s)",
382                "🔒".green(),
383                create_ms as f64 / 1000.0,
384                mode_label,
385                copy_ms as f64 / 1000.0,
386                git_ms as f64 / 1000.0,
387                total_ms as f64 / 1000.0,
388            );
389        }
390
391        tracing::info!(
392            create_ms,
393            copy_ms,
394            git_ms,
395            total_ms,
396            repo_mode = is_repo_mode,
397            "Sandbox setup complete"
398        );
399
400        self.sandbox = Some(Arc::new(Mutex::new(docker)));
401        Ok(())
402    }
403
404    /// Copy results from sandbox back to host, then destroy the container.
405    /// In repo mode (--repo), skip copy_results since there's no host workspace
406    /// to write back to — all side-effects (pushes, PR comments) happen inside the container.
407    async fn sandbox_down(&mut self) -> Result<()> {
408        if let Some(sb) = self.sandbox.take() {
409            let mut docker = sb.lock().await;
410            let is_repo_mode = self.repo.is_some();
411
412            let copy_back_ms = if is_repo_mode {
413                // In repo mode, no host directory to copy back to.
414                // All side-effects (git push, gh pr create/comment) happened inside the container.
415                if !self.quiet {
416                    println!("  {} Repo mode — skipping copy-back (all changes pushed from container)", "📦".cyan());
417                }
418                0u128
419            } else {
420                let workspace = std::env::current_dir()
421                    .map(|p| p.to_string_lossy().to_string())
422                    .unwrap_or_else(|_| ".".to_string());
423
424                if !self.quiet {
425                    println!("  {} Copying results from sandbox…", "📦".cyan());
426                }
427
428                let t0 = Instant::now();
429                docker.copy_results(&workspace).await?;
430                t0.elapsed().as_millis()
431            };
432
433            let t1 = Instant::now();
434            docker.destroy().await?;
435            let destroy_ms = t1.elapsed().as_millis();
436
437            if !self.quiet {
438                println!(
439                    "  {} Sandbox destroyed — copy-back {:.1}s, destroy {:.1}s",
440                    "🗑️ ".dimmed(),
441                    copy_back_ms as f64 / 1000.0,
442                    destroy_ms as f64 / 1000.0,
443                );
444            }
445
446            tracing::info!(copy_back_ms, destroy_ms, repo_mode = is_repo_mode, "Sandbox teardown complete");
447        }
448
449        // Stop the API proxy (if running)
450        if let Some(proxy) = self.api_proxy.take() {
451            proxy.stop().await;
452        }
453
454        Ok(())
455    }
456
457    /// Determine whether a step should run inside the sandbox based on sandbox_mode
458    fn should_sandbox_step(&self, step_type: &StepType) -> bool {
459        // Script steps run embedded (no external process), never sandboxed
460        if *step_type == StepType::Script {
461            return false;
462        }
463        match self.sandbox_mode {
464            SandboxMode::Disabled => false,
465            SandboxMode::FullWorkflow | SandboxMode::Devbox => {
466                // ALL executable steps run inside the sandbox
467                matches!(step_type, StepType::Cmd | StepType::Agent)
468            }
469            SandboxMode::AgentOnly => {
470                // Only agent steps run inside the sandbox; cmd steps run on host
471                matches!(step_type, StepType::Agent)
472            }
473        }
474    }
475
476    // ── Main Run Loop ────────────────────────────────────────────────────────
477
478    pub async fn run(&mut self) -> Result<StepOutput> {
479        // ── State / Resume setup ──────────────────────────────────────────────
480        let state_file = WorkflowState::state_file_path(&self.workflow.name);
481        self.state_file = Some(state_file.clone());
482
483        let mut loaded_state: Option<WorkflowState> = None;
484        if let Some(ref resume_step) = self.resume_from.clone() {
485            match WorkflowState::find_latest(&self.workflow.name) {
486                Some(path) => match WorkflowState::load(&path) {
487                    Ok(s) => {
488                        let exists = self.workflow.steps.iter().any(|s| &s.name == resume_step);
489                        if !exists {
490                            bail!(
491                                "Resume step '{}' not found in workflow '{}'. \
492                                     Available steps: {}",
493                                resume_step,
494                                self.workflow.name,
495                                self.workflow
496                                    .steps
497                                    .iter()
498                                    .map(|s| s.name.as_str())
499                                    .collect::<Vec<_>>()
500                                    .join(", ")
501                            );
502                        }
503                        if !self.quiet {
504                            println!(
505                                "  {} Resuming from step '{}' (state: {})",
506                                "↺".cyan(),
507                                resume_step,
508                                path.display()
509                            );
510                        }
511                        loaded_state = Some(s);
512                    }
513                    Err(e) => bail!("Failed to load state file {}: {e}", path.display()),
514                },
515                None => {
516                    bail!(
517                        "No state file found for workflow '{}'. \
518                         Cannot resume. Run the workflow without --resume first.",
519                        self.workflow.name
520                    );
521                }
522            }
523        }
524
525        // ── Initialize persisted state for this run ───────────────────────────
526        let mut current_state = WorkflowState::new(&self.workflow.name);
527
528        // ── Display ───────────────────────────────────────────────────────────
529        if !self.quiet {
530            display::workflow_start(&self.workflow.name);
531            if self.sandbox_mode != SandboxMode::Disabled {
532                println!("  {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
533            }
534        }
535
536        // ── Sandbox: Create container BEFORE step execution ───────────────────
537        if self.sandbox_mode != SandboxMode::Disabled {
538            self.sandbox_up().await?;
539        }
540
541        // ── Event: WorkflowStarted ────────────────────────────────────────────
542        self.event_bus
543            .emit(Event::WorkflowStarted {
544                timestamp: chrono::Utc::now(),
545            })
546            .await;
547
548        let start = Instant::now();
549        let steps = self.workflow.steps.clone();
550        let mut last_output = StepOutput::Empty;
551        let mut step_count = 0;
552
553        let resume_from = self.resume_from.clone();
554        let mut resuming = resume_from.is_some();
555
556        // ── Execute steps ─────────────────────────────────────────────────────
557        let run_result: Result<(), anyhow::Error> = async {
558            for step_def in &steps {
559                // ── Resume: skip steps before the resume point ────────────────
560                if resuming {
561                    let is_resume_point = resume_from.as_deref() == Some(&step_def.name);
562                    if !is_resume_point {
563                        if let Some(ref ls) = loaded_state {
564                            if let Some(output) = ls.steps.get(&step_def.name) {
565                                self.context.store(&step_def.name, output.clone());
566                                if !self.quiet {
567                                    println!(
568                                        "  {} {} {}",
569                                        "⏭".yellow(),
570                                        step_def.name,
571                                        "(skipped — loaded from state)".dimmed()
572                                    );
573                                }
574                                self.step_records.push(StepRecord {
575                                    name: step_def.name.clone(),
576                                    step_type: step_def.step_type.to_string(),
577                                    status: "skipped_resume".to_string(),
578                                    duration_secs: 0.0,
579                                    output_summary: truncate(output.text(), 100),
580                                    input_tokens: None,
581                                    output_tokens: None,
582                                    cost_usd: None,
583                                    sandboxed: false,
584                                });
585                            }
586                        }
587                        continue;
588                    }
589                    resuming = false;
590                }
591
592                // ── Async step: spawn and register in pending_futures ─────────
593                if step_def.async_exec == Some(true) {
594                    let handle = self.spawn_async_step(step_def);
595                    self.pending_futures.insert(step_def.name.clone(), handle);
596                    if !self.quiet {
597                        println!(
598                            "  {} {} {} {}",
599                            "⚡".yellow(),
600                            step_def.name,
601                            format!("[{}]", step_def.step_type).cyan(),
602                            "(async — spawned)".dimmed()
603                        );
604                    }
605                    step_count += 1;
606                    continue;
607                }
608
609                // ── Auto-await: resolve pending deps before execute ────────────
610                self.await_pending_deps(step_def).await?;
611
612                match self.execute_step(step_def).await {
613                    Ok(output) => {
614                        current_state
615                            .steps
616                            .insert(step_def.name.clone(), output.clone());
617                        if let Some(ref p) = self.state_file {
618                            let _ = current_state.save(p);
619                        }
620                        last_output = output;
621                        step_count += 1;
622                    }
623                    Err(StepError::ControlFlow(ControlFlow::Skip { message })) => {
624                        self.context.store(&step_def.name, StepOutput::Empty);
625                        if !self.quiet {
626                            let pb = display::step_start(
627                                &step_def.name,
628                                &step_def.step_type.to_string(),
629                            );
630                            display::step_skip(&pb, &step_def.name, &message);
631                        }
632                        self.step_records.push(StepRecord {
633                            name: step_def.name.clone(),
634                            step_type: step_def.step_type.to_string(),
635                            status: "skipped".to_string(),
636                            duration_secs: 0.0,
637                            output_summary: message.clone(),
638                            input_tokens: None,
639                            output_tokens: None,
640                            cost_usd: None,
641                            sandboxed: false,
642                        });
643                    }
644                    Err(StepError::ControlFlow(ControlFlow::Fail { message })) => {
645                        if !self.quiet {
646                            display::workflow_failed(&step_def.name, &message);
647                        }
648                        bail!("Step '{}' failed: {}", step_def.name, message);
649                    }
650                    Err(StepError::ControlFlow(ControlFlow::Break { .. })) => {
651                        break;
652                    }
653                    Err(e) => {
654                        if !self.quiet {
655                            display::workflow_failed(&step_def.name, &e.to_string());
656                        }
657                        return Err(e.into());
658                    }
659                }
660            }
661            Ok(())
662        }
663        .await;
664
665        // ── Story 3.3: Await all remaining pending async futures ──────────────
666        let remaining: Vec<(String, JoinHandle<Result<StepOutput, StepError>>)> =
667            self.pending_futures.drain().collect();
668        for (name, handle) in remaining {
669            let step_type = self
670                .workflow
671                .steps
672                .iter()
673                .find(|s| s.name == name)
674                .map(|s| s.step_type.to_string())
675                .unwrap_or_else(|| "async".to_string());
676            match handle.await {
677                Ok(Ok(output)) => {
678                    self.context.store(&name, output.clone());
679                    self.step_records.push(StepRecord {
680                        name: name.clone(),
681                        step_type: step_type.clone(),
682                        status: "ok".to_string(),
683                        duration_secs: 0.0,
684                        output_summary: truncate(output.text(), 100),
685                        input_tokens: None,
686                        output_tokens: None,
687                        cost_usd: None,
688                        sandboxed: false,
689                    });
690                }
691                Ok(Err(e)) => {
692                    self.step_records.push(StepRecord {
693                        name: name.clone(),
694                        step_type: step_type.clone(),
695                        status: "failed".to_string(),
696                        duration_secs: 0.0,
697                        output_summary: e.to_string(),
698                        input_tokens: None,
699                        output_tokens: None,
700                        cost_usd: None,
701                        sandboxed: false,
702                    });
703                    if !self.quiet {
704                        eprintln!("  {} Async step '{}' failed: {}", "✗".red(), name, e);
705                    }
706                }
707                Err(e) => {
708                    let msg = format!("Async step '{}' panicked: {e}", name);
709                    self.step_records.push(StepRecord {
710                        name: name.clone(),
711                        step_type,
712                        status: "failed".to_string(),
713                        duration_secs: 0.0,
714                        output_summary: msg.clone(),
715                        input_tokens: None,
716                        output_tokens: None,
717                        cost_usd: None,
718                        sandboxed: false,
719                    });
720                    if !self.quiet {
721                        eprintln!("  {} {}", "✗".red(), msg);
722                    }
723                }
724            }
725        }
726
727        // ── Sandbox: Destroy container AFTER all steps (always, even on error) ─
728        if self.sandbox_mode != SandboxMode::Disabled {
729            if let Err(e) = self.sandbox_down().await {
730                if !self.quiet {
731                    eprintln!("  {} Sandbox cleanup warning: {e}", "⚠".yellow());
732                }
733            }
734        }
735
736        // ── Event: WorkflowCompleted ──────────────────────────────────────────
737        self.event_bus
738            .emit(Event::WorkflowCompleted {
739                duration_ms: start.elapsed().as_millis() as u64,
740                timestamp: chrono::Utc::now(),
741            })
742            .await;
743
744        // Propagate any error from the step loop
745        run_result?;
746
747        if !self.quiet {
748            display::workflow_done(start.elapsed(), step_count);
749        }
750
751        self.state = Some(current_state);
752        Ok(last_output)
753    }
754
    /// Execute a single step, record its outcome, and return the executor's result.
    ///
    /// Flow:
    /// 1. Resolve the step's config and decide whether it runs sandboxed
    ///    (`should_sandbox_step`); create a progress handle unless `quiet`.
    /// 2. Emit `Event::StepStarted`, then dispatch on `step_type`. `cmd` and
    ///    `agent` go through `execute_sandboxed`, receiving the engine sandbox only
    ///    when this step is sandboxed. `repeat`, `map`, `parallel` and `call`
    ///    receive a clone of the engine sandbox regardless of `use_sandbox`, plus
    ///    the shared config manager; how they use it is up to those executors.
    ///    The remaining types use plain `execute`.
    /// 3. Validate successful output against the declared `output_type`
    ///    (`parse_step_output`). A validation failure turns the result into an error.
    /// 4. On success: store the output (and any parsed value) in the context,
    ///    append an "ok" `StepRecord`, and emit `Event::StepCompleted`.
    ///    On `StepError::ControlFlow`: log and display it only. No `StepRecord`
    ///    is pushed and no completion/failure event is emitted.
    ///    On any other error: append a "failed" `StepRecord` and emit
    ///    `Event::StepFailed`.
    ///
    /// # Errors
    /// Returns the executor's `StepError` (including control-flow signals) or the
    /// output-validation failure, unchanged.
    pub async fn execute_step(&mut self, step_def: &StepDef) -> Result<StepOutput, StepError> {
        let config = self.resolve_config(step_def);
        let use_sandbox = self.should_sandbox_step(&step_def.step_type);

        // Progress handle for the terminal display; sandboxed steps get a 🐳 marker
        // in their label. Nothing is displayed in quiet mode.
        let pb = if !self.quiet {
            let label = if use_sandbox {
                format!("{} 🐳", step_def.step_type)
            } else {
                step_def.step_type.to_string()
            };
            Some(display::step_start(&step_def.name, &label))
        } else {
            None
        };

        let start = Instant::now();

        // ── Event: StepStarted ────────────────────────────────────────────────
        self.event_bus
            .emit(Event::StepStarted {
                step_name: step_def.name.clone(),
                step_type: step_def.step_type.to_string(),
                timestamp: chrono::Utc::now(),
            })
            .await;

        tracing::debug!(
            step = %step_def.name,
            step_type = %step_def.step_type,
            sandboxed = use_sandbox,
            "Executing step"
        );

        // Choose sandbox-aware execution when sandbox is active for this step
        let sandbox_ref = if use_sandbox { &self.sandbox } else { &None };

        // Composite step types (repeat/map/parallel/call) get `self.sandbox.clone()`
        // rather than `sandbox_ref`, together with the shared config manager.
        let result = match step_def.step_type {
            StepType::Cmd => {
                CmdExecutor
                    .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
                    .await
            }
            StepType::Agent => {
                AgentExecutor
                    .execute_sandboxed(step_def, &config, &self.context, sandbox_ref)
                    .await
            }
            StepType::Gate => GateExecutor.execute(step_def, &config, &self.context).await,
            StepType::Repeat => {
                RepeatExecutor::new(&self.workflow.scopes, self.sandbox.clone())
                    .with_config_manager(Some(Arc::clone(&self.config_manager)))
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Chat => ChatExecutor.execute(step_def, &config, &self.context).await,
            StepType::Map => {
                MapExecutor::new(&self.workflow.scopes, self.sandbox.clone())
                    .with_config_manager(Some(Arc::clone(&self.config_manager)))
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Parallel => {
                ParallelExecutor::new(&self.workflow.scopes, self.sandbox.clone())
                    .with_config_manager(Some(Arc::clone(&self.config_manager)))
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Call => {
                CallExecutor::new(&self.workflow.scopes, self.sandbox.clone())
                    .with_config_manager(Some(Arc::clone(&self.config_manager)))
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Template => {
                let prompts_dir = self.workflow.prompts_dir.as_deref();
                TemplateStepExecutor::new(prompts_dir)
                    .execute(step_def, &config, &self.context)
                    .await
            }
            StepType::Script => {
                ScriptExecutor
                    .execute(step_def, &config, &self.context)
                    .await
            } // Note: all StepType variants are covered above.
              // Plugin dispatch will be added via a dedicated StepType::Plugin variant.
        };

        let elapsed = start.elapsed();
        let duration_ms = elapsed.as_millis() as u64;

        // ── Output Parsing Section ────────────────────────────────────────────
        // Parse step output according to output_type (if declared)
        let result = match result {
            Ok(output) => parse_step_output(output, step_def),
            err => err,
        };

        match &result {
            Ok(output) => {
                tracing::info!(
                    step = %step_def.name,
                    step_type = %step_def.step_type,
                    duration_ms = elapsed.as_millis(),
                    sandboxed = use_sandbox,
                    status = "ok",
                    "Step completed"
                );
                // Clone because `result` (and thus `output`) is returned to the caller.
                self.context.store(&step_def.name, output.clone());
                // Store parsed value separately if present
                if let Some(parsed) = extract_parsed_value(output, step_def) {
                    self.context.store_parsed(&step_def.name, parsed);
                }

                let (it, ot, cost) = token_stats(output);
                self.step_records.push(StepRecord {
                    name: step_def.name.clone(),
                    step_type: step_def.step_type.to_string(),
                    status: "ok".to_string(),
                    duration_secs: elapsed.as_secs_f64(),
                    output_summary: truncate(output.text(), 100),
                    input_tokens: it,
                    output_tokens: ot,
                    cost_usd: cost,
                    sandboxed: use_sandbox,
                });

                if let Some(pb) = &pb {
                    display::step_ok(pb, &step_def.name, elapsed);
                }
            }
            // Control-flow signals are reported but are not treated as failures:
            // no StepRecord is pushed for them here.
            Err(StepError::ControlFlow(cf)) => {
                let msg = match cf {
                    ControlFlow::Skip { message } => format!("skipped: {message}"),
                    ControlFlow::Break { message, .. } => format!("break: {message}"),
                    ControlFlow::Fail { message } => format!("failed: {message}"),
                    ControlFlow::Next { message } => format!("next: {message}"),
                };
                tracing::info!(
                    step = %step_def.name,
                    step_type = %step_def.step_type,
                    duration_ms = elapsed.as_millis(),
                    status = "control_flow",
                    message = %msg,
                    "Step control flow"
                );
                if let Some(pb) = &pb {
                    display::step_skip(pb, &step_def.name, &msg);
                }
            }
            Err(e) => {
                tracing::warn!(
                    step = %step_def.name,
                    step_type = %step_def.step_type,
                    duration_ms = elapsed.as_millis(),
                    status = "error",
                    error = %e,
                    "Step failed"
                );
                self.step_records.push(StepRecord {
                    name: step_def.name.clone(),
                    step_type: step_def.step_type.to_string(),
                    status: "failed".to_string(),
                    duration_secs: elapsed.as_secs_f64(),
                    output_summary: e.to_string(),
                    input_tokens: None,
                    output_tokens: None,
                    cost_usd: None,
                    sandboxed: use_sandbox,
                });
                if let Some(pb) = &pb {
                    display::step_fail(pb, &step_def.name, &e.to_string());
                }
            }
        }

        // ── Event: StepCompleted / StepFailed ─────────────────────────────────
        // Control-flow results fall through to `_` and emit neither event.
        match &result {
            Ok(_) => {
                self.event_bus
                    .emit(Event::StepCompleted {
                        step_name: step_def.name.clone(),
                        step_type: step_def.step_type.to_string(),
                        duration_ms,
                        timestamp: chrono::Utc::now(),
                    })
                    .await;
            }
            Err(e) if !matches!(e, StepError::ControlFlow(_)) => {
                self.event_bus
                    .emit(Event::StepFailed {
                        step_name: step_def.name.clone(),
                        step_type: step_def.step_type.to_string(),
                        error: e.to_string(),
                        duration_ms,
                        timestamp: chrono::Utc::now(),
                    })
                    .await;
            }
            _ => {}
        }

        result
    }
958
959    /// Dry-run: walk all steps and print a visual tree without executing anything.
960    pub fn dry_run(&self) {
961        use colored::Colorize;
962
963        println!(
964            "{} {} (dry-run)",
965            "▶".cyan().bold(),
966            self.workflow.name.bold()
967        );
968        if self.sandbox_mode != SandboxMode::Disabled {
969            println!("  {} Sandbox mode: {:?}", "🔒".cyan(), self.sandbox_mode);
970        }
971        println!();
972
973        let steps = &self.workflow.steps;
974        let total = steps.len();
975        for (i, step) in steps.iter().enumerate() {
976            let is_last = i + 1 == total;
977            let branch = if is_last { "└──" } else { "├──" };
978            let config = self.resolve_config(step);
979
980            let sandbox_indicator = if self.should_sandbox_step(&step.step_type) {
981                " 🐳"
982            } else {
983                ""
984            };
985
986            let async_indicator = if step.async_exec == Some(true) {
987                " ⚡"
988            } else {
989                ""
990            };
991
992            println!(
993                "{} {} {}{}{}",
994                branch.dimmed(),
995                step.name.bold(),
996                format!("[{}]", step.step_type).cyan(),
997                sandbox_indicator,
998                async_indicator
999            );
1000
1001            let indent = if is_last { "    " } else { "│   " };
1002            self.print_step_details(step, &config, indent);
1003
1004            if !is_last {
1005                println!("│");
1006            }
1007        }
1008    }
1009
1010    fn print_step_details(&self, step: &StepDef, config: &StepConfig, indent: &str) {
1011        use colored::Colorize;
1012
1013        match step.step_type {
1014            StepType::Cmd => {
1015                if let Some(ref run) = step.run {
1016                    let preview = truncate(run, 80);
1017                    println!("{}  run: {}", indent, preview.dimmed());
1018                }
1019            }
1020            StepType::Agent | StepType::Chat => {
1021                if let Some(ref prompt) = step.prompt {
1022                    let preview = truncate(&prompt.replace('\n', " "), 80);
1023                    println!("{}  prompt: {}", indent, preview.dimmed());
1024                }
1025                if let Some(model) = config.get_str("model") {
1026                    println!("{}  model: {}", indent, model.dimmed());
1027                }
1028            }
1029            StepType::Gate => {
1030                if let Some(ref cond) = step.condition {
1031                    println!("{}  condition: {}", indent, cond.dimmed());
1032                }
1033                println!(
1034                    "{}  on_pass: {} / on_fail: {}",
1035                    indent,
1036                    step.on_pass.as_deref().unwrap_or("continue").dimmed(),
1037                    step.on_fail.as_deref().unwrap_or("continue").dimmed()
1038                );
1039            }
1040            StepType::Repeat => {
1041                let scope_name = step.scope.as_deref().unwrap_or("<none>");
1042                let max_iter = step.max_iterations.unwrap_or(1);
1043                println!("{}  scope: {}", indent, scope_name.dimmed());
1044                println!(
1045                    "{}  max_iterations: {}",
1046                    indent,
1047                    max_iter.to_string().dimmed()
1048                );
1049                self.print_scope_steps(scope_name, indent);
1050            }
1051            StepType::Map => {
1052                let scope_name = step.scope.as_deref().unwrap_or("<none>");
1053                let items = step.items.as_deref().unwrap_or("<none>");
1054                println!("{}  items: {}", indent, items.dimmed());
1055                println!("{}  scope: {}", indent, scope_name.dimmed());
1056                if let Some(p) = step.parallel {
1057                    println!("{}  parallel: {}", indent, p.to_string().dimmed());
1058                }
1059                self.print_scope_steps(scope_name, indent);
1060            }
1061            StepType::Call => {
1062                let scope_name = step.scope.as_deref().unwrap_or("<none>");
1063                println!("{}  scope: {}", indent, scope_name.dimmed());
1064                self.print_scope_steps(scope_name, indent);
1065            }
1066            StepType::Parallel => {
1067                if let Some(ref sub_steps) = step.steps {
1068                    println!("{}  parallel steps:", indent);
1069                    for sub in sub_steps {
1070                        println!(
1071                            "{}    • {} [{}]",
1072                            indent,
1073                            sub.name.bold(),
1074                            sub.step_type.to_string().cyan()
1075                        );
1076                    }
1077                }
1078            }
1079            StepType::Template => {
1080                if let Some(ref run) = step.run {
1081                    println!("{}  template: {}", indent, run.dimmed());
1082                }
1083            }
1084            StepType::Script => {
1085                if let Some(ref run) = step.run {
1086                    let preview = truncate(&run.replace('\n', " "), 80);
1087                    println!("{}  script: {}", indent, preview.dimmed());
1088                }
1089            }
1090        }
1091
1092        if let Some(t) = config.get_str("timeout") {
1093            println!("{}  timeout: {}", indent, t.dimmed());
1094        }
1095    }
1096
1097    fn print_scope_steps(&self, scope_name: &str, indent: &str) {
1098        use colored::Colorize;
1099        if let Some(scope) = self.workflow.scopes.get(scope_name) {
1100            println!("{}  scope steps:", indent);
1101            for step in &scope.steps {
1102                println!(
1103                    "{}    • {} [{}]",
1104                    indent,
1105                    step.name.bold(),
1106                    step.step_type.to_string().cyan()
1107                );
1108            }
1109        }
1110    }
1111
1112    fn resolve_config(&self, step_def: &StepDef) -> StepConfig {
1113        self.config_manager
1114            .resolve(&step_def.name, &step_def.step_type, &step_def.config)
1115    }
1116
1117    // ── Async Step Support ───────────────────────────────────────────────────
1118
1119    /// Spawn an async step as a tokio task. Returns a JoinHandle for later awaiting.
1120    /// Creates a minimal context (target only) for the spawned task since the
1121    /// executor templates are rendered inside the task.
1122    fn spawn_async_step(&self, step_def: &StepDef) -> JoinHandle<Result<StepOutput, StepError>> {
1123        let step = step_def.clone();
1124        let config = self.resolve_config(step_def);
1125        let target = self
1126            .context
1127            .get_var("target")
1128            .and_then(|v| v.as_str())
1129            .unwrap_or("")
1130            .to_string();
1131
1132        tokio::spawn(async move {
1133            let ctx = Context::new(target, HashMap::new());
1134            match step.step_type {
1135                StepType::Cmd => CmdExecutor.execute(&step, &config, &ctx).await,
1136                StepType::Agent => AgentExecutor.execute(&step, &config, &ctx).await,
1137                StepType::Script => ScriptExecutor.execute(&step, &config, &ctx).await,
1138                _ => Err(StepError::Fail(format!(
1139                    "Async execution not supported for step type '{}'",
1140                    step.step_type
1141                ))),
1142            }
1143        })
1144    }
1145
1146    /// Scan the step's template fields for references to other steps.
1147    /// If any referenced step is in pending_futures, await it and store result in context.
1148    async fn await_pending_deps(&mut self, step_def: &StepDef) -> Result<(), StepError> {
1149        let pattern = Regex::new(r"steps\.(\w+)\.").unwrap();
1150
1151        // Collect all template fields that might reference step outputs
1152        let mut templates: Vec<String> = Vec::new();
1153        if let Some(ref run) = step_def.run {
1154            templates.push(run.clone());
1155        }
1156        if let Some(ref prompt) = step_def.prompt {
1157            templates.push(prompt.clone());
1158        }
1159        if let Some(ref condition) = step_def.condition {
1160            templates.push(condition.clone());
1161        }
1162
1163        // Find all step names referenced in templates
1164        let mut deps: Vec<String> = Vec::new();
1165        for tmpl in &templates {
1166            for cap in pattern.captures_iter(tmpl) {
1167                let name = cap[1].to_string();
1168                if self.pending_futures.contains_key(&name) && !deps.contains(&name) {
1169                    deps.push(name);
1170                }
1171            }
1172        }
1173
1174        // Await each dependency
1175        for name in deps {
1176            self.await_pending_step(&name).await?;
1177        }
1178
1179        Ok(())
1180    }
1181
1182    /// Await a single named async step, storing its output in context.
1183    async fn await_pending_step(&mut self, name: &str) -> Result<(), StepError> {
1184        if let Some(handle) = self.pending_futures.remove(name) {
1185            match handle.await {
1186                Ok(Ok(output)) => {
1187                    self.context.store(name, output.clone());
1188                    self.step_records.push(StepRecord {
1189                        name: name.to_string(),
1190                        step_type: "async".to_string(),
1191                        status: "ok".to_string(),
1192                        duration_secs: 0.0,
1193                        output_summary: truncate(output.text(), 100),
1194                        input_tokens: None,
1195                        output_tokens: None,
1196                        cost_usd: None,
1197                        sandboxed: false,
1198                    });
1199                }
1200                Ok(Err(e)) => {
1201                    return Err(StepError::Fail(format!(
1202                        "Async step '{}' failed: {e}",
1203                        name
1204                    )));
1205                }
1206                Err(e) => {
1207                    return Err(StepError::Fail(format!(
1208                        "Async step '{}' panicked: {e}",
1209                        name
1210                    )));
1211                }
1212            }
1213        }
1214        Ok(())
1215    }
1216}
1217
1218/// Extract token stats from a step output (for JSON records)
1219fn token_stats(output: &StepOutput) -> (Option<u64>, Option<u64>, Option<f64>) {
1220    match output {
1221        StepOutput::Agent(o) => (
1222            Some(o.stats.input_tokens),
1223            Some(o.stats.output_tokens),
1224            Some(o.stats.cost_usd),
1225        ),
1226        StepOutput::Chat(o) => (Some(o.input_tokens), Some(o.output_tokens), None),
1227        _ => (None, None, None),
1228    }
1229}
1230
1231/// Parse the raw step output according to the step's declared output_type.
1232/// Returns the output unchanged if no output_type is declared or it is Text.
1233fn parse_step_output(output: StepOutput, step_def: &StepDef) -> Result<StepOutput, StepError> {
1234    let output_type = match &step_def.output_type {
1235        Some(t) => t,
1236        None => return Ok(output),
1237    };
1238
1239    if *output_type == OutputType::Text {
1240        return Ok(output);
1241    }
1242
1243    let text = output.text().trim().to_string();
1244
1245    match output_type {
1246        OutputType::Integer => {
1247            text.parse::<i64>()
1248                .map_err(|_| StepError::Fail(format!("Failed to parse '{}' as integer", text)))?;
1249        }
1250        OutputType::Json => {
1251            serde_json::from_str::<serde_json::Value>(&text)
1252                .map_err(|e| StepError::Fail(format!("Failed to parse output as JSON: {e}")))?;
1253        }
1254        OutputType::Boolean => match text.to_lowercase().as_str() {
1255            "true" | "1" | "yes" | "false" | "0" | "no" => {}
1256            _ => {
1257                return Err(StepError::Fail(format!(
1258                    "Failed to parse '{}' as boolean",
1259                    text
1260                )));
1261            }
1262        },
1263        OutputType::Lines | OutputType::Text => {}
1264    }
1265
1266    Ok(output)
1267}
1268
1269/// Extract a ParsedValue from the step output based on output_type.
1270/// Returns None if no output_type or it is Text.
1271fn extract_parsed_value(output: &StepOutput, step_def: &StepDef) -> Option<ParsedValue> {
1272    let output_type = step_def.output_type.as_ref()?;
1273
1274    let text = output.text().trim().to_string();
1275
1276    let parsed = match output_type {
1277        OutputType::Text => ParsedValue::Text(text),
1278        OutputType::Integer => ParsedValue::Integer(text.parse::<i64>().ok()?),
1279        OutputType::Json => {
1280            let val = serde_json::from_str::<serde_json::Value>(&text).ok()?;
1281            ParsedValue::Json(val)
1282        }
1283        OutputType::Lines => {
1284            let lines: Vec<String> = text
1285                .lines()
1286                .filter(|l| !l.is_empty())
1287                .map(|l| l.to_string())
1288                .collect();
1289            ParsedValue::Lines(lines)
1290        }
1291        OutputType::Boolean => {
1292            let b = matches!(text.to_lowercase().as_str(), "true" | "1" | "yes");
1293            ParsedValue::Boolean(b)
1294        }
1295    };
1296
1297    Some(parsed)
1298}
1299
/// Keep at most the first `max` characters of `s`, appending "…" when anything
/// was cut. A truncated result is therefore `max + 1` chars long.
///
/// Characters are Unicode scalar values (`char`s), not bytes, and the slice
/// always ends on a char boundary, so multi-byte UTF-8 (accents, emoji) never
/// panics. The scan stops at char `max`, so the cost is O(max) rather than
/// O(len). This matters because step outputs passed here can be arbitrarily
/// large.
fn truncate(s: &str, max: usize) -> String {
    match s.char_indices().nth(max) {
        // A char exists past the limit: keep everything before its byte offset.
        Some((cut, _)) => format!("{}…", &s[..cut]),
        // `s` has at most `max` chars, so there is nothing to cut.
        None => s.to_string(),
    }
}
1311
1312/// Detect the project stack from `prompts/registry.yaml` if it exists.
1313/// Returns `None` silently if there is no registry or detection fails.
1314async fn detect_stack_if_registry_exists() -> Option<StackInfo> {
1315    let registry_path = std::path::Path::new("prompts/registry.yaml");
1316    if !registry_path.exists() {
1317        return None;
1318    }
1319    match Registry::from_file(registry_path).await {
1320        Ok(registry) => {
1321            let workspace = std::path::Path::new(".");
1322            match StackDetector::detect(&registry, workspace).await {
1323                Ok(info) => {
1324                    tracing::info!(stack = %info.name, "Detected project stack");
1325                    Some(info)
1326                }
1327                Err(e) => {
1328                    tracing::debug!("Stack detection failed: {e}");
1329                    None
1330                }
1331            }
1332        }
1333        Err(e) => {
1334            tracing::warn!("Failed to parse prompts/registry.yaml: {e}");
1335            None
1336        }
1337    }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342    use super::*;
1343    use crate::workflow::parser;
1344
1345    /// Global mutex to serialize tests that change the process working directory.
1346    static CWD_LOCK: std::sync::LazyLock<std::sync::Mutex<()>> =
1347        std::sync::LazyLock::new(|| std::sync::Mutex::new(()));
1348
1349    #[tokio::test]
1350    async fn engine_runs_sequential_cmd_steps() {
1351        let yaml = r#"
1352name: test
1353steps:
1354  - name: step1
1355    type: cmd
1356    run: "echo first"
1357  - name: step2
1358    type: cmd
1359    run: "echo second"
1360"#;
1361        let wf = parser::parse_str(yaml).unwrap();
1362        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1363        let result = engine.run().await.unwrap();
1364        assert_eq!(result.text().trim(), "second");
1365        assert!(engine.context.get_step("step1").is_some());
1366        assert_eq!(
1367            engine.context.get_step("step1").unwrap().text().trim(),
1368            "first"
1369        );
1370    }
1371
1372    #[tokio::test]
1373    async fn engine_exposes_step_output_to_next_step() {
1374        let yaml = r#"
1375name: test
1376steps:
1377  - name: produce
1378    type: cmd
1379    run: "echo hello_world"
1380  - name: consume
1381    type: cmd
1382    run: "echo {{ steps.produce.stdout }}"
1383"#;
1384        let wf = parser::parse_str(yaml).unwrap();
1385        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1386        let result = engine.run().await.unwrap();
1387        assert!(result.text().contains("hello_world"));
1388    }
1389
1390    #[tokio::test]
1391    async fn engine_collects_step_records_in_json_mode() {
1392        let yaml = r#"
1393name: json-test
1394steps:
1395  - name: alpha
1396    type: cmd
1397    run: "echo alpha"
1398  - name: beta
1399    type: cmd
1400    run: "echo beta"
1401"#;
1402        let wf = parser::parse_str(yaml).unwrap();
1403        let opts = EngineOptions {
1404            json: true,
1405            ..Default::default()
1406        };
1407        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1408        engine.run().await.unwrap();
1409
1410        let records = engine.step_records();
1411        assert_eq!(records.len(), 2);
1412        assert_eq!(records[0].name, "alpha");
1413        assert_eq!(records[0].status, "ok");
1414        assert!(!records[0].sandboxed);
1415        assert_eq!(records[1].name, "beta");
1416        assert_eq!(records[1].status, "ok");
1417    }
1418
1419    #[tokio::test]
1420    async fn json_output_includes_sandbox_mode() {
1421        let yaml = r#"
1422name: json-output-test
1423steps:
1424  - name: greet
1425    type: cmd
1426    run: "echo hello"
1427"#;
1428        let wf = parser::parse_str(yaml).unwrap();
1429        let opts = EngineOptions {
1430            json: true,
1431            ..Default::default()
1432        };
1433        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1434        let start = Instant::now();
1435        engine.run().await.unwrap();
1436        let out = engine.json_output("success", start.elapsed());
1437
1438        let json = serde_json::to_string(&out).unwrap();
1439        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1440
1441        assert_eq!(parsed["workflow_name"], "json-output-test");
1442        assert_eq!(parsed["status"], "success");
1443        assert_eq!(parsed["sandbox_mode"], "Disabled");
1444        assert!(parsed["steps"].is_array());
1445        assert_eq!(parsed["steps"][0]["name"], "greet");
1446    }
1447
1448    #[tokio::test]
1449    async fn should_sandbox_step_logic() {
1450        let yaml = r#"
1451name: test
1452steps:
1453  - name: s
1454    type: cmd
1455    run: "echo test"
1456"#;
1457        let wf = parser::parse_str(yaml).unwrap();
1458
1459        // Disabled mode → nothing sandboxed
1460        let engine = Engine::new(wf.clone(), "".to_string(), HashMap::new(), false, true).await;
1461        assert!(!engine.should_sandbox_step(&StepType::Cmd));
1462        assert!(!engine.should_sandbox_step(&StepType::Agent));
1463        assert!(!engine.should_sandbox_step(&StepType::Gate));
1464
1465        // FullWorkflow mode → cmd + agent sandboxed
1466        let opts = EngineOptions {
1467            sandbox_mode: SandboxMode::FullWorkflow,
1468            quiet: true,
1469            ..Default::default()
1470        };
1471        let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts).await;
1472        assert!(engine.should_sandbox_step(&StepType::Cmd));
1473        assert!(engine.should_sandbox_step(&StepType::Agent));
1474        assert!(!engine.should_sandbox_step(&StepType::Gate));
1475
1476        // AgentOnly mode → only agent sandboxed
1477        let opts = EngineOptions {
1478            sandbox_mode: SandboxMode::AgentOnly,
1479            quiet: true,
1480            ..Default::default()
1481        };
1482        let engine = Engine::with_options(wf.clone(), "".to_string(), HashMap::new(), opts).await;
1483        assert!(!engine.should_sandbox_step(&StepType::Cmd));
1484        assert!(engine.should_sandbox_step(&StepType::Agent));
1485        assert!(!engine.should_sandbox_step(&StepType::Gate));
1486    }
1487
1488    #[tokio::test]
1489    async fn dry_run_does_not_panic() {
1490        let yaml = r#"
1491name: dry-run-test
1492scopes:
1493  lint_fix:
1494    steps:
1495      - name: lint
1496        type: cmd
1497        run: "npm run lint"
1498      - name: fix_lint
1499        type: agent
1500        prompt: "Fix lint errors"
1501steps:
1502  - name: setup
1503    type: cmd
1504    run: "echo setup"
1505  - name: validate
1506    type: gate
1507    condition: "{{ steps.setup.exit_code == 0 }}"
1508    on_pass: continue
1509    on_fail: fail
1510  - name: lint_gate
1511    type: repeat
1512    scope: lint_fix
1513    max_iterations: 3
1514"#;
1515        let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1516        let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1517        engine.dry_run();
1518    }
1519
1520    #[tokio::test]
1521    async fn dry_run_all_step_types() {
1522        let yaml = r#"
1523name: all-types
1524steps:
1525  - name: c
1526    type: cmd
1527    run: "ls"
1528  - name: g
1529    type: gate
1530    condition: "{{ true }}"
1531    on_pass: continue
1532  - name: p
1533    type: parallel
1534    steps:
1535      - name: p1
1536        type: cmd
1537        run: "echo p1"
1538"#;
1539        let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1540        let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1541        engine.dry_run();
1542    }
1543
1544    #[test]
1545    fn truncate_helper() {
1546        assert_eq!(truncate("hello", 10), "hello");
1547        assert_eq!(truncate("hello world", 5), "hello…");
1548    }
1549
1550    #[tokio::test]
1551    async fn resume_fails_when_no_state_file() {
1552        let yaml = r#"
1553name: no-state-workflow-xyz-unique
1554steps:
1555  - name: step1
1556    type: cmd
1557    run: "echo 1"
1558"#;
1559        let wf = crate::workflow::parser::parse_str(yaml).unwrap();
1560        let opts = EngineOptions {
1561            resume_from: Some("step1".to_string()),
1562            quiet: true,
1563            ..Default::default()
1564        };
1565        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1566        let err = engine.run().await.unwrap_err();
1567        assert!(
1568            err.to_string().contains("No state file found"),
1569            "Expected 'No state file found' but got: {err}"
1570        );
1571    }
1572
1573    #[tokio::test]
1574    async fn resume_fails_for_unknown_step() {
1575        let workflow_name = "test-resume-unknown-step";
1576        let state = WorkflowState::new(workflow_name);
1577        let tmp_path = format!("/tmp/minion-{workflow_name}-20991231235959.state.json");
1578        let path = PathBuf::from(&tmp_path);
1579        state.save(&path).unwrap();
1580
1581        let yaml = format!(
1582            r#"
1583name: {workflow_name}
1584steps:
1585  - name: step1
1586    type: cmd
1587    run: "echo 1"
1588"#
1589        );
1590        let wf = crate::workflow::parser::parse_str(&yaml).unwrap();
1591        let opts = EngineOptions {
1592            resume_from: Some("nonexistent_step".to_string()),
1593            quiet: true,
1594            ..Default::default()
1595        };
1596        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1597        let err = engine.run().await.unwrap_err();
1598        assert!(
1599            err.to_string().contains("not found in workflow"),
1600            "Expected 'not found in workflow' but got: {err}"
1601        );
1602
1603        let _ = std::fs::remove_file(&path);
1604    }
1605
1606    #[tokio::test]
1607    async fn safe_accessor_returns_empty_for_missing_step() {
1608        let yaml = r#"
1609name: test-safe-accessor
1610steps:
1611  - name: use_missing
1612    type: cmd
1613    run: "echo '{{ missing.output? }}'"
1614"#;
1615        let wf = parser::parse_str(yaml).unwrap();
1616        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1617        let result = engine.run().await.unwrap();
1618        // safe accessor returns empty string when step doesn't exist
1619        assert_eq!(result.text().trim(), "");
1620    }
1621
1622    #[tokio::test]
1623    async fn safe_accessor_returns_value_when_present() {
1624        let yaml = r#"
1625name: test-safe-accessor-present
1626steps:
1627  - name: produce
1628    type: cmd
1629    run: "echo hello"
1630  - name: consume
1631    type: cmd
1632    run: "echo '{{ produce.output? }}'"
1633"#;
1634        let wf = parser::parse_str(yaml).unwrap();
1635        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1636        let result = engine.run().await.unwrap();
1637        assert!(result.text().contains("hello"));
1638    }
1639
1640    #[tokio::test]
1641    async fn strict_accessor_fails_when_step_missing() {
1642        let yaml = r#"
1643name: test-strict-accessor-fail
1644steps:
1645  - name: use_missing
1646    type: cmd
1647    run: "echo '{{ nonexistent.output! }}'"
1648"#;
1649        let wf = parser::parse_str(yaml).unwrap();
1650        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1651        let err = engine.run().await.unwrap_err();
1652        assert!(err.to_string().contains("strict access"), "{err}");
1653    }
1654
1655    #[tokio::test]
1656    async fn output_type_integer_parses_number() {
1657        let yaml = r#"
1658name: test-parse
1659steps:
1660  - name: count
1661    type: cmd
1662    run: "echo 42"
1663    output_type: integer
1664  - name: use_count
1665    type: cmd
1666    run: "echo {{ count.output }}"
1667"#;
1668        let wf = parser::parse_str(yaml).unwrap();
1669        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1670        let result = engine.run().await.unwrap();
1671        assert_eq!(result.text().trim(), "42");
1672    }
1673
1674    #[tokio::test]
1675    async fn output_type_integer_fails_on_non_number() {
1676        let yaml = r#"
1677name: test-parse-fail
1678steps:
1679  - name: count
1680    type: cmd
1681    run: "echo not_a_number"
1682    output_type: integer
1683"#;
1684        let wf = parser::parse_str(yaml).unwrap();
1685        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1686        let err = engine.run().await.unwrap_err();
1687        assert!(err.to_string().contains("integer"), "{err}");
1688    }
1689
1690    #[tokio::test]
1691    async fn output_type_json_allows_dot_access() {
1692        let yaml = r#"
1693name: test-json
1694steps:
1695  - name: scan
1696    type: cmd
1697    run: "echo '{\"count\": 5}'"
1698    output_type: json
1699  - name: use_scan
1700    type: cmd
1701    run: "echo {{ scan.output.count }}"
1702"#;
1703        let wf = parser::parse_str(yaml).unwrap();
1704        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1705        let result = engine.run().await.unwrap();
1706        assert_eq!(result.text().trim(), "5");
1707    }
1708
1709    #[tokio::test]
1710    async fn output_type_lines_allows_length_filter() {
1711        let yaml = r#"
1712name: test-lines
1713steps:
1714  - name: files
1715    type: cmd
1716    run: "printf 'a.rs\nb.rs\nc.rs'"
1717    output_type: lines
1718  - name: count_files
1719    type: cmd
1720    run: "echo {{ files.output | length }}"
1721"#;
1722        let wf = parser::parse_str(yaml).unwrap();
1723        let mut engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1724        let result = engine.run().await.unwrap();
1725        assert_eq!(result.text().trim(), "3");
1726    }
1727
1728    // ── Story 3.1: Async flag and pending futures ────────────────────────────
1729
1730    #[tokio::test]
1731    async fn async_step_is_spawned_and_completes() {
1732        let yaml = r#"
1733name: async-test
1734steps:
1735  - name: bg_task
1736    type: cmd
1737    run: "echo async_result"
1738    async_exec: true
1739  - name: sync_step
1740    type: cmd
1741    run: "echo sync_result"
1742"#;
1743        let wf = parser::parse_str(yaml).unwrap();
1744        let opts = EngineOptions {
1745            quiet: true,
1746            ..Default::default()
1747        };
1748        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1749        let result = engine.run().await.unwrap();
1750        // sync_step is the last synchronous step
1751        assert!(result.text().contains("sync_result"));
1752        // bg_task should be recorded after join_all
1753        let records = engine.step_records();
1754        assert!(
1755            records.iter().any(|r| r.name == "bg_task"),
1756            "bg_task should be in records"
1757        );
1758    }
1759
1760    #[tokio::test]
1761    async fn dry_run_shows_async_lightning_indicator() {
1762        let yaml = r#"
1763name: dry-async
1764steps:
1765  - name: fast_bg
1766    type: cmd
1767    run: "echo bg"
1768    async_exec: true
1769  - name: normal
1770    type: cmd
1771    run: "echo normal"
1772"#;
1773        // dry_run should not panic and the async step should have ⚡ in output
1774        let wf = parser::parse_str(yaml).unwrap();
1775        let engine = Engine::new(wf, "".to_string(), HashMap::new(), false, true).await;
1776        // Just verify it doesn't panic
1777        engine.dry_run();
1778    }
1779
1780    #[tokio::test]
1781    async fn should_sandbox_step_script_always_false() {
1782        let yaml = r#"
1783name: test
1784steps:
1785  - name: s
1786    type: cmd
1787    run: "echo test"
1788"#;
1789        let wf = parser::parse_str(yaml).unwrap();
1790
1791        // Script steps never run in sandbox, regardless of mode
1792        let opts = EngineOptions {
1793            sandbox_mode: SandboxMode::FullWorkflow,
1794            quiet: true,
1795            ..Default::default()
1796        };
1797        let engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1798        assert!(!engine.should_sandbox_step(&StepType::Script));
1799    }
1800
1801    // ── Story 3.3: Await all remaining async futures at workflow end ─────────
1802
1803    #[tokio::test]
1804    async fn multiple_async_steps_all_complete_by_workflow_end() {
1805        let yaml = r#"
1806name: multi-async
1807steps:
1808  - name: task_a
1809    type: cmd
1810    run: "echo result_a"
1811    async_exec: true
1812  - name: task_b
1813    type: cmd
1814    run: "echo result_b"
1815    async_exec: true
1816  - name: sync_done
1817    type: cmd
1818    run: "echo done"
1819"#;
1820        let wf = parser::parse_str(yaml).unwrap();
1821        let opts = EngineOptions {
1822            quiet: true,
1823            ..Default::default()
1824        };
1825        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1826        engine.run().await.unwrap();
1827
1828        let records = engine.step_records();
1829        assert!(
1830            records.iter().any(|r| r.name == "task_a"),
1831            "task_a should be recorded"
1832        );
1833        assert!(
1834            records.iter().any(|r| r.name == "task_b"),
1835            "task_b should be recorded"
1836        );
1837        assert!(
1838            records.iter().any(|r| r.name == "sync_done"),
1839            "sync_done should be recorded"
1840        );
1841    }
1842
1843    // ── Story 4.3: Script step dispatch ─────────────────────────────────────
1844
1845    #[tokio::test]
1846    async fn engine_dispatches_script_step() {
1847        let yaml = r#"
1848name: script-dispatch
1849steps:
1850  - name: calc
1851    type: script
1852    run: |
1853      let x = 6 * 7;
1854      x.to_string()
1855"#;
1856        let wf = parser::parse_str(yaml).unwrap();
1857        let opts = EngineOptions {
1858            quiet: true,
1859            ..Default::default()
1860        };
1861        let mut engine = Engine::with_options(wf, "".to_string(), HashMap::new(), opts).await;
1862        let result = engine.run().await.unwrap();
1863        assert_eq!(result.text().trim(), "42");
1864    }
1865
1866    #[tokio::test]
1867    #[allow(clippy::await_holding_lock)]
1868    async fn stack_context_injected_when_registry_exists() {
1869        let _lock = CWD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1870
1871        let dir = tempfile::tempdir().unwrap();
1872        let orig_dir = std::env::current_dir().unwrap();
1873
1874        // Create prompts/registry.yaml and a marker file
1875        let prompts_dir = dir.path().join("prompts");
1876        std::fs::create_dir_all(&prompts_dir).unwrap();
1877        let registry_yaml = r#"
1878version: 1
1879detection_order:
1880  - rust
1881stacks:
1882  rust:
1883    file_markers:
1884      - Cargo.toml
1885    tools:
1886      lint: "cargo clippy"
1887      test: "cargo test"
1888      build: "cargo build"
1889      install: "cargo fetch"
1890"#;
1891        std::fs::write(prompts_dir.join("registry.yaml"), registry_yaml).unwrap();
1892        // Create the Cargo.toml marker
1893        std::fs::write(dir.path().join("Cargo.toml"), "[package]").unwrap();
1894
1895        std::env::set_current_dir(dir.path()).unwrap();
1896
1897        let yaml = "name: test\nsteps:\n  - name: s\n    type: cmd\n    run: \"echo hi\"\n";
1898        let wf = parser::parse_str(yaml).unwrap();
1899        let engine = Engine::with_options(
1900            wf,
1901            "target".to_string(),
1902            HashMap::new(),
1903            EngineOptions {
1904                quiet: true,
1905                ..Default::default()
1906            },
1907        )
1908        .await;
1909
1910        let result = engine.context.render_template("{{ stack.name }}").unwrap();
1911        assert_eq!(result, "rust");
1912
1913        let lint = engine
1914            .context
1915            .render_template("{{ stack.tools.lint }}")
1916            .unwrap();
1917        assert_eq!(lint, "cargo clippy");
1918
1919        assert!(engine.stack_info.is_some());
1920        assert_eq!(engine.stack_info.as_ref().unwrap().name, "rust");
1921
1922        std::env::set_current_dir(orig_dir).unwrap();
1923    }
1924
1925    #[tokio::test]
1926    #[allow(clippy::await_holding_lock)]
1927    async fn stack_context_skipped_when_no_registry() {
1928        let _lock = CWD_LOCK.lock().unwrap_or_else(|e| e.into_inner());
1929
1930        let yaml = "name: test\nsteps:\n  - name: s\n    type: cmd\n    run: \"echo hi\"\n";
1931        let wf = parser::parse_str(yaml).unwrap();
1932        // Run from a tempdir without any registry.yaml
1933        let dir = tempfile::tempdir().unwrap();
1934        let orig_dir = std::env::current_dir().unwrap();
1935        std::env::set_current_dir(dir.path()).unwrap();
1936
1937        let engine = Engine::with_options(
1938            wf,
1939            "target".to_string(),
1940            HashMap::new(),
1941            EngineOptions {
1942                quiet: true,
1943                ..Default::default()
1944            },
1945        )
1946        .await;
1947        // Should not crash, stack_info is None
1948        assert!(engine.stack_info.is_none());
1949        // stack variable not in context
1950        assert!(engine.context.get_var("stack").is_none());
1951
1952        std::env::set_current_dir(orig_dir).unwrap();
1953    }
1954}