Skip to main content

nika_engine/runtime/
runner.rs

1//! DAG Runner - workflow execution with tokio
2//!
3//! Performance optimizations:
4//! - Arc for zero-cost task/context sharing
5//! - JoinSet for efficient parallel task collection
6//! - Tokio handles all concurrency (no artificial limits)
7
8use indexmap::IndexMap;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::Instant;
12
13use colored::Colorize;
14use serde_json::Value;
15use tokio::sync::{Notify, Semaphore};
16use tokio::task::JoinSet;
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, info, instrument};
19
20use crate::ast::analyzed::{
21    AnalyzedOutput, AnalyzedTask, AnalyzedTaskAction, AnalyzedWorkflow,
22    OutputFormat as AnalyzedOutputFormat,
23};
24use crate::ast::lower::{lower_action, lower_mcp_servers, lower_output};
25use crate::ast::output::OutputPolicy;
26use crate::ast::{InferParams, TaskAction};
27use crate::binding::ResolvedBindings;
28use crate::dag::Dag;
29use crate::error::NikaError;
30use crate::event::{prune_traces, EventKind, EventLog, TraceWriter};
31use crate::runtime::boot::TraceConfig;
32use crate::store::{RunContext, TaskResult};
33use crate::util::{intern, DECOMPOSE_TIMEOUT};
34
35use super::artifact_processor::process_task_artifacts;
36use super::context_loader::load_context_analyzed;
37use super::executor::TaskExecutor;
38use super::output::{extract_json, format_validation_errors, make_task_result};
39use super::resolver::{resolve_assets_analyzed, ResolvedAssets};
40use super::structured_output::StructuredOutputEngine;
41
42use crate::ast::artifact::ArtifactsConfig;
43use std::path::PathBuf;
44
45// ═══════════════════════════════════════════════════════════════════════════════
46// RAII Lockfile Guard
47// ═══════════════════════════════════════════════════════════════════════════════
48
49/// RAII guard that removes a lockfile on drop.
50///
51/// The media store lockfile (`.nika-run.lock`) prevents `nika media clean` from
52/// garbage-collecting blobs that are still in use by a running workflow. Without
53/// this guard, any early return via `?`, `return Err(...)`, or panic would leave
54/// a stale lockfile that permanently blocks GC until manually deleted.
55///
56/// The guard writes the lockfile on creation and removes it when dropped,
57/// covering **all** exit paths: normal completion, error propagation, and
58/// unwinding panics.
59struct LockfileGuard {
60    path: PathBuf,
61}
62
63impl LockfileGuard {
64    /// Create the lockfile and return a guard that will remove it on drop.
65    ///
66    /// Logs warnings on failure but does not block execution — lockfile is
67    /// best-effort protection against concurrent runs, not a hard requirement.
68    ///
69    /// NOTE: With `panic = "abort"` (release profile), panics bypass Drop.
70    /// The lockfile includes a timestamp — callers should treat locks older
71    /// than 10 minutes as stale.
72    fn create(path: PathBuf) -> Self {
73        // Check for stale lockfile (>10min = likely from panic=abort crash)
74        if path.exists() {
75            if let Ok(metadata) = path.metadata() {
76                if let Ok(modified) = metadata.modified() {
77                    if modified.elapsed().unwrap_or_default() > std::time::Duration::from_secs(600)
78                    {
79                        tracing::warn!("Removing stale lockfile (>10min old)");
80                        let _ = std::fs::remove_file(&path);
81                    }
82                }
83            }
84        }
85
86        if let Some(parent) = path.parent() {
87            if let Err(e) = std::fs::create_dir_all(parent) {
88                tracing::warn!(path = %parent.display(), error = %e, "Failed to create lockfile directory");
89            }
90        }
91        if let Err(e) = std::fs::write(&path, format!("pid:{}", std::process::id())) {
92            tracing::warn!(path = %path.display(), error = %e, "Failed to write lockfile — concurrent runs may conflict");
93        }
94        Self { path }
95    }
96}
97
98impl Drop for LockfileGuard {
99    fn drop(&mut self) {
100        let _ = std::fs::remove_file(&self.path);
101    }
102}
103
104// ═══════════════════════════════════════════════════════════════════════════════
105// Helper Functions
106// ═══════════════════════════════════════════════════════════════════════════════
107
108/// Try to extract an array from a Value, parsing JSON strings if needed.
109///
110/// This function handles the case where a task output is a JSON array stored as a
111/// string (e.g., from `exec: 'echo ''["a","b","c"]'''`). The for_each resolution
112/// needs to iterate over arrays, but task outputs are stored as strings.
113///
114/// # Returns
115/// - `Some(Vec<Value>)` if the value is an array or a parseable JSON array string
116/// - `None` if the value cannot be converted to an array
117fn value_to_array(value: &Value) -> Option<Vec<Value>> {
118    // Fast path: direct array access
119    if let Some(arr) = value.as_array() {
120        return Some(arr.clone());
121    }
122
123    // String → try extract_json (handles markdown fences, bare JSON, brackets)
124    if let Some(s) = value.as_str() {
125        if let Ok(extracted) = extract_json(s) {
126            if let Some(arr) = extracted.as_array() {
127                return Some(arr.clone());
128            }
129        }
130    }
131
132    None
133}
134
/// Result of executing a task iteration.
///
/// A regular task produces one of these; a `for_each` task produces one per
/// iteration, and each carries its iteration index so the iterations can be
/// aggregated in order.
struct IterationResult {
    /// ID used for storage (task_id for regular, indexed for for_each)
    store_id: Arc<str>,
    /// The actual task result
    result: TaskResult,
    /// For for_each: (parent_id, index) to enable aggregation.
    /// `None` for tasks that are not part of a for_each.
    for_each_info: Option<(Arc<str>, usize)>,
    /// Paths of artifacts written during this task (for CLI reporting)
    // NOTE(review): `dead_code` is allowed here, presumably because nothing
    // reads this field yet — confirm before removing the attribute.
    #[allow(dead_code)]
    artifact_paths: Vec<PathBuf>,
}
148
/// DAG workflow runner with event sourcing
///
/// Consumes `AnalyzedWorkflow` directly from the analyzer.
/// Bridge conversions (`lower_action`, `lower_output`) happen at the
/// `TaskExecutor` boundary only.
pub struct Runner {
    /// The analyzed workflow being executed (tasks, provider, model, MCP servers).
    workflow: AnalyzedWorkflow,
    /// Dependency graph built from `workflow`; used to look up task dependencies
    /// and terminal tasks.
    flow_graph: Dag,
    /// Per-run store of task results, keyed by task name.
    datastore: RunContext,
    /// Executes individual task actions.
    executor: TaskExecutor,
    /// Event sink for everything that happens during the run.
    event_log: EventLog,
    /// Unique identifier for this workflow execution (for trace files)
    generation_id: String,
    /// Suppress console output (for TUI mode)
    quiet: bool,
    /// Cancellation token for aborting workflow
    cancel_token: CancellationToken,
    /// Pause state - when true, runner waits between layers
    paused: Arc<AtomicBool>,
    /// Notify to wake runner from pause
    resume_notify: Arc<Notify>,
    /// Resolved agents and skills
    resolved_assets: ResolvedAssets,
    /// Trace retention config (max_traces + retention_days)
    trace_config: TraceConfig,
    /// CLI event stream renderer (None when quiet or TUI mode).
    /// Uses `RunRenderer` to auto-select Live (animated) vs Classic (append-only).
    cli_renderer: Option<crate::display::RunRenderer>,
}
178
179impl Runner {
    /// Create a Runner for `workflow` that records into a fresh `EventLog::new()`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Self::with_event_log` returns.
    pub fn new(workflow: AnalyzedWorkflow) -> Result<Self, NikaError> {
        Self::with_event_log(workflow, EventLog::new())
    }
183
184    /// Create a Runner with a custom EventLog (for TUI integration)
185    ///
186    /// Use `EventLog::new_with_broadcast()` to create an EventLog that
187    /// sends events to TUI in real-time.
188    ///
189    /// # Errors
190    ///
191    /// Returns `NikaError::ValidationError` if DAG construction fails
192    /// (e.g. the workflow contains cycles or invalid dependencies).
193    pub fn with_event_log(
194        workflow: AnalyzedWorkflow,
195        event_log: EventLog,
196    ) -> Result<Self, NikaError> {
197        let flow_graph = Dag::from_analyzed(&workflow).map_err(|e| NikaError::ValidationError {
198            reason: format!("DAG construction failed: {e}"),
199        })?;
200        flow_graph.detect_cycles()?;
201        let datastore = RunContext::new();
202
203        // Bridge MCP servers to old FxHashMap<String, McpConfigInline> for TaskExecutor
204        let mcp_configs = lower_mcp_servers(workflow.mcp_servers.clone());
205        let provider = workflow.provider.as_deref().unwrap_or("claude");
206
207        let executor = TaskExecutor::new(
208            provider,
209            workflow.model.as_deref(),
210            mcp_configs,
211            event_log.clone(),
212        )?;
213
214        // Generate unique ID for this execution (used for trace files)
215        let generation_id = format!("gen-{}", uuid::Uuid::new_v4());
216
217        Ok(Self {
218            workflow,
219            flow_graph,
220            datastore,
221            executor,
222            event_log,
223            generation_id,
224            quiet: false,
225            cancel_token: CancellationToken::new(),
226            paused: Arc::new(AtomicBool::new(false)),
227            resume_notify: Arc::new(Notify::new()),
228            resolved_assets: ResolvedAssets::default(),
229            trace_config: TraceConfig::default(),
230            cli_renderer: None,
231        })
232    }
233
    /// Enable quiet mode to suppress console output (for TUI mode)
    ///
    /// When quiet is true, Runner will not print to stdout/stderr.
    /// All events are still emitted to the EventLog for TUI display.
    ///
    /// `with_detail_level` and `with_classic_renderer` read this flag at the
    /// moment they are called, so call `quiet()` before them if the renderer
    /// should be clamped to the minimum detail level.
    pub fn quiet(mut self) -> Self {
        self.quiet = true;
        self
    }
242
243    /// Set the CLI detail level for event rendering.
244    ///
245    /// Automatically selects Live (animated) or Classic (append-only)
246    /// renderer based on TTY detection and detail level.
247    pub fn with_detail_level(mut self, detail: crate::display::DetailLevel) -> Self {
248        let effective_detail = if self.quiet {
249            crate::display::DetailLevel::Min
250        } else {
251            detail
252        };
253        self.cli_renderer = Some(crate::display::RunRenderer::auto(effective_detail));
254        self
255    }
256
257    /// Force the classic (append-only) renderer regardless of TTY.
258    pub fn with_classic_renderer(mut self, detail: crate::display::DetailLevel) -> Self {
259        let effective_detail = if self.quiet {
260            crate::display::DetailLevel::Min
261        } else {
262            detail
263        };
264        self.cli_renderer = Some(crate::display::RunRenderer::classic(effective_detail));
265        self
266    }
267
    /// Get a reference to the run context (task results store).
    ///
    /// Available after `run()` completes to collect task outputs for `-o/--output`.
    /// The returned reference borrows from the runner; nothing is copied.
    pub fn datastore(&self) -> &RunContext {
        &self.datastore
    }
274
275    /// Inject initial context into the datastore
276    ///
277    /// Used by nika_run to pass parent context to child workflows.
278    /// The context is stored as a successful task result under the given key,
279    /// making it accessible via `with: alias: <key>.result` in the child workflow.
280    ///
281    /// # Example
282    ///
283    /// ```text
284    /// // In parent workflow via nika_run:
285    /// // context: { "entity": "qr-code", "locale": "fr-FR" }
286    ///
287    /// // Child workflow can access via:
288    /// // with:
289    /// //   parent: __parent_context__.result
290    /// ```
291    pub fn with_initial_context(self, key: &str, context: Value) -> Self {
292        use crate::store::TaskResult;
293        use crate::util::intern;
294
295        self.datastore.insert(
296            intern(key),
297            TaskResult::success(context, std::time::Duration::ZERO),
298        );
299        self
300    }
301
    /// Set the permission mode for file tools (nika:write, nika:edit, etc.)
    ///
    /// By default, `PermissionMode::Plan` is used (deny writes, emit permission request).
    /// For `nika run`, use `AcceptAll` since the user explicitly chose to run.
    ///
    /// The mode is forwarded to the executor; the runner itself keeps no copy.
    pub fn with_permission_mode(self, mode: crate::tools::PermissionMode) -> Self {
        self.executor.set_permission_mode(mode);
        self
    }
310
    /// Set a custom cancellation token
    ///
    /// This allows external control of workflow cancellation.
    /// The TUI can hold a clone of the token and call `cancel()` on it.
    /// Also propagated to TaskExecutor so MCP invoke operations
    /// abort promptly instead of waiting for INVOKE_TASK_DEADLINE.
    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
        // The executor receives a clone, so runner and executor share one token.
        self.executor = self.executor.with_cancel_token(token.clone());
        // Replaces the token that was created in the constructor.
        self.cancel_token = token;
        self
    }
322
    /// Get a clone of the cancellation token
    ///
    /// The TUI can use this to abort the workflow by calling `cancel()`.
    /// The clone refers to the token currently held by the runner, so fetch it
    /// after any call to `with_cancel_token`.
    pub fn cancel_token(&self) -> CancellationToken {
        self.cancel_token.clone()
    }
329
    /// Check if the workflow has been cancelled
    ///
    /// Returns the current state of the runner's cancellation token.
    pub fn is_cancelled(&self) -> bool {
        self.cancel_token.is_cancelled()
    }
334
    /// Pause workflow execution
    ///
    /// When paused, the runner will complete current tasks but won't start new ones.
    /// Use `resume()` to continue execution.
    ///
    /// This method only sets the shared pause flag and emits `WorkflowPaused`;
    /// the waiting itself happens in the run loop, which is not part of this method.
    pub fn pause(&self) {
        self.paused.store(true, Ordering::SeqCst);
        self.event_log.emit(EventKind::WorkflowPaused);
    }
343
    /// Resume workflow execution after pause
    ///
    /// Clears the pause flag, signals `resume_notify` once, and emits
    /// `WorkflowResumed`.
    pub fn resume(&self) {
        // Clear the flag before notifying so a woken waiter sees `paused == false`.
        self.paused.store(false, Ordering::SeqCst);
        self.resume_notify.notify_one();
        self.event_log.emit(EventKind::WorkflowResumed);
    }
350
    /// Check if the workflow is paused
    ///
    /// Reads the shared pause flag, which can also be changed through the
    /// handles returned by `pause_handles()`.
    pub fn is_paused(&self) -> bool {
        self.paused.load(Ordering::SeqCst)
    }
355
    /// Get cloneable handles for external pause/resume control
    ///
    /// Returns (paused_flag, resume_notify) that can be used by the TUI
    /// to control pause state externally.
    ///
    /// Both handles are `Arc` clones of the runner's own state, so changes made
    /// through them are visible to the runner. Writing to the flag directly
    /// does not emit the `WorkflowPaused` / `WorkflowResumed` events that
    /// `pause()` and `resume()` emit.
    pub fn pause_handles(&self) -> (Arc<AtomicBool>, Arc<Notify>) {
        (Arc::clone(&self.paused), Arc::clone(&self.resume_notify))
    }
363
    /// Get the event log for inspection/export
    ///
    /// Returns a borrow of the log this runner emits into.
    pub fn event_log(&self) -> &EventLog {
        &self.event_log
    }
368
369    /// Get tasks that are ready to run (all dependencies satisfied)
370    ///
371    /// Also detects and marks tasks whose dependencies have failed.
372    /// These tasks are marked as DependencyFailed and stored in the datastore.
373    fn get_ready_tasks(&self) -> Vec<&AnalyzedTask> {
374        self.workflow
375            .tasks
376            .iter()
377            .filter(|task| {
378                // Skip if already done
379                if self.datastore.contains(&task.name) {
380                    return false;
381                }
382
383                // Check all dependencies
384                let deps = self.flow_graph.get_dependencies(&task.name);
385                for dep in deps.iter() {
386                    // Check if dependency has completed
387                    if let Some(succeeded) = self.datastore.is_completed_successfully(dep.as_ref())
388                    {
389                        // If dependency failed, mark this task as DependencyFailed
390                        if !succeeded {
391                            // Store DependencyFailed result for this task
392                            self.datastore.insert(
393                                intern(&task.name),
394                                TaskResult::dependency_failed(dep.as_ref()),
395                            );
396
397                            // Emit event for observability
398                            self.event_log.emit(EventKind::TaskSkipped {
399                                task_id: Arc::from(task.name.as_str()),
400                                reason: format!("dependency '{}' failed", dep.as_ref()),
401                            });
402
403                            debug!(
404                                task_id = %task.name,
405                                dependency = %dep.as_ref(),
406                                "Task blocked due to failed dependency"
407                            );
408
409                            return false;
410                        }
411                    } else {
412                        // Dependency hasn't completed yet - task not ready
413                        return false;
414                    }
415                }
416
417                // All dependencies succeeded - task is ready
418                true
419            })
420            .collect()
421    }
422
    /// Check if all tasks are done (completed, failed, or blocked by dependency failure)
    ///
    /// "Done" here means the datastore holds a result for the task, whatever
    /// that result is. Returns `true` for a workflow with no tasks.
    fn all_done(&self) -> bool {
        self.workflow
            .tasks
            .iter()
            .all(|t| self.datastore.contains(&t.name))
    }
430
431    /// Get tasks that are blocked waiting for incomplete dependencies (not failed)
432    ///
433    /// Used to distinguish actual deadlocks from dependency failures.
434    fn get_pending_tasks(&self) -> Vec<String> {
435        self.workflow
436            .tasks
437            .iter()
438            .filter(|task| !self.datastore.contains(&task.name))
439            .map(|t| t.name.clone())
440            .collect()
441    }
442
443    /// Get the first failed task in the workflow (for error reporting)
444    fn find_root_failure(&self) -> Option<String> {
445        for task in &self.workflow.tasks {
446            if let Some(result) = self.datastore.get(&task.name) {
447                // Only consider actual failures, not dependency failures
448                if matches!(result.status, crate::store::TaskOutcome::Failed(_)) {
449                    return Some(task.name.clone());
450                }
451            }
452        }
453        None
454    }
455
456    /// Get the final output (from tasks with no successors)
457    ///
458    /// Uses `get_deepest_final_task()` to select the terminal task with the
459    /// highest topological depth. This ensures branching DAGs return the correct
460    /// output (e.g., "final" task, not "branch_a").
461    fn get_final_output(&self) -> Option<String> {
462        // Use deepest terminal task instead of arbitrary selection
463        if let Some(deepest_task) = self.flow_graph.get_deepest_final_task() {
464            if let Some(result) = self.datastore.get(deepest_task.as_ref()) {
465                if result.is_success() {
466                    return Some(result.output_str().into_owned());
467                }
468            }
469        }
470
471        // Fallback: Try any successful final task
472        let final_tasks = self.flow_graph.get_final_tasks();
473        for task_id in final_tasks {
474            if let Some(result) = self.datastore.get(&task_id) {
475                if result.is_success() {
476                    return Some(result.output_str().into_owned());
477                }
478            }
479        }
480        None
481    }
482
483    /// Write execution trace to .nika/traces/ (called on ALL exit paths).
484    ///
485    /// Traces are written for WorkflowCompleted, WorkflowFailed, and WorkflowAborted.
486    /// After writing, prunes old traces based on `trace_config` (max_traces + retention_days).
487    fn write_trace(&self) -> Option<String> {
488        let trace_path = match TraceWriter::new(&self.generation_id) {
489            Ok(trace_writer) => {
490                if let Err(e) = trace_writer.write_all(&self.event_log) {
491                    tracing::warn!(error = %e, "Failed to write trace");
492                    None
493                } else {
494                    let path = trace_writer.path().display().to_string();
495                    tracing::info!(path = %path, "Trace written");
496                    Some(path)
497                }
498            }
499            Err(e) => {
500                tracing::warn!(error = %e, "Failed to create trace writer — traces disabled for this run");
501                None
502            }
503        };
504
505        // Enforce retention: prune traces beyond max_traces / retention_days
506        prune_traces(
507            self.trace_config.max_traces,
508            self.trace_config.retention_days,
509        );
510        trace_path
511    }
512
513    /// Verify media integrity: check that all MediaRef paths exist and sizes match.
514    ///
515    /// Called after all tasks complete but before WorkflowCompleted event.
516    /// Emits a `MediaIntegrityCheck` event with results.
517    /// Returns warning count. Never fails the workflow -- only warns.
518    fn verify_media_integrity(&self) -> usize {
519        let mut warnings = 0;
520        let mut checked: u64 = 0;
521        for (task_id, result) in self.datastore.iter_results() {
522            for media_ref in &result.media {
523                checked += 1;
524                if !media_ref.path.exists() {
525                    tracing::warn!(
526                        task_id = %task_id,
527                        hash = %media_ref.hash,
528                        path = %media_ref.path.display(),
529                        "Media integrity: CAS file missing"
530                    );
531                    warnings += 1;
532                    continue;
533                }
534                match std::fs::metadata(&media_ref.path) {
535                    Ok(meta) => {
536                        if meta.len() != media_ref.size_bytes {
537                            tracing::warn!(
538                                task_id = %task_id,
539                                hash = %media_ref.hash,
540                                expected = media_ref.size_bytes,
541                                actual = meta.len(),
542                                "Media integrity: size mismatch"
543                            );
544                            warnings += 1;
545                        }
546                    }
547                    Err(e) => {
548                        tracing::warn!(
549                            task_id = %task_id,
550                            hash = %media_ref.hash,
551                            error = %e,
552                            "Media integrity: failed to stat CAS file"
553                        );
554                        warnings += 1;
555                    }
556                }
557            }
558        }
559
560        // Emit structured event for telemetry consumers
561        if checked > 0 {
562            self.event_log.emit(EventKind::MediaIntegrityCheck {
563                checked,
564                warnings: warnings as u64,
565            });
566        }
567
568        warnings
569    }
570
571    /// Check if a task qualifies for schema validation retry
572    ///
573    /// Returns Some((schema, max_retries, infer_params)) if:
574    /// - Task action is Infer
575    /// - Output format is JSON
576    /// - Output has inline schema
577    /// - structured.max_retries > 0
578    fn get_retry_config(task: &AnalyzedTask) -> Option<(Value, u8, InferParams)> {
579        // Must be an infer action
580        let infer_action = match &task.action {
581            AnalyzedTaskAction::Infer(infer) => infer,
582            _ => return None,
583        };
584
585        // Must have output with JSON format and inline schema
586        let output = task.output.as_ref()?;
587        if output.format != AnalyzedOutputFormat::Json {
588            return None;
589        }
590
591        // Must have inline schema
592        let schema = output.schema.as_ref()?.clone();
593
594        // max_retries comes from structured output spec, NOT from output policy
595        let structured = task.structured.as_ref()?;
596        let max_retries = structured.max_retries.unwrap_or(0);
597        if max_retries == 0 {
598            return None;
599        }
600
601        // Build InferParams directly from analyzed types
602        let infer_params = InferParams {
603            prompt: infer_action.prompt.clone(),
604            provider: task.provider.clone(),
605            model: task.model.clone(),
606            temperature: infer_action.temperature,
607            max_tokens: infer_action.max_tokens,
608            system: infer_action.system.clone(),
609            response_format: None,
610            extended_thinking: None,
611            thinking_budget: None,
612            content: infer_action
613                .content
614                .as_ref()
615                .map(|parts| parts.iter().cloned().map(Into::into).collect()),
616            guardrails: Vec::new(),
617        };
618
619        Some((schema, max_retries, infer_params))
620    }
621
622    /// Execute an infer task with schema validation and retry loop
623    ///
624    /// When LLM output fails schema validation, builds a feedback prompt with:
625    /// - Original prompt
626    /// - Schema that must be matched
627    /// - Previous output
628    /// - Validation errors
629    ///
630    /// Retries up to max_retries times before failing.
631    #[allow(clippy::too_many_arguments)]
632    async fn execute_with_retry(
633        task_id: &Arc<str>,
634        original_infer: InferParams,
635        schema: &Value,
636        max_retries: u8,
637        bindings: &ResolvedBindings,
638        datastore: &RunContext,
639        executor: &TaskExecutor,
640        event_log: &EventLog,
641        start: Instant,
642        output_policy: Option<&OutputPolicy>,
643    ) -> TaskResult {
644        let mut current_infer = original_infer;
645        let original_prompt = current_infer.prompt.clone();
646        let mut attempts = 0u8;
647
648        // PERF: Compile JSON Schema validator ONCE before the retry loop.
649        // Previously compiled on every iteration (10-50ms per retry wasted).
650        let compiled_validator = jsonschema::validator_for(schema).ok();
651
652        loop {
653            // Check cancellation before each retry attempt (avoids wasting LLM calls)
654            if executor.is_cancelled() {
655                let reason = "cancelled during structured output retry".to_string();
656                event_log.emit(EventKind::TaskFailed {
657                    task_id: Arc::clone(task_id),
658                    error: reason.clone(),
659                    error_code: Some("NIKA-097".to_string()),
660                    duration_ms: start.elapsed().as_millis() as u64,
661                });
662                return TaskResult::failed(reason, start.elapsed());
663            }
664            attempts += 1;
665
666            // Create action for this attempt
667            let action = TaskAction::Infer {
668                infer: current_infer.clone(),
669            };
670
671            // Execute
672            let result = executor
673                .execute(task_id, &action, bindings, datastore, output_policy)
674                .await;
675            let duration = start.elapsed();
676
677            match result {
678                Ok(output) => {
679                    // Try to extract JSON from output
680                    let json_value = match extract_json(&output) {
681                        Ok(v) => v,
682                        Err(e) => {
683                            if attempts > max_retries {
684                                // Max retries exhausted
685                                event_log.emit(EventKind::TaskFailed {
686                                    task_id: Arc::clone(task_id),
687                                    error: format!(
688                                        "NIKA-060: Invalid JSON after {} attempts: {}",
689                                        attempts, e
690                                    ),
691                                    duration_ms: duration.as_millis() as u64,
692                                    error_code: Some("NIKA-060".to_string()),
693                                });
694                                return TaskResult::failed(
695                                    format!(
696                                        "NIKA-060: Invalid JSON output after {} attempts: {}",
697                                        attempts, e
698                                    ),
699                                    duration,
700                                );
701                            }
702
703                            // Build retry prompt with JSON parsing error
704                            tracing::debug!(
705                                task_id = %task_id,
706                                attempt = attempts,
707                                "JSON parsing failed, retrying"
708                            );
709                            current_infer.prompt = Self::build_retry_prompt(
710                                &original_prompt,
711                                schema,
712                                &output,
713                                &format!("JSON parsing failed: {}", e),
714                            );
715                            continue;
716                        }
717                    };
718
719                    // Validate against schema (using pre-compiled validator)
720                    let compiled = match compiled_validator.as_ref() {
721                        Some(c) => c,
722                        None => {
723                            event_log.emit(EventKind::TaskFailed {
724                                task_id: Arc::clone(task_id),
725                                error: "Invalid schema (failed to compile)".to_string(),
726                                duration_ms: duration.as_millis() as u64,
727                                error_code: Some("NIKA-061".to_string()),
728                            });
729                            return TaskResult::failed(
730                                "Invalid inline schema (compilation failed)".to_string(),
731                                duration,
732                            );
733                        }
734                    };
735
736                    let errors: Vec<_> = compiled.iter_errors(&json_value).collect();
737                    if errors.is_empty() {
738                        // Validation passed
739                        event_log.emit(EventKind::TaskCompleted {
740                            task_id: Arc::clone(task_id),
741                            output: Arc::new(json_value.clone()),
742                            duration_ms: duration.as_millis() as u64,
743                        });
744                        return TaskResult::success(json_value, duration);
745                    }
746
747                    // Validation failed
748                    if attempts > max_retries {
749                        let error_feedback = format_validation_errors(&json_value, schema);
750                        event_log.emit(EventKind::TaskFailed {
751                            task_id: Arc::clone(task_id),
752                            error: format!(
753                                "Schema validation failed after {} attempts:\n{}",
754                                attempts, error_feedback
755                            ),
756                            duration_ms: duration.as_millis() as u64,
757                            error_code: Some("NIKA-061".to_string()),
758                        });
759                        return TaskResult::failed(
760                            format!(
761                                "NIKA-061: Schema validation failed after {} attempts:\n{}",
762                                attempts, error_feedback
763                            ),
764                            duration,
765                        );
766                    }
767
768                    // Build retry prompt with validation errors
769                    let error_feedback = format_validation_errors(&json_value, schema);
770                    tracing::debug!(
771                        task_id = %task_id,
772                        attempt = attempts,
773                        errors = %error_feedback,
774                        "Schema validation failed, retrying"
775                    );
776                    current_infer.prompt = Self::build_retry_prompt(
777                        &original_prompt,
778                        schema,
779                        &output,
780                        &error_feedback,
781                    );
782                }
783                Err(e) => {
784                    // Executor error (not validation error) - don't retry
785                    event_log.emit(EventKind::TaskFailed {
786                        task_id: Arc::clone(task_id),
787                        error: e.to_string(),
788                        duration_ms: duration.as_millis() as u64,
789                        error_code: Some(e.code().to_string()),
790                    });
791                    return TaskResult::failed(e.to_string(), duration);
792                }
793            }
794        }
795    }
796
797    /// Build a retry prompt with error feedback
798    fn build_retry_prompt(
799        original_prompt: &str,
800        schema: &Value,
801        previous_output: &str,
802        error_feedback: &str,
803    ) -> String {
804        format!(
805            r#"{original_prompt}
806
807---
808RETRY: Your previous response did not match the required JSON schema.
809
810REQUIRED SCHEMA:
811{schema}
812
813YOUR PREVIOUS OUTPUT:
814{previous_output}
815
816VALIDATION ERRORS:
817{error_feedback}
818
819Please provide a corrected JSON response that strictly matches the schema."#,
820            original_prompt = original_prompt,
821            schema = serde_json::to_string_pretty(schema).unwrap_or_else(|_| schema.to_string()),
822            previous_output = previous_output,
823            error_feedback = error_feedback
824        )
825    }
826
827    /// Execute a single task iteration (used for both regular tasks and for_each items)
828    ///
829    /// Bridge conversions (`lower_action`, `lower_output`) happen here at the
830    /// `TaskExecutor` boundary — the rest of Runner works with `AnalyzedTask`.
831    ///
832    /// # Arguments
833    ///
834    /// * `task` - The analyzed task to execute
835    /// * `task_id` - ID for this specific execution (may include index for for_each)
836    /// * `parent_task_id` - Original task ID (for for_each, this is the parent task ID)
837    /// * `datastore` - Data store for task results
838    /// * `executor` - Task executor
839    /// * `event_log` - Event log for observability
840    /// * `for_each_binding` - Optional (var_name, value, index) for for_each iteration
841    /// * `workflow_artifacts` - Workflow-level artifact configuration
842    /// * `base_path` - Base path for artifact resolution
843    #[allow(clippy::too_many_arguments)] // Artifact integration requires additional params
844    async fn execute_task_iteration(
845        task: Arc<AnalyzedTask>,
846        task_id: Arc<str>,
847        parent_task_id: Arc<str>,
848        datastore: RunContext,
849        executor: TaskExecutor,
850        event_log: EventLog,
851        for_each_binding: Option<(String, Value, usize)>,
852        workflow_artifacts: Option<ArtifactsConfig>,
853        base_path: PathBuf,
854    ) -> IterationResult {
855        let start = Instant::now();
856
857        // Extract for_each info if present
858        let for_each_info = for_each_binding
859            .as_ref()
860            .map(|(_, _, idx)| (Arc::clone(&parent_task_id), *idx));
861
862        // Build bindings from with: spec (always present in AnalyzedTask)
863        let (mut bindings, binding_events) = match ResolvedBindings::from_with_spec_traced(
864            Some(&task.with_spec),
865            &datastore,
866            &task_id,
867        ) {
868            Ok(result) => result,
869            Err(e) => {
870                let duration = start.elapsed();
871                event_log.emit(EventKind::TaskFailed {
872                    task_id: Arc::clone(&task_id),
873                    error: e.to_string(),
874                    duration_ms: duration.as_millis() as u64,
875                    error_code: Some(e.code().to_string()),
876                });
877                return IterationResult {
878                    store_id: task_id,
879                    result: TaskResult::failed(e.to_string(), duration),
880                    for_each_info,
881                    artifact_paths: vec![],
882                };
883            }
884        };
885        // Emit collected binding events
886        for event in binding_events {
887            event_log.emit(event);
888        }
889
890        // Add for_each binding if present
891        if let Some((var_name, value, _idx)) = for_each_binding {
892            bindings.set(&var_name, value);
893        }
894
895        // EMIT: TaskStarted
896        event_log.emit(EventKind::TaskStarted {
897            task_id: Arc::clone(&task_id),
898            verb: Arc::from(task.action.verb_name()),
899            inputs: bindings.to_value(),
900        });
901
902        // Bridge AnalyzedTask to lowered types at executor boundary
903        // PERF(M4): pass references — lower_action clones only what each verb needs
904        let lowered_action = lower_action(&task.action, &task.provider, &task.model, &task.retry);
905        let lowered_output = task
906            .output
907            .as_ref()
908            .map(|o: &AnalyzedOutput| lower_output(o.clone()));
909
910        // Bridge structured: config to OutputPolicy for executor Layer 0 dispatch.
911        // If both output: and structured: are set, output: takes precedence (already lowered).
912        // If only structured: is set, synthesize an OutputPolicy so the executor
913        // can trigger Layer 0 tool injection and prompt schema instructions.
914        let effective_output = if lowered_output.is_some() {
915            lowered_output
916        } else {
917            task.structured.as_ref().map(|spec| spec.to_output_policy())
918        };
919
920        // Check if task qualifies for schema validation retry
921        let retry_config = Self::get_retry_config(&task);
922
923        // Execute with retry loop if configured
924        let mut task_result = if let Some((schema, max_retries, original_infer)) = retry_config {
925            Self::execute_with_retry(
926                &task_id,
927                original_infer,
928                &schema,
929                max_retries,
930                &bindings,
931                &datastore,
932                &executor,
933                &event_log,
934                start,
935                effective_output.as_ref(),
936            )
937            .await
938        } else {
939            // Standard execution without retry
940            let result = executor
941                .execute(
942                    &task_id,
943                    &lowered_action,
944                    &bindings,
945                    &datastore,
946                    effective_output.as_ref(),
947                )
948                .await;
949            let duration = start.elapsed();
950
951            match result {
952                Ok(output) => {
953                    // Structured output validation via 4-layer engine.
954                    //
955                    // Skip if effective_output was set (the executor already validated
956                    // in verbs.rs via StructuredOutputEngine with InferCallback wired).
957                    // Only apply runner-level validation when structured: is set but
958                    // wasn't bridged to the executor (shouldn't happen, but defensive).
959                    let executor_already_validated = effective_output.is_some();
960                    let final_output = if !executor_already_validated {
961                        if let Some(ref structured_spec) = task.structured {
962                            let mut engine = StructuredOutputEngine::new(
963                                structured_spec.clone(),
964                                Arc::new(event_log.clone()),
965                            );
966                            match engine.validate(&task_id, &output).await {
967                                Ok(result) => {
968                                    debug!(
969                                        task_id = %task_id,
970                                        layer = result.layer,
971                                        layer_name = %result.layer_name,
972                                        total_attempts = result.total_attempts,
973                                        "Structured output validation succeeded (runner fallback)"
974                                    );
975                                    result.value.to_string()
976                                }
977                                Err(e) => {
978                                    // Drain any orphaned media refs (defense-in-depth)
979                                    let _ = datastore.take_media(&task_id);
980                                    event_log.emit(EventKind::TaskFailed {
981                                        task_id: Arc::clone(&task_id),
982                                        error: e.to_string(),
983                                        duration_ms: duration.as_millis() as u64,
984                                        error_code: Some(e.code().to_string()),
985                                    });
986                                    return IterationResult {
987                                        store_id: task_id,
988                                        result: TaskResult::failed(e.to_string(), duration),
989                                        for_each_info,
990                                        artifact_paths: vec![],
991                                    };
992                                }
993                            }
994                        } else {
995                            output
996                        }
997                    } else {
998                        output
999                    };
1000
1001                    let tr =
1002                        make_task_result(final_output, effective_output.as_ref(), duration).await;
1003                    // Attach media refs from staging side-channel
1004                    let tr = tr.with_media(datastore.take_media(&task_id));
1005                    if tr.is_success() {
1006                        event_log.emit(EventKind::TaskCompleted {
1007                            task_id: Arc::clone(&task_id),
1008                            output: Arc::clone(&tr.output),
1009                            duration_ms: duration.as_millis() as u64,
1010                        });
1011                    } else {
1012                        event_log.emit(EventKind::TaskFailed {
1013                            task_id: Arc::clone(&task_id),
1014                            error: tr.error().unwrap_or("Unknown error").to_string(),
1015                            duration_ms: duration.as_millis() as u64,
1016                            error_code: Some("NIKA-060".to_string()),
1017                        });
1018                    }
1019                    tr
1020                }
1021                Err(e) => {
1022                    // Drain any orphaned media refs (defense-in-depth)
1023                    let _ = datastore.take_media(&task_id);
1024                    event_log.emit(EventKind::TaskFailed {
1025                        task_id: Arc::clone(&task_id),
1026                        error: e.to_string(),
1027                        duration_ms: duration.as_millis() as u64,
1028                        error_code: Some(e.code().to_string()),
1029                    });
1030                    TaskResult::failed(e.to_string(), duration)
1031                }
1032            }
1033        };
1034
1035        // Process artifacts if task succeeded and has artifact config
1036        let mut artifact_paths = Vec::new();
1037        if task_result.is_success() {
1038            if let Some(ref artifact_spec) = task.artifact {
1039                let output_content = task_result.output_str().into_owned();
1040
1041                let artifact_result = process_task_artifacts(
1042                    &task_id,
1043                    &output_content,
1044                    artifact_spec,
1045                    workflow_artifacts.as_ref(),
1046                    &base_path,
1047                    Some(&event_log),
1048                    &bindings,
1049                    &datastore,
1050                    task_result.media.as_slice(),
1051                )
1052                .await;
1053
1054                if artifact_result.written > 0 {
1055                    debug!(
1056                        task_id = %task_id,
1057                        artifacts_written = artifact_result.written,
1058                        "Artifacts written"
1059                    );
1060                }
1061                artifact_paths = artifact_result.paths;
1062
1063                if !artifact_result.errors.is_empty() {
1064                    let error_msgs: Vec<String> = artifact_result
1065                        .errors
1066                        .iter()
1067                        .map(|err| {
1068                            tracing::error!(
1069                                task_id = %task_id,
1070                                error = %err,
1071                                "Artifact write failed"
1072                            );
1073                            event_log.emit(EventKind::ArtifactFailed {
1074                                task_id: Arc::clone(&task_id),
1075                                path: String::new(),
1076                                reason: err.to_string(),
1077                            });
1078                            err.to_string()
1079                        })
1080                        .collect();
1081                    let error = format!("Artifact write errors: {}", error_msgs.join("; "));
1082                    // Emit TaskFailed to correct the earlier TaskCompleted event
1083                    event_log.emit(EventKind::TaskFailed {
1084                        task_id: Arc::clone(&task_id),
1085                        error: error.clone(),
1086                        error_code: Some("NIKA-281".to_string()),
1087                        duration_ms: start.elapsed().as_millis() as u64,
1088                    });
1089                    task_result = TaskResult::failed(error, start.elapsed());
1090                }
1091            }
1092        }
1093
1094        IterationResult {
1095            store_id: task_id,
1096            result: task_result,
1097            for_each_info,
1098            artifact_paths,
1099        }
1100    }
1101
1102    /// Main execution loop
1103    #[instrument(skip(self), fields(workflow_tasks = self.workflow.tasks.len()))]
1104    pub async fn run(&mut self) -> Result<String, NikaError> {
1105        let workflow_start = Instant::now();
1106        info!("Starting workflow execution");
1107
1108        // Check for cancellation before starting
1109        if self.cancel_token.is_cancelled() {
1110            let duration = workflow_start.elapsed();
1111            self.event_log.emit(EventKind::WorkflowAborted {
1112                reason: "Workflow cancelled before start".to_string(),
1113                duration_ms: duration.as_millis() as u64,
1114                running_tasks: vec![],
1115            });
1116            self.write_trace();
1117            return Err(NikaError::WorkflowCancelled {
1118                phase: "before start".to_string(),
1119            });
1120        }
1121
1122        // Load context files if workflow has context_files
1123        let base_path = std::env::current_dir().unwrap_or_else(|e| {
1124            tracing::warn!(error = %e, "Failed to get current directory, using '.'");
1125            std::path::PathBuf::from(".")
1126        });
1127
1128        // Set workspace root for CAS media store path resolution
1129        self.datastore.set_workspace_root(base_path.clone());
1130
1131        // RAII lockfile: auto-removed on all exit paths (normal, error, panic)
1132        let _lockfile_guard = LockfileGuard::create(
1133            base_path
1134                .join(".nika")
1135                .join("media")
1136                .join("store")
1137                .join(".nika-run.lock"),
1138        );
1139
1140        if !self.workflow.context_files.is_empty() {
1141            let loaded_context =
1142                load_context_analyzed(&self.workflow.context_files, &base_path).await?;
1143            self.datastore.set_context(loaded_context);
1144            debug!("Loaded {} context files", self.workflow.context_files.len());
1145        }
1146
1147        // Load inputs if workflow has inputs
1148        if !self.workflow.inputs.is_empty() {
1149            let inputs_map: rustc_hash::FxHashMap<String, serde_json::Value> = self
1150                .workflow
1151                .inputs
1152                .iter()
1153                .map(|(k, v)| (k.clone(), v.clone()))
1154                .collect();
1155            self.datastore.set_inputs(inputs_map);
1156            debug!("Loaded {} input parameters", self.workflow.inputs.len());
1157        }
1158
1159        // Resolve agents
1160        if self.workflow.agents.is_some() {
1161            self.resolved_assets = resolve_assets_analyzed(&self.workflow, &base_path).await?;
1162            debug!(
1163                agents = self.resolved_assets.agents.len(),
1164                skills = self.resolved_assets.skills.len(),
1165                "Resolved workflow assets"
1166            );
1167        }
1168
1169        // Wire workflow-level skills mapping into the executor for agent skill injection.
1170        // TaskExecutor is Clone, so we clone-and-replace to call the builder-style setter.
1171        if !self.workflow.skills_map.is_empty() {
1172            self.executor = self
1173                .executor
1174                .clone()
1175                .with_skills(self.workflow.skills_map.clone(), base_path.clone());
1176            debug!(
1177                skills_count = self.workflow.skills_map.len(),
1178                "Wired skills mapping into executor"
1179            );
1180        }
1181
1182        let total_tasks = self.workflow.tasks.len();
1183        let mut _completed = 0;
1184
1185        // EMIT: WorkflowStarted
1186        self.event_log.emit(EventKind::WorkflowStarted {
1187            task_count: total_tasks,
1188            generation_id: self.generation_id.clone(),
1189            workflow_hash: self.workflow.compute_hash(),
1190            nika_version: env!("CARGO_PKG_VERSION").to_string(),
1191        });
1192
1193        if !self.quiet {
1194            println!();
1195
1196            // IMP-5: Warn if no task has output or artifact config
1197            let has_observable_output = self
1198                .workflow
1199                .tasks
1200                .iter()
1201                .any(|t| t.output.is_some() || t.artifact.is_some());
1202            if !has_observable_output && total_tasks > 1 {
1203                println!(
1204                    "  {} {}\n",
1205                    "⚠".yellow(),
1206                    "No tasks have output: or artifact: config — results won't be persisted"
1207                        .yellow()
1208                );
1209            }
1210        }
1211
1212        // PERF(M2): Compute DAG layers ONCE, reuse for both display and summary.
1213        // compute_layers() is O(N²) worst-case; previously called twice with identical inputs.
1214        let cached_depths = if total_tasks > 1 {
1215            let nodes: Vec<&str> = self
1216                .workflow
1217                .tasks
1218                .iter()
1219                .map(|t| t.name.as_str())
1220                .collect();
1221            let edges: Vec<(&str, &str)> = self
1222                .workflow
1223                .tasks
1224                .iter()
1225                .flat_map(|task| {
1226                    task.depends_on.iter().filter_map(|dep_id| {
1227                        self.workflow
1228                            .task_table
1229                            .get_name(*dep_id)
1230                            .map(|dep_name| (dep_name, task.name.as_str()))
1231                    })
1232                })
1233                .collect();
1234            Some(crate::dag::flow::compute_layers(&nodes, &edges))
1235        } else {
1236            None
1237        };
1238
1239        // Print static DAG + set up CliRenderer task layers
1240        if let Some(ref mut renderer) = self.cli_renderer {
1241            if let Some(ref depths) = cached_depths {
1242                use crate::display::dag::{StaticDagEdge, StaticDagTask};
1243                let dag_tasks: Vec<StaticDagTask> = self
1244                    .workflow
1245                    .tasks
1246                    .iter()
1247                    .map(|t| StaticDagTask {
1248                        id: t.name.clone(),
1249                        verb: t.action.verb_name().to_string(),
1250                        layer: depths[t.name.as_str()],
1251                    })
1252                    .collect();
1253                let total_deps: usize =
1254                    self.workflow.tasks.iter().map(|t| t.depends_on.len()).sum();
1255                let mut dag_edges = Vec::with_capacity(total_deps);
1256                for task in &self.workflow.tasks {
1257                    for dep_id in &task.depends_on {
1258                        if let Some(dep_name) = self.workflow.task_table.get_name(*dep_id) {
1259                            dag_edges.push(StaticDagEdge {
1260                                from: dep_name.to_string(),
1261                                to: task.name.clone(),
1262                            });
1263                        }
1264                    }
1265                }
1266                crate::display::dag::print_static_dag(&dag_tasks, &dag_edges);
1267                println!("{}", "\u{254C}".repeat(69).dimmed());
1268                println!();
1269                let task_layers: std::collections::HashMap<Arc<str>, usize> = self
1270                    .workflow
1271                    .tasks
1272                    .iter()
1273                    .map(|t| (Arc::from(t.name.as_str()), depths[t.name.as_str()]))
1274                    .collect();
1275                renderer.set_task_layers(task_layers);
1276            }
1277
1278            // Initialize live task bars (no-op for Classic renderer)
1279            let task_ids: Vec<String> =
1280                self.workflow.tasks.iter().map(|t| t.name.clone()).collect();
1281            let task_deps: std::collections::HashMap<String, Vec<String>> = self
1282                .workflow
1283                .tasks
1284                .iter()
1285                .map(|t| {
1286                    let deps: Vec<String> = t
1287                        .depends_on
1288                        .iter()
1289                        .filter_map(|dep_id| {
1290                            self.workflow
1291                                .task_table
1292                                .get_name(*dep_id)
1293                                .map(|s| s.to_string())
1294                        })
1295                        .collect();
1296                    (t.name.clone(), deps)
1297                })
1298                .collect();
1299            renderer.init_tasks(&task_ids, &task_deps);
1300        }
1301
1302        loop {
1303            // Check for cancellation at start of each loop iteration
1304            if self.cancel_token.is_cancelled() {
1305                let duration = workflow_start.elapsed();
1306                // Collect IDs of tasks that haven't completed yet
1307                let running_tasks: Vec<Arc<str>> = self
1308                    .workflow
1309                    .tasks
1310                    .iter()
1311                    .filter(|t| !self.datastore.contains(&t.name))
1312                    .map(|t| Arc::from(t.name.as_str()))
1313                    .collect();
1314
1315                self.event_log.emit(EventKind::WorkflowAborted {
1316                    reason: "Workflow cancelled by user".to_string(),
1317                    duration_ms: duration.as_millis() as u64,
1318                    running_tasks,
1319                });
1320                self.write_trace();
1321                return Err(NikaError::WorkflowCancelled {
1322                    phase: "by user".to_string(),
1323                });
1324            }
1325
1326            // Check for pause at start of each loop iteration
1327            // Waits until resumed, while also checking for cancellation
1328            while self.paused.load(Ordering::SeqCst) {
1329                tokio::select! {
1330                    biased;
1331                    _ = self.cancel_token.cancelled() => {
1332                        // Cancelled while paused
1333                        let duration = workflow_start.elapsed();
1334                        let running_tasks: Vec<Arc<str>> = self
1335                            .workflow
1336                            .tasks
1337                            .iter()
1338                            .filter(|t| !self.datastore.contains(&t.name))
1339                            .map(|t| Arc::from(t.name.as_str()))
1340                            .collect();
1341
1342                        self.event_log.emit(EventKind::WorkflowAborted {
1343                            reason: "Workflow cancelled while paused".to_string(),
1344                            duration_ms: duration.as_millis() as u64,
1345                            running_tasks,
1346                        });
1347                        self.write_trace();
1348                        return Err(NikaError::WorkflowCancelled {
1349                            phase: "while paused".to_string(),
1350                        });
1351                    }
1352                    _ = self.resume_notify.notified() => {
1353                        // Resumed, continue loop
1354                    }
1355                }
1356            }
1357
1358            let mut renderer = self.cli_renderer.take();
1359
1360            let ready = self.get_ready_tasks();
1361
1362            // Check for completion or deadlock
1363            if ready.is_empty() {
1364                self.cli_renderer = renderer;
1365
1366                if self.all_done() {
1367                    // Check if any tasks actually failed before declaring success.
1368                    // all_done() returns true when all tasks are in the datastore,
1369                    // including failed/dependency-failed tasks.
1370                    let failed_tasks: Vec<String> = self
1371                        .workflow
1372                        .tasks
1373                        .iter()
1374                        .filter(|t| self.datastore.is_failed(&t.name))
1375                        .map(|t| t.name.clone())
1376                        .collect();
1377
1378                    if !failed_tasks.is_empty() {
1379                        let root_failure = self.find_root_failure();
1380                        let dep_failed_count = failed_tasks
1381                            .iter()
1382                            .filter(|t| self.datastore.is_dependency_failed(t))
1383                            .count();
1384
1385                        self.event_log.emit(EventKind::WorkflowFailed {
1386                            error: format!(
1387                                "{} task(s) failed ({} direct, {} from dependency chain)",
1388                                failed_tasks.len(),
1389                                failed_tasks.len() - dep_failed_count,
1390                                dep_failed_count,
1391                            ),
1392                            failed_task: root_failure.clone().map(Arc::from),
1393                        });
1394                        self.write_trace();
1395                        return Err(NikaError::DependencyChainFailed {
1396                            count: failed_tasks.len(),
1397                            blocked_tasks: failed_tasks,
1398                            root_failure,
1399                        });
1400                    }
1401                    break;
1402                }
1403
1404                // Check if we're blocked due to dependency failures (not a deadlock)
1405                let pending = self.get_pending_tasks();
1406                if pending.is_empty() {
1407                    // All tasks are done - shouldn't happen, but check for consistency
1408                    break;
1409                }
1410
1411                // Check for dependency chain failures
1412                let blocked_by_dep_failure: Vec<String> = self
1413                    .workflow
1414                    .tasks
1415                    .iter()
1416                    .filter(|t| self.datastore.is_dependency_failed(&t.name))
1417                    .map(|t| t.name.clone())
1418                    .collect();
1419
1420                if !blocked_by_dep_failure.is_empty() {
1421                    // Not a deadlock - tasks are blocked due to failed dependencies
1422                    let root_failure = self.find_root_failure();
1423
1424                    self.event_log.emit(EventKind::WorkflowFailed {
1425                        error: format!(
1426                            "Dependency chain failed: {} task(s) blocked by failed dependencies",
1427                            blocked_by_dep_failure.len()
1428                        ),
1429                        failed_task: root_failure.clone().map(Arc::from),
1430                    });
1431                    self.write_trace();
1432                    return Err(NikaError::DependencyChainFailed {
1433                        count: blocked_by_dep_failure.len(),
1434                        blocked_tasks: blocked_by_dep_failure,
1435                        root_failure,
1436                    });
1437                }
1438
1439                // Actual deadlock - no tasks ready and no dependency failures detected
1440                // This indicates a cycle or other structural issue
1441                self.event_log.emit(EventKind::WorkflowFailed {
1442                    error: "Deadlock: no tasks ready but workflow not complete".to_string(),
1443                    failed_task: None,
1444                });
1445                self.write_trace();
1446                return Err(NikaError::RuntimeDeadlock {
1447                    details:
1448                        "no tasks ready but workflow not complete. Check for circular dependencies."
1449                            .to_string(),
1450                });
1451            }
1452
1453            // Spawn all ready tasks in parallel (Tokio handles concurrency)
1454            let mut join_set = JoinSet::new();
1455
1456            // Per-parent cancellation tokens for for_each fail_fast.
1457            // Using targeted cancellation instead of JoinSet::abort_all() to avoid
1458            // killing unrelated sibling tasks from other for_each parents (Bug #26).
1459            let mut for_each_cancel_tokens: rustc_hash::FxHashMap<Arc<str>, CancellationToken> =
1460                rustc_hash::FxHashMap::default();
1461
1462            // Prepare artifact config for all tasks in this batch
1463            let workflow_artifacts = self.workflow.artifacts.clone();
1464            let artifact_base_path = base_path.clone();
1465
1466            for task in ready {
1467                let task_id = intern(&task.name);
1468
1469                // EMIT: TaskScheduled
1470                let deps = self.flow_graph.get_dependencies(&task.name);
1471                let sched_kind = EventKind::TaskScheduled {
1472                    task_id: Arc::clone(&task_id),
1473                    dependencies: deps.to_vec(),
1474                };
1475                if let Some(ref mut r) = renderer {
1476                    r.render_kind(&sched_kind);
1477                }
1478                self.event_log.emit(sched_kind);
1479
1480                // Check if task has decompose - expands to for_each items
1481                // decompose takes priority over for_each (they're mutually exclusive)
1482                let for_each_items: Option<Vec<Value>> = if let Some(decompose) =
1483                    task.decompose.as_ref()
1484                {
1485                    debug!(
1486                        task_id = %task.name,
1487                        strategy = ?decompose.strategy,
1488                        traverse = %decompose.traverse,
1489                        "Expanding decompose modifier"
1490                    );
1491                    self.event_log.emit(EventKind::DecomposeStarted {
1492                        task_id: Arc::from(task.name.as_str()),
1493                        strategy: format!("{:?}", decompose.strategy).to_lowercase(),
1494                    });
1495                    // Resolve bindings for decompose source
1496                    let bindings = match ResolvedBindings::from_with_spec(
1497                        Some(&task.with_spec),
1498                        &self.datastore,
1499                    ) {
1500                        Ok(b) => b,
1501                        Err(e) => {
1502                            tracing::error!(
1503                                task_id = %task.name,
1504                                error = %e,
1505                                "Failed to resolve bindings for decompose"
1506                            );
1507                            self.datastore.insert(
1508                                intern(&task.name),
1509                                TaskResult::failed(
1510                                    format!("Decompose binding resolution failed: {e}"),
1511                                    std::time::Duration::ZERO,
1512                                ),
1513                            );
1514                            continue;
1515                        }
1516                    };
1517                    // Expand decompose using executor (with timeout to prevent silent hangs)
1518                    let decompose_start = Instant::now();
1519                    let decompose_result = tokio::time::timeout(
1520                        DECOMPOSE_TIMEOUT,
1521                        self.executor
1522                            .expand_decompose(decompose, &bindings, &self.datastore),
1523                    )
1524                    .await;
1525
1526                    match decompose_result {
1527                        Ok(Ok(items)) => {
1528                            self.event_log.emit(EventKind::DecomposeCompleted {
1529                                task_id: Arc::from(task.name.as_str()),
1530                                strategy: format!("{:?}", decompose.strategy).to_lowercase(),
1531                                item_count: items.len(),
1532                                duration_ms: decompose_start.elapsed().as_millis() as u64,
1533                            });
1534                            Some(items)
1535                        }
1536                        Ok(Err(e)) => {
1537                            // Decompose expansion failed
1538                            self.event_log.emit(EventKind::DecomposeCompleted {
1539                                task_id: Arc::from(task.name.as_str()),
1540                                strategy: format!("{:?}", decompose.strategy).to_lowercase(),
1541                                item_count: 0,
1542                                duration_ms: decompose_start.elapsed().as_millis() as u64,
1543                            });
1544                            self.datastore.insert(
1545                                intern(&task.name),
1546                                TaskResult::failed(e.to_string(), std::time::Duration::ZERO),
1547                            );
1548                            continue;
1549                        }
1550                        Err(_timeout) => {
1551                            // Decompose expansion timed out
1552                            self.event_log.emit(EventKind::DecomposeCompleted {
1553                                task_id: Arc::from(task.name.as_str()),
1554                                strategy: format!("{:?}", decompose.strategy).to_lowercase(),
1555                                item_count: 0,
1556                                duration_ms: decompose_start.elapsed().as_millis() as u64,
1557                            });
1558                            let timeout_error = NikaError::DecomposeTimeout {
1559                                task_id: task.name.clone(),
1560                                timeout_secs: DECOMPOSE_TIMEOUT.as_secs(),
1561                            };
1562                            self.datastore.insert(
1563                                intern(&task.name),
1564                                TaskResult::failed(timeout_error.to_string(), DECOMPOSE_TIMEOUT),
1565                            );
1566                            continue;
1567                        }
1568                    }
1569                } else if let Some(ref for_each) = task.for_each {
1570                    // AnalyzedForEach has structured fields: items, as_var, concurrency, fail_fast
1571                    let items_str = &for_each.items;
1572
1573                    if for_each.is_binding() {
1574                        // Binding reference ($alias, {{with.alias}}, {{inputs.xxx}})
1575                        let bindings = match ResolvedBindings::from_with_spec(
1576                            Some(&task.with_spec),
1577                            &self.datastore,
1578                        ) {
1579                            Ok(b) => b,
1580                            Err(e) => {
1581                                tracing::error!(
1582                                    task_id = %task.name,
1583                                    error = %e,
1584                                    "Failed to resolve bindings for for_each"
1585                                );
1586                                self.datastore.insert(
1587                                    intern(&task.name),
1588                                    TaskResult::failed(
1589                                        format!("for_each binding resolution failed: {e}"),
1590                                        std::time::Duration::ZERO,
1591                                    ),
1592                                );
1593                                continue;
1594                            }
1595                        };
1596
1597                        if let Some(alias) = items_str.strip_prefix('$') {
1598                            // Check for $inputs.xxx format first (workflow inputs)
1599                            if alias.starts_with("inputs.") {
1600                                match self.datastore.resolve_input_path(alias) {
1601                                    Some(value) => match value_to_array(&value) {
1602                                        Some(items) => Some(items),
1603                                        None => {
1604                                            self.datastore.insert(
1605                                                intern(&task.name),
1606                                                TaskResult::failed(
1607                                                    format!(
1608                                                        "for_each binding '${}' resolved to non-array value",
1609                                                        alias
1610                                                    ),
1611                                                    std::time::Duration::ZERO,
1612                                                ),
1613                                            );
1614                                            continue;
1615                                        }
1616                                    },
1617                                    None => {
1618                                        self.datastore.insert(
1619                                            intern(&task.name),
1620                                            TaskResult::failed(
1621                                                format!(
1622                                                    "for_each input '{}' not found in workflow inputs",
1623                                                    alias
1624                                                ),
1625                                                std::time::Duration::ZERO,
1626                                            ),
1627                                        );
1628                                        continue;
1629                                    }
1630                                }
1631                            } else {
1632                                // $alias or $alias.nested.path format
1633                                let mut segments = alias.split('.');
1634                                let Some(base_alias) = segments.next() else {
1635                                    self.datastore.insert(
1636                                        intern(&task.name),
1637                                        TaskResult::failed(
1638                                            "for_each: empty alias after '$' prefix".to_string(),
1639                                            std::time::Duration::ZERO,
1640                                        ),
1641                                    );
1642                                    continue;
1643                                };
1644
1645                                // Try with: bindings first, then fall back to datastore
1646                                let base_result = bindings
1647                                    .get_resolved(base_alias, &self.datastore)
1648                                    .or_else(|_| {
1649                                        // Fall back to direct datastore lookup for $task_id
1650                                        self.datastore
1651                                            .get_output(base_alias)
1652                                            .map(|arc| arc.as_ref().clone())
1653                                            .ok_or_else(|| NikaError::BindingNotFound {
1654                                                alias: base_alias.to_string(),
1655                                            })
1656                                    });
1657                                match base_result {
1658                                    Ok(base_value) => {
1659                                        // Auto-parse JSON strings before traversal
1660                                        let parsed_value;
1661                                        let working_value: &Value = if let Some(v) =
1662                                            crate::binding::jsonpath::try_parse_json_str(
1663                                                &base_value,
1664                                            ) {
1665                                            parsed_value = v;
1666                                            &parsed_value
1667                                        } else {
1668                                            &base_value
1669                                        };
1670
1671                                        // Traverse nested path segments if present
1672                                        let mut value_ref: &Value = working_value;
1673                                        let mut traversal_failed = false;
1674
1675                                        for segment in segments {
1676                                            let next = if let Ok(idx) = segment.parse::<usize>() {
1677                                                value_ref.get(idx)
1678                                            } else {
1679                                                value_ref.get(segment)
1680                                            };
1681
1682                                            match next {
1683                                                Some(v) => value_ref = v,
1684                                                None => {
1685                                                    self.datastore.insert(
1686                                                        intern(&task.name),
1687                                                        TaskResult::failed(
1688                                                            format!(
1689                                                                "for_each binding '${}': nested path segment '{}' not found",
1690                                                                alias, segment
1691                                                            ),
1692                                                            std::time::Duration::ZERO,
1693                                                        ),
1694                                                    );
1695                                                    traversal_failed = true;
1696                                                    break;
1697                                                }
1698                                            }
1699                                        }
1700
1701                                        if traversal_failed {
1702                                            continue;
1703                                        }
1704
1705                                        match value_to_array(value_ref) {
1706                                            Some(items) => Some(items),
1707                                            None => {
1708                                                self.datastore.insert(
1709                                                    intern(&task.name),
1710                                                    TaskResult::failed(
1711                                                        format!(
1712                                                            "for_each binding '${}' resolved to non-array value",
1713                                                            alias
1714                                                        ),
1715                                                        std::time::Duration::ZERO,
1716                                                    ),
1717                                                );
1718                                                continue;
1719                                            }
1720                                        }
1721                                    }
1722                                    Err(e) => {
1723                                        self.datastore.insert(
1724                                            intern(&task.name),
1725                                            TaskResult::failed(
1726                                                format!(
1727                                                    "for_each binding '{}' not found: {}",
1728                                                    base_alias, e
1729                                                ),
1730                                                std::time::Duration::ZERO,
1731                                            ),
1732                                        );
1733                                        continue;
1734                                    }
1735                                }
1736                            }
1737                        } else if items_str.contains("{{inputs.") {
1738                            // Template format for inputs (e.g., "{{inputs.items}}")
1739                            if let Some(start) = items_str.find("{{inputs.") {
1740                                let after = &items_str[start + 9..];
1741                                if let Some(end) = after.find("}}") {
1742                                    let param_path = &after[..end];
1743                                    let full_path = format!("inputs.{}", param_path);
1744                                    match self.datastore.resolve_input_path(&full_path) {
1745                                        Some(value) => match value_to_array(&value) {
1746                                            Some(items) => Some(items),
1747                                            None => {
1748                                                self.datastore.insert(
1749                                                    intern(&task.name),
1750                                                    TaskResult::failed(
1751                                                        format!(
1752                                                            "for_each binding '{{{{inputs.{}}}}}' resolved to non-array value",
1753                                                            param_path
1754                                                        ),
1755                                                        std::time::Duration::ZERO,
1756                                                    ),
1757                                                );
1758                                                continue;
1759                                            }
1760                                        },
1761                                        None => {
1762                                            self.datastore.insert(
1763                                                intern(&task.name),
1764                                                TaskResult::failed(
1765                                                    format!(
1766                                                        "for_each input '{}' not found in workflow inputs",
1767                                                        full_path
1768                                                    ),
1769                                                    std::time::Duration::ZERO,
1770                                                ),
1771                                            );
1772                                            continue;
1773                                        }
1774                                    }
1775                                } else {
1776                                    None
1777                                }
1778                            } else {
1779                                None
1780                            }
1781                        } else if items_str.contains("{{with.") {
1782                            // Template format (e.g., "{{with.locales}}")
1783                            let prefix_info = items_str.find("{{with.").map(|s| (s, 7usize));
1784                            if let Some((start, prefix_len)) = prefix_info {
1785                                let after = &items_str[start + prefix_len..];
1786                                if let Some(end) = after.find("}}") {
1787                                    let path = &after[..end];
1788                                    let mut parts = path.split('.');
1789                                    let Some(alias) = parts.next() else {
1790                                        continue;
1791                                    };
1792
1793                                    match bindings.get_resolved(alias, &self.datastore) {
1794                                        Ok(base_value) => {
1795                                            // Auto-parse JSON strings
1796                                            let parsed_value;
1797                                            let working_value: &Value = if let Some(v) =
1798                                                crate::binding::jsonpath::try_parse_json_str(
1799                                                    &base_value,
1800                                                ) {
1801                                                parsed_value = v;
1802                                                &parsed_value
1803                                            } else {
1804                                                &base_value
1805                                            };
1806
1807                                            let mut value_ref: &Value = working_value;
1808                                            let mut traversal_failed = false;
1809
1810                                            for segment in parts {
1811                                                let next = if let Ok(idx) = segment.parse::<usize>()
1812                                                {
1813                                                    value_ref.get(idx)
1814                                                } else {
1815                                                    value_ref.get(segment)
1816                                                };
1817
1818                                                match next {
1819                                                    Some(v) => value_ref = v,
1820                                                    None => {
1821                                                        tracing::warn!(
1822                                                            task_id = %task.name,
1823                                                            path = %path,
1824                                                            segment = %segment,
1825                                                            "for_each nested path segment not found"
1826                                                        );
1827                                                        traversal_failed = true;
1828                                                        break;
1829                                                    }
1830                                                }
1831                                            }
1832
1833                                            if traversal_failed {
1834                                                self.datastore.insert(
1835                                                    intern(&task.name),
1836                                                    TaskResult::failed(
1837                                                        format!(
1838                                                            "for_each items: path traversal failed for '{{{{with.{}}}}}'",
1839                                                            path
1840                                                        ),
1841                                                        std::time::Duration::ZERO,
1842                                                    ),
1843                                                );
1844                                                continue;
1845                                            } else {
1846                                                match value_to_array(value_ref) {
1847                                                    Some(items) => Some(items),
1848                                                    None => {
1849                                                        self.datastore.insert(
1850                                                            intern(&task.name),
1851                                                            TaskResult::failed(
1852                                                                format!(
1853                                                                    "for_each binding '{{{{with.{}}}}}' resolved to non-array value",
1854                                                                    path
1855                                                                ),
1856                                                                std::time::Duration::ZERO,
1857                                                            ),
1858                                                        );
1859                                                        continue;
1860                                                    }
1861                                                }
1862                                            }
1863                                        }
1864                                        Err(e) => {
1865                                            self.datastore.insert(
1866                                                intern(&task.name),
1867                                                TaskResult::failed(
1868                                                    format!(
1869                                                        "for_each binding '{}' not found: {}",
1870                                                        alias, e
1871                                                    ),
1872                                                    std::time::Duration::ZERO,
1873                                                ),
1874                                            );
1875                                            continue;
1876                                        }
1877                                    }
1878                                } else {
1879                                    None
1880                                }
1881                            } else {
1882                                None
1883                            }
1884                        } else {
1885                            None
1886                        }
1887                    } else if for_each.is_array() {
1888                        // Direct JSON array literal
1889                        for_each.parse_items()
1890                    } else {
1891                        None
1892                    }
1893                } else {
1894                    None
1895                };
1896
1897                // Check if task has for_each or decompose items
1898                if let Some(items) = for_each_items {
1899                    if !items.is_empty() {
1900                        // Note: total_tasks was used for progress display [N/M] but
1901                        // the new CLI format uses verb icons instead. Keeping the
1902                        // adjustment in case we add the counter back.
1903
1904                        // Get concurrency settings: for_each overrides, then standalone task fields
1905                        let fe = task.for_each.as_ref();
1906                        let concurrency = fe
1907                            .and_then(|f| f.concurrency)
1908                            .or(task.concurrency)
1909                            .unwrap_or(1)
1910                            .max(1) as usize;
1911                        let fail_fast = fe.map(|f| f.fail_fast).or(task.fail_fast).unwrap_or(true);
1912
1913                        debug!(
1914                            task_id = %task.name,
1915                            items = items.len(),
1916                            concurrency = concurrency,
1917                            fail_fast = fail_fast,
1918                            "Starting for_each iteration"
1919                        );
1920                        self.event_log.emit(EventKind::ForEachStarted {
1921                            task_id: Arc::from(task.name.as_str()),
1922                            item_count: items.len(),
1923                            concurrency,
1924                            fail_fast,
1925                        });
1926
1927                        // Create semaphore for concurrency limiting
1928                        let semaphore = Arc::new(Semaphore::new(concurrency));
1929                        // Create cancellation token for fail_fast (notification-based, no busy-poll)
1930                        let cancel = CancellationToken::new();
1931
1932                        // Store token so the result-collection loop can cancel only THIS
1933                        // parent's iterations on fail_fast, not the entire JoinSet.
1934                        if fail_fast {
1935                            for_each_cancel_tokens.insert(intern(&task.name), cancel.clone());
1936                        }
1937
1938                        // Spawn one execution per item in the array
1939                        let var_name = fe.map(|f| f.as_var.as_str()).unwrap_or("item").to_string();
1940                        // PERF(M1): Wrap in Arc once, then Arc::clone per iteration
1941                        // instead of deep-cloning AnalyzedTask (~800-1200 bytes each).
1942                        let task = Arc::new(task.clone());
1943                        // PERF(L7): Pre-allocate format buffer for task_id construction
1944                        let mut task_id_buf = String::with_capacity(task.name.len() + 8);
1945                        for (idx, item) in items.iter().enumerate() {
1946                            // Check if cancelled before spawning
1947                            if fail_fast && cancel.is_cancelled() {
1948                                debug!(
1949                                    task_id = %task.name,
1950                                    idx = idx,
1951                                    "Skipping iteration due to fail_fast cancellation"
1952                                );
1953                                break;
1954                            }
1955
1956                            let task = Arc::clone(&task);
1957                            task_id_buf.clear();
1958                            use std::fmt::Write;
1959                            let _ = write!(task_id_buf, "{}[{}]", task.name, idx);
1960                            let task_id = intern(&task_id_buf);
1961                            let parent_task_id = intern(&task.name);
1962                            let datastore = self.datastore.clone();
1963                            let executor = self.executor.clone();
1964                            let event_log = self.event_log.clone();
1965                            let item = item.clone();
1966                            let var_name = var_name.clone();
1967                            let semaphore = Arc::clone(&semaphore);
1968                            let cancel = cancel.clone();
1969                            let workflow_artifacts = workflow_artifacts.clone();
1970                            let artifact_base_path = artifact_base_path.clone();
1971
1972                            join_set.spawn(async move {
1973                                // Check cancellation BEFORE acquiring semaphore
1974                                if cancel.is_cancelled() {
1975                                    return IterationResult {
1976                                        store_id: task_id,
1977                                        result: TaskResult::skipped(
1978                                            "Cancelled due to fail_fast before semaphore acquire"
1979                                                .to_string(),
1980                                        ),
1981                                        for_each_info: Some((parent_task_id, idx)),
1982                                        artifact_paths: vec![],
1983                                    };
1984                                }
1985
1986                                // Race semaphore acquisition against cancellation token
1987                                let _permit = tokio::select! {
1988                                    biased;
1989
1990                                    _ = cancel.cancelled() => {
1991                                        return IterationResult {
1992                                            store_id: task_id,
1993                                            result: TaskResult::skipped(
1994                                                "Cancelled while waiting for semaphore".to_string(),
1995                                            ),
1996                                            for_each_info: Some((parent_task_id, idx)),
1997                                            artifact_paths: vec![],
1998                                        };
1999                                    }
2000
2001                                    permit = semaphore.acquire() => {
2002                                        match permit {
2003                                            Ok(p) => p,
2004                                            Err(_) => {
2005                                                return IterationResult {
2006                                                    store_id: task_id,
2007                                                    result: TaskResult::failed(
2008                                                        "Semaphore closed unexpectedly".to_string(),
2009                                                        std::time::Duration::ZERO,
2010                                                    ),
2011                                                    for_each_info: Some((parent_task_id, idx)),
2012                                                    artifact_paths: vec![],
2013                                                };
2014                                            }
2015                                        }
2016                                    }
2017                                };
2018
2019                                // Final check after acquiring permit
2020                                if cancel.is_cancelled() {
2021                                    return IterationResult {
2022                                        store_id: task_id,
2023                                        result: TaskResult::skipped(
2024                                            "Cancelled after semaphore acquire".to_string(),
2025                                        ),
2026                                        for_each_info: Some((parent_task_id, idx)),
2027                                        artifact_paths: vec![],
2028                                    };
2029                                }
2030
2031                                let result = Self::execute_task_iteration(
2032                                    task,
2033                                    Arc::clone(&task_id),
2034                                    Arc::clone(&parent_task_id),
2035                                    datastore,
2036                                    executor,
2037                                    event_log,
2038                                    Some((var_name, item, idx)),
2039                                    workflow_artifacts,
2040                                    artifact_base_path,
2041                                )
2042                                .await;
2043
2044                                // If failed and fail_fast, signal cancellation
2045                                if !result.result.is_success() && fail_fast {
2046                                    cancel.cancel();
2047                                }
2048
2049                                result
2050                            });
2051                        }
2052                    } else {
2053                        // Empty for_each array: store empty array result immediately
2054                        debug!(
2055                            task_id = %task.name,
2056                            "for_each items array is empty, storing empty result"
2057                        );
2058                        self.datastore.insert(
2059                            intern(&task.name),
2060                            TaskResult::success(Value::Array(vec![]), std::time::Duration::ZERO),
2061                        );
2062                    }
2063                } else if task.for_each.is_some() {
2064                    // for_each was declared but items could not be resolved
2065                    // (unrecognized pattern, malformed JSON, malformed template).
2066                    // Fail explicitly instead of silently running as a regular task.
2067                    self.datastore.insert(
2068                        intern(&task.name),
2069                        TaskResult::failed(
2070                            format!(
2071                                "for_each items could not be resolved for task '{}'. \
2072                                 Check the binding reference.",
2073                                task.name
2074                            ),
2075                            std::time::Duration::ZERO,
2076                        ),
2077                    );
2078                    continue;
2079                } else {
2080                    // Regular task without for_each
2081                    let task = Arc::new(task.clone());
2082                    let datastore = self.datastore.clone();
2083                    let executor = self.executor.clone();
2084                    let event_log = self.event_log.clone();
2085                    let workflow_artifacts = workflow_artifacts.clone();
2086                    let artifact_base_path = artifact_base_path.clone();
2087
2088                    join_set.spawn(async move {
2089                        Self::execute_task_iteration(
2090                            task,
2091                            Arc::clone(&task_id),
2092                            task_id,
2093                            datastore,
2094                            executor,
2095                            event_log,
2096                            None,
2097                            workflow_artifacts,
2098                            artifact_base_path,
2099                        )
2100                        .await
2101                    });
2102                }
2103            }
2104
2105            self.cli_renderer = renderer;
2106
2107            // Collect for_each results for aggregation: parent_id -> Vec<(index, result)>
2108            // Use IndexMap to preserve insertion order (deterministic iteration)
2109            let mut for_each_results: IndexMap<Arc<str>, Vec<(usize, TaskResult)>> =
2110                IndexMap::new();
2111
2112            // Wait for all spawned tasks to complete (with cancellation support)
2113            loop {
2114                tokio::select! {
2115                    biased;
2116                    // Check for cancellation first (biased ensures priority)
2117                    _ = self.cancel_token.cancelled() => {
2118                        // Abort all pending tasks
2119                        join_set.abort_all();
2120
2121                        let duration = workflow_start.elapsed();
2122                        // Collect IDs of tasks that haven't completed yet
2123                        let running_tasks: Vec<Arc<str>> = self
2124                            .workflow
2125                            .tasks
2126                            .iter()
2127                            .filter(|t| !self.datastore.contains(&t.name))
2128                            .map(|t| Arc::from(t.name.as_str()))
2129                            .collect();
2130
2131                        self.event_log.emit(EventKind::WorkflowAborted {
2132                            reason: "Workflow cancelled during execution".to_string(),
2133                            duration_ms: duration.as_millis() as u64,
2134                            running_tasks,
2135                        });
2136                        self.write_trace();
2137                        return Err(NikaError::WorkflowCancelled {
2138                            phase: "during execution".to_string(),
2139                        });
2140                    }
2141                    // Wait for next task result
2142                    result = join_set.join_next() => {
2143                        match result {
2144                            Some(Ok(iteration_result)) => {
2145                                let IterationResult {
2146                                    store_id,
2147                                    result: task_result,
2148                                    for_each_info,
2149                                    artifact_paths: _,
2150                                } = iteration_result;
2151
2152                                _completed += 1;
2153                                let success = task_result.is_success();
2154                                let skipped = task_result.is_skipped();
2155
2156                                // Render new events via CliRenderer
2157                                if let Some(ref mut r) = self.cli_renderer {
2158                                    self.event_log.with_events_since(r.last_rendered_id(), |events| {
2159                                        r.render_new_events(events);
2160                                    });
2161                                }
2162
2163                                // Store individual result
2164                                self.datastore
2165                                    .insert(Arc::clone(&store_id), task_result.clone());
2166
2167                                // If this is a for_each failure with fail_fast,
2168                                // cancel only THIS parent's remaining iterations via its
2169                                // CancellationToken. This avoids killing unrelated sibling
2170                                // tasks from other for_each parents (Bug #26).
2171                                if !success && !skipped {
2172                                    if let Some((ref parent_id, _)) = for_each_info {
2173                                        if let Some(token) = for_each_cancel_tokens.get(parent_id) {
2174                                            if !token.is_cancelled() {
2175                                                debug!(
2176                                                    store_id = %store_id,
2177                                                    parent_id = %parent_id,
2178                                                    "Triggering fail_fast cancellation for parent"
2179                                                );
2180                                                token.cancel();
2181                                            }
2182                                        }
2183                                    }
2184                                }
2185
2186                                // If this is a for_each iteration, collect for aggregation
2187                                if let Some((parent_id, idx)) = for_each_info {
2188                                    for_each_results
2189                                        .entry(parent_id)
2190                                        .or_default()
2191                                        .push((idx, task_result));
2192                                }
2193                            }
2194                            Some(Err(e)) => {
2195                                // Task was cancelled or panicked
2196                                if e.is_cancelled() {
2197                                    // Task was cancelled (workflow abort or fail_fast) - expected
2198                                    debug!("Task cancelled (workflow abort or fail_fast)");
2199                                    // Continue collecting remaining results
2200                                } else {
2201                                    // EMIT: WorkflowFailed (task panic)
2202                                    self.event_log.emit(EventKind::WorkflowFailed {
2203                                        error: format!("Task panicked: {}", e),
2204                                        failed_task: None,
2205                                    });
2206                                    self.write_trace();
2207                                    return Err(NikaError::TaskPanicked { reason: format!("{}", e) });
2208                                }
2209                            }
2210                            None => {
2211                                // All tasks in this batch completed
2212                                break;
2213                            }
2214                        }
2215                    }
2216                }
2217            }
2218
2219            // Aggregate for_each results into parent task
2220            for (parent_id, mut results) in for_each_results {
2221                // Sort by index to preserve order
2222                results.sort_by_key(|(idx, _)| *idx);
2223
2224                // Collect outputs into JSON array
2225                let outputs: Vec<Value> = results
2226                    .iter()
2227                    .map(|(_, r)| {
2228                        // Try to parse as JSON, fall back to string
2229                        let output_str = r.output_str();
2230                        serde_json::from_str(&output_str)
2231                            .unwrap_or(Value::String(output_str.into_owned()))
2232                    })
2233                    .collect();
2234
2235                // Calculate aggregate duration and success
2236                let total_duration: std::time::Duration =
2237                    results.iter().map(|(_, r)| r.duration).sum();
2238                let all_success = results.iter().all(|(_, r)| r.is_success());
2239
2240                // Merge media refs from all successful iterations
2241                let merged_media: Vec<crate::media::MediaRef> = results
2242                    .iter()
2243                    .filter(|(_, r)| r.is_success())
2244                    .flat_map(|(_, r)| r.media.iter().cloned())
2245                    .collect();
2246
2247                // Create aggregated result with JSON array + merged media
2248                let aggregated_result = if all_success {
2249                    TaskResult::success(Value::Array(outputs), total_duration)
2250                        .with_media(merged_media)
2251                } else {
2252                    // Collect errors
2253                    let errors: Vec<String> = results
2254                        .iter()
2255                        .filter_map(|(idx, r)| r.error().map(|e| format!("[{}]: {}", idx, e)))
2256                        .collect();
2257                    // Preserve partial results in output even on failure
2258                    let mut result = TaskResult::failed(errors.join("; "), total_duration)
2259                        .with_media(merged_media);
2260                    result.output = Arc::new(Value::Array(outputs));
2261                    result
2262                };
2263
2264                // Emit ForEachCompleted before storing (parent_id is consumed by insert)
2265                self.event_log.emit(EventKind::ForEachCompleted {
2266                    task_id: Arc::clone(&parent_id),
2267                    total: results.len() as u32,
2268                    succeeded: results.iter().filter(|(_, r)| r.is_success()).count() as u32,
2269                    failed: results
2270                        .iter()
2271                        .filter(|(_, r)| !r.is_success() && !r.is_skipped())
2272                        .count() as u32,
2273                    skipped: results.iter().filter(|(_, r)| r.is_skipped()).count() as u32,
2274                    duration_ms: total_duration.as_millis() as u64,
2275                });
2276
2277                // Store aggregated result under parent ID
2278                self.datastore.insert(parent_id, aggregated_result);
2279            }
2280        }
2281
2282        // Verify media integrity (warn-only, never fail successful workflows)
2283        let media_warnings = self.verify_media_integrity();
2284
2285        // Lockfile is removed automatically when `_lockfile_guard` drops
2286        // (at function exit -- normal return, error, or panic).
2287
2288        // Get final output
2289        let output = self.get_final_output().unwrap_or_default();
2290
2291        // EMIT: WorkflowCompleted
2292        self.event_log.emit(EventKind::WorkflowCompleted {
2293            final_output: Arc::new(Value::String(output.clone())),
2294            total_duration_ms: workflow_start.elapsed().as_millis() as u64,
2295        });
2296
2297        if media_warnings > 0 {
2298            tracing::warn!(
2299                warnings = media_warnings,
2300                "Media integrity check completed with warnings"
2301            );
2302        }
2303
2304        // Write execution trace to .nika/traces/
2305        let trace_path = self.write_trace();
2306
2307        if let Some(ref mut renderer) = self.cli_renderer {
2308            self.event_log
2309                .with_events_since(renderer.last_rendered_id(), |events| {
2310                    renderer.render_new_events(events);
2311                });
2312            let total_duration_ms = workflow_start.elapsed().as_millis() as u64;
2313            if self.quiet {
2314                renderer.render_quiet_summary(total_duration_ms);
2315            } else {
2316                renderer.render_summary(total_duration_ms, trace_path.as_deref());
2317            }
2318        } else if !self.quiet {
2319            let elapsed = workflow_start.elapsed();
2320            let elapsed_str = if elapsed.as_secs() >= 60 {
2321                format!(
2322                    "{}m {:.1}s",
2323                    elapsed.as_secs() / 60,
2324                    elapsed.as_secs_f64() % 60.0
2325                )
2326            } else {
2327                format!("{:.1}s", elapsed.as_secs_f64())
2328            };
2329            let events = self.event_log.events();
2330            let (total_tokens, total_cost) =
2331                events.iter().fold((0u64, 0.0f64), |(tokens, cost), e| {
2332                    if let EventKind::ProviderResponded {
2333                        input_tokens,
2334                        output_tokens,
2335                        cost_usd,
2336                        ..
2337                    } = &e.kind
2338                    {
2339                        (tokens + input_tokens + output_tokens, cost + cost_usd)
2340                    } else {
2341                        (tokens, cost)
2342                    }
2343                });
2344            // PERF(M2): Reuse cached_depths from earlier computation
2345            let task_count = self.workflow.tasks.len();
2346            let parallel_count = if let Some(ref depths) = cached_depths {
2347                let max_layer = depths.values().copied().max().unwrap_or(0);
2348                let mut layers: Vec<Vec<&str>> = vec![Vec::new(); max_layer + 1];
2349                for task in &self.workflow.tasks {
2350                    if let Some(&layer) = depths.get(task.name.as_str()) {
2351                        layers[layer].push(task.name.as_str());
2352                    }
2353                }
2354                layers
2355                    .iter()
2356                    .filter(|l| l.len() > 1)
2357                    .flat_map(|l| l.iter())
2358                    .count()
2359            } else {
2360                0
2361            };
2362            crate::display::print_done_summary(
2363                &elapsed_str,
2364                total_tokens,
2365                total_cost,
2366                trace_path.as_deref(),
2367                task_count,
2368                parallel_count,
2369            );
2370        }
2371
2372        // Gracefully shut down MCP server processes to avoid orphans
2373        self.executor.shutdown_mcp().await;
2374
2375        Ok(output)
2376    }
2377}
2378
2379#[cfg(test)]
2380mod tests {
2381    use super::*;
2382    use crate::ast::analyzed::{
2383        AnalyzedExecAction, AnalyzedForEach, AnalyzedInferAction, AnalyzedOutput, AnalyzedTask,
2384        AnalyzedTaskAction, AnalyzedWorkflow, OutputFormat as AnalyzedOutputFormat, TaskId,
2385        TaskTable,
2386    };
2387    use crate::ast::schema::SchemaVersion;
2388    use crate::ast::structured::StructuredOutputSpec;
2389    use crate::binding::types::{BindingPath, BindingSource};
2390    use crate::binding::{WithEntry, WithSpec};
2391    use crate::source::Span;
2392    use indexmap::IndexMap;
2393    use serde_json::json;
2394    use std::time::Duration;
2395
2396    // ═══════════════════════════════════════════════════════════════
2397    // QUIET MODE TEST
2398    // ═══════════════════════════════════════════════════════════════
2399
2400    fn make_empty_workflow() -> AnalyzedWorkflow {
2401        AnalyzedWorkflow {
2402            schema_version: SchemaVersion::V03,
2403            name: None,
2404            description: None,
2405            provider: Some("mock".to_string()),
2406            model: None,
2407            task_table: TaskTable::new(),
2408            tasks: vec![],
2409            mcp_servers: IndexMap::new(),
2410            context_files: vec![],
2411            imports: vec![],
2412            inputs: IndexMap::new(),
2413            artifacts: None,
2414            log: None,
2415            agents: None,
2416            skills_map: std::collections::HashMap::new(),
2417            span: Span::dummy(),
2418        }
2419    }
2420
2421    #[test]
2422    fn test_runner_quiet_mode() {
2423        // Default should not be quiet
2424        let runner = Runner::new(make_empty_workflow()).unwrap();
2425        assert!(!runner.quiet, "Runner should not be quiet by default");
2426
2427        // quiet() should enable quiet mode
2428        let runner = Runner::new(make_empty_workflow()).unwrap().quiet();
2429        assert!(runner.quiet, "Runner should be quiet after .quiet()");
2430
2431        // Can chain with with_event_log
2432        let event_log = crate::event::EventLog::new();
2433        let runner = Runner::with_event_log(make_empty_workflow(), event_log)
2434            .unwrap()
2435            .quiet();
2436        assert!(runner.quiet, "Runner should be quiet when chained");
2437    }
2438
2439    // ═══════════════════════════════════════════════════════════════
2440    // INITIAL CONTEXT TESTS
2441    // ═══════════════════════════════════════════════════════════════
2442
2443    #[test]
2444    fn test_with_initial_context_stores_value() {
2445        use serde_json::json;
2446
2447        let workflow = make_empty_workflow();
2448        let runner = Runner::new(workflow).unwrap().with_initial_context(
2449            "__parent_context__",
2450            json!({"key": "value", "nested": {"deep": true}}),
2451        );
2452
2453        // Context should be stored in datastore
2454        let result = runner.datastore.get("__parent_context__");
2455        assert!(result.is_some(), "Context should be stored");
2456
2457        let stored = result.unwrap();
2458        assert!(stored.is_success(), "Should be stored as success");
2459
2460        let output = stored.output_str();
2461        assert!(output.contains("key"), "Should contain 'key'");
2462        assert!(output.contains("value"), "Should contain 'value'");
2463    }
2464
2465    #[test]
2466    fn test_with_initial_context_chaining() {
2467        use serde_json::json;
2468
2469        // Should chain with other builder methods
2470        let workflow = make_empty_workflow();
2471        let event_log = EventLog::new();
2472        let runner = Runner::with_event_log(workflow, event_log)
2473            .unwrap()
2474            .quiet()
2475            .with_initial_context("test_ctx", json!({"test": 123}));
2476
2477        assert!(runner.quiet, "Should be quiet");
2478        assert!(
2479            runner.datastore.get("test_ctx").is_some(),
2480            "Context should exist"
2481        );
2482    }
2483
2484    // ═══════════════════════════════════════════════════════════════
2485    // FOR_EACH RESULT AGGREGATION TESTS
2486    // ═══════════════════════════════════════════════════════════════
2487
2488    /// Helper to create a workflow with a single for_each exec task
2489    fn create_for_each_workflow(
2490        task_id: &str,
2491        items_json: &str,
2492        as_var: &str,
2493        command: &str,
2494        concurrency: Option<u32>,
2495        fail_fast: bool,
2496        shell: bool,
2497    ) -> AnalyzedWorkflow {
2498        let mut task_table = TaskTable::new();
2499        task_table.insert(task_id);
2500        let tid = task_table.get_id(task_id).unwrap();
2501
2502        let task = AnalyzedTask {
2503            id: tid,
2504            name: task_id.to_string(),
2505            description: None,
2506            action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
2507                command: command.to_string(),
2508                shell,
2509                cwd: None,
2510                env: IndexMap::new(),
2511                timeout_ms: None,
2512                span: Span::dummy(),
2513            }),
2514            provider: None,
2515            model: None,
2516            with_spec: Default::default(),
2517            depends_on: vec![],
2518            implicit_deps: vec![],
2519            output: None,
2520            for_each: Some(AnalyzedForEach {
2521                items: items_json.to_string(),
2522                as_var: as_var.to_string(),
2523                concurrency,
2524                fail_fast,
2525                span: Span::dummy(),
2526            }),
2527            retry: None,
2528            decompose: None,
2529            concurrency: None,
2530            fail_fast: None,
2531            artifact: None,
2532            log: None,
2533            structured: None,
2534            span: Span::dummy(),
2535        };
2536
2537        AnalyzedWorkflow {
2538            schema_version: SchemaVersion::V03,
2539            name: None,
2540            description: None,
2541            provider: Some("mock".to_string()),
2542            model: None,
2543            task_table,
2544            tasks: vec![task],
2545            mcp_servers: IndexMap::new(),
2546            context_files: vec![],
2547            imports: vec![],
2548            inputs: IndexMap::new(),
2549            artifacts: None,
2550            log: None,
2551            agents: None,
2552            skills_map: std::collections::HashMap::new(),
2553            span: Span::dummy(),
2554        }
2555    }
2556
2557    #[tokio::test]
2558    async fn test_for_each_collects_all_results() {
2559        let workflow = create_for_each_workflow(
2560            "echo_items",
2561            r#"["a", "b", "c"]"#,
2562            "item",
2563            "echo {{with.item}}",
2564            None,  // sequential
2565            true,  // fail_fast default
2566            false, // no shell
2567        );
2568
2569        let mut runner = Runner::new(workflow).unwrap();
2570        let result = runner.run().await;
2571        assert!(
2572            result.is_ok(),
2573            "Workflow should complete: {:?}",
2574            result.err()
2575        );
2576
2577        let parent_result = runner.datastore.get("echo_items");
2578        assert!(parent_result.is_some(), "Parent task result should exist");
2579
2580        let result = parent_result.unwrap();
2581        let output = result.output_str();
2582        let has_a = output.contains("a") || output.contains("\"a\"");
2583        let has_b = output.contains("b") || output.contains("\"b\"");
2584        let has_c = output.contains("c") || output.contains("\"c\"");
2585
2586        assert!(
2587            has_a && has_b && has_c,
2588            "Output should contain all 3 results, got: {}",
2589            output
2590        );
2591    }
2592
2593    #[tokio::test]
2594    async fn test_for_each_preserves_order() {
2595        let workflow = create_for_each_workflow(
2596            "ordered",
2597            r#"["first", "second", "third"]"#,
2598            "x",
2599            "echo {{with.x}}",
2600            None,
2601            true,
2602            false,
2603        );
2604
2605        let mut runner = Runner::new(workflow).unwrap();
2606        runner.run().await.unwrap();
2607
2608        let parent_result = runner.datastore.get("ordered");
2609        assert!(parent_result.is_some(), "Parent task result should exist");
2610
2611        let result = parent_result.unwrap();
2612        let output = result.output_str();
2613        if let Ok(arr) = serde_json::from_str::<Vec<serde_json::Value>>(&output) {
2614            assert_eq!(arr.len(), 3, "Should have 3 results");
2615            let first = arr[0].as_str().unwrap_or("");
2616            let last = arr[2].as_str().unwrap_or("");
2617            assert!(
2618                first.contains("first"),
2619                "First element should contain 'first'"
2620            );
2621            assert!(
2622                last.contains("third"),
2623                "Last element should contain 'third'"
2624            );
2625        }
2626    }
2627
2628    // ═══════════════════════════════════════════════════════════════
2629    // BASIC WORKFLOW TESTS
2630    // ═══════════════════════════════════════════════════════════════
2631
2632    /// Helper to create a minimal workflow with exec tasks
2633    fn create_exec_workflow(
2634        tasks: Vec<(&str, &str)>,
2635        edges: Vec<(&str, &str)>,
2636    ) -> AnalyzedWorkflow {
2637        let mut task_table = TaskTable::new();
2638        for (id, _) in &tasks {
2639            task_table.insert(id);
2640        }
2641
2642        let analyzed_tasks: Vec<AnalyzedTask> = tasks
2643            .into_iter()
2644            .map(|(id, cmd)| {
2645                let task_id = task_table.get_id(id).unwrap();
2646                let depends_on: Vec<_> = edges
2647                    .iter()
2648                    .filter(|(_, tgt)| *tgt == id)
2649                    .filter_map(|(src, _)| task_table.get_id(src))
2650                    .collect();
2651                AnalyzedTask {
2652                    id: task_id,
2653                    name: id.to_string(),
2654                    description: None,
2655                    action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
2656                        command: cmd.to_string(),
2657                        shell: false,
2658                        cwd: None,
2659                        env: IndexMap::new(),
2660                        timeout_ms: None,
2661                        span: Span::dummy(),
2662                    }),
2663                    provider: None,
2664                    model: None,
2665                    with_spec: Default::default(),
2666                    depends_on,
2667                    implicit_deps: vec![],
2668                    output: None,
2669                    for_each: None,
2670                    retry: None,
2671                    decompose: None,
2672                    concurrency: None,
2673                    fail_fast: None,
2674                    artifact: None,
2675                    log: None,
2676                    structured: None,
2677                    span: Span::dummy(),
2678                }
2679            })
2680            .collect();
2681
2682        AnalyzedWorkflow {
2683            schema_version: SchemaVersion::V01,
2684            name: None,
2685            description: None,
2686            provider: Some("mock".to_string()),
2687            model: None,
2688            task_table,
2689            tasks: analyzed_tasks,
2690            mcp_servers: IndexMap::new(),
2691            context_files: vec![],
2692            imports: vec![],
2693            inputs: IndexMap::new(),
2694            artifacts: None,
2695            log: None,
2696            agents: None,
2697            skills_map: std::collections::HashMap::new(),
2698            span: Span::dummy(),
2699        }
2700    }
2701
2702    #[tokio::test]
2703    async fn event_sequence_for_single_task() {
2704        let workflow = create_exec_workflow(vec![("greet", "echo hello")], vec![]);
2705        let mut runner = Runner::new(workflow).unwrap();
2706
2707        let result = runner.run().await.unwrap();
2708        assert_eq!(result, "hello");
2709
2710        // Verify event sequence
2711        let events = runner.event_log().events();
2712
2713        // Expected sequence:
2714        // 1. WorkflowStarted
2715        // 2. TaskScheduled
2716        // 3. TaskStarted (with inputs from ResolvedBindings)
2717        // 4. TemplateResolved (from executor)
2718        // 5. TaskCompleted
2719        // 6. WorkflowCompleted
2720
2721        assert!(
2722            events.len() >= 5,
2723            "Expected at least 5 events, got {}",
2724            events.len()
2725        );
2726
2727        // First event should be WorkflowStarted
2728        assert!(matches!(
2729            &events[0].kind,
2730            EventKind::WorkflowStarted { task_count: 1, .. }
2731        ));
2732
2733        // Last event should be WorkflowCompleted
2734        let last = events.last().unwrap();
2735        assert!(matches!(&last.kind, EventKind::WorkflowCompleted { .. }));
2736
2737        // Verify task events exist
2738        let task_events = runner.event_log().filter_task("greet");
2739        assert!(task_events.len() >= 3, "Expected at least 3 task events");
2740
2741        // Verify TaskCompleted with correct output
2742        let completed = task_events
2743            .iter()
2744            .find(|e| matches!(&e.kind, EventKind::TaskCompleted { .. }));
2745        assert!(completed.is_some(), "TaskCompleted event not found");
2746    }
2747
2748    #[tokio::test]
2749    async fn event_sequence_for_chained_tasks() {
2750        // Two tasks: greet -> shout (shout depends on greet)
2751        let workflow = create_exec_workflow(
2752            vec![("greet", "echo hello"), ("shout", "echo DONE")],
2753            vec![("greet", "shout")],
2754        );
2755        let mut runner = Runner::new(workflow).unwrap();
2756
2757        runner.run().await.unwrap();
2758
2759        let events = runner.event_log().events();
2760
2761        // Verify WorkflowStarted with correct task count
2762        assert!(matches!(
2763            &events[0].kind,
2764            EventKind::WorkflowStarted { task_count: 2, .. }
2765        ));
2766
2767        // Verify both tasks have complete event sequences
2768        let greet_events = runner.event_log().filter_task("greet");
2769        let shout_events = runner.event_log().filter_task("shout");
2770
2771        assert!(!greet_events.is_empty(), "greet task events missing");
2772        assert!(!shout_events.is_empty(), "shout task events missing");
2773
2774        // Verify order: greet TaskCompleted must come before shout TaskStarted
2775        let greet_completed_id = greet_events
2776            .iter()
2777            .find(|e| matches!(&e.kind, EventKind::TaskCompleted { .. }))
2778            .map(|e| e.id);
2779        let shout_started_id = shout_events
2780            .iter()
2781            .find(|e| matches!(&e.kind, EventKind::TaskStarted { .. }))
2782            .map(|e| e.id);
2783
2784        assert!(greet_completed_id.is_some());
2785        assert!(shout_started_id.is_some());
2786        assert!(
2787            greet_completed_id.unwrap() < shout_started_id.unwrap(),
2788            "greet should complete before shout starts"
2789        );
2790    }
2791
2792    #[tokio::test]
2793    async fn event_sequence_for_parallel_tasks() {
2794        // Two independent tasks that can run in parallel
2795        let workflow = create_exec_workflow(
2796            vec![("task_a", "echo A"), ("task_b", "echo B")],
2797            vec![], // No dependencies = parallel
2798        );
2799        let mut runner = Runner::new(workflow).unwrap();
2800
2801        runner.run().await.unwrap();
2802
2803        let events = runner.event_log().events();
2804
2805        // Verify WorkflowStarted
2806        assert!(matches!(
2807            &events[0].kind,
2808            EventKind::WorkflowStarted { task_count: 2, .. }
2809        ));
2810
2811        // Both tasks should have been scheduled
2812        let scheduled: Vec<_> = events
2813            .iter()
2814            .filter(|e| matches!(&e.kind, EventKind::TaskScheduled { .. }))
2815            .collect();
2816        assert_eq!(scheduled.len(), 2, "Both tasks should be scheduled");
2817
2818        // Both tasks should complete
2819        let completed: Vec<_> = events
2820            .iter()
2821            .filter(|e| matches!(&e.kind, EventKind::TaskCompleted { .. }))
2822            .collect();
2823        assert_eq!(completed.len(), 2, "Both tasks should complete");
2824
2825        // WorkflowCompleted should be last
2826        let last = events.last().unwrap();
2827        assert!(matches!(&last.kind, EventKind::WorkflowCompleted { .. }));
2828    }
2829
2830    #[tokio::test]
2831    async fn event_ids_are_monotonic() {
2832        let workflow = create_exec_workflow(
2833            vec![("a", "echo 1"), ("b", "echo 2"), ("c", "echo 3")],
2834            vec![("a", "b"), ("b", "c")],
2835        );
2836        let mut runner = Runner::new(workflow).unwrap();
2837
2838        runner.run().await.unwrap();
2839
2840        let events = runner.event_log().events();
2841        let ids: Vec<u64> = events.iter().map(|e| e.id).collect();
2842
2843        // Verify monotonic and sequential
2844        for (i, &id) in ids.iter().enumerate() {
2845            assert_eq!(id, i as u64, "IDs should be sequential from 0");
2846        }
2847    }
2848
2849    #[tokio::test]
2850    async fn timestamps_are_relative_and_increasing() {
2851        let workflow = create_exec_workflow(
2852            vec![("fast", "echo quick"), ("slow", "echo done")],
2853            vec![("fast", "slow")],
2854        );
2855        let mut runner = Runner::new(workflow).unwrap();
2856
2857        runner.run().await.unwrap();
2858
2859        let events = runner.event_log().events();
2860
2861        // First timestamp should be small (relative to start)
2862        // Use generous 5000ms threshold for CI environments under load
2863        assert!(
2864            events[0].timestamp_ms < 5000,
2865            "First event should be near start (got {}ms, expected < 5000ms)",
2866            events[0].timestamp_ms
2867        );
2868
2869        // Timestamps should generally increase
2870        for window in events.windows(2) {
2871            assert!(
2872                window[1].timestamp_ms >= window[0].timestamp_ms,
2873                "Timestamps should not decrease"
2874            );
2875        }
2876    }
2877
2878    #[tokio::test]
2879    async fn failed_task_emits_task_failed_event() {
2880        let workflow = create_exec_workflow(vec![("fail", "exit 1")], vec![]);
2881        let mut runner = Runner::new(workflow).unwrap();
2882
2883        // Workflow run() now returns Err when tasks fail (NIKA-084)
2884        let result = runner.run().await;
2885        assert!(
2886            result.is_err(),
2887            "workflow should return Err when tasks fail"
2888        );
2889
2890        let events = runner.event_log().filter_task("fail");
2891        let failed = events
2892            .iter()
2893            .find(|e| matches!(&e.kind, EventKind::TaskFailed { .. }));
2894
2895        assert!(failed.is_some(), "TaskFailed event should be emitted");
2896    }
2897
2898    #[tokio::test]
2899    async fn template_resolved_event_captures_before_and_after() {
2900        // Create workflow with task that has a command
2901        let workflow = create_exec_workflow(vec![("echo_test", "echo hello world")], vec![]);
2902        let mut runner = Runner::new(workflow).unwrap();
2903
2904        runner.run().await.unwrap();
2905
2906        let events = runner.event_log().filter_task("echo_test");
2907        let template_event = events
2908            .iter()
2909            .find(|e| matches!(&e.kind, EventKind::TemplateResolved { .. }));
2910
2911        assert!(template_event.is_some(), "TemplateResolved event expected");
2912
2913        if let EventKind::TemplateResolved {
2914            template, result, ..
2915        } = &template_event.unwrap().kind
2916        {
2917            assert_eq!(template, "echo hello world");
2918            assert_eq!(result, "echo hello world");
2919        }
2920    }
2921
2922    #[tokio::test]
2923    async fn event_log_to_json_serializes_correctly() {
2924        let workflow = create_exec_workflow(vec![("simple", "echo test")], vec![]);
2925        let mut runner = Runner::new(workflow).unwrap();
2926
2927        runner.run().await.unwrap();
2928
2929        let json = runner.event_log().to_json();
2930        assert!(json.is_array());
2931
2932        let array = json.as_array().unwrap();
2933        assert!(!array.is_empty());
2934
2935        // Verify structure of first event
2936        let first = &array[0];
2937        assert!(first.get("id").is_some());
2938        assert!(first.get("timestamp_ms").is_some());
2939        assert!(first.get("kind").is_some());
2940        assert_eq!(first["kind"]["type"], "workflow_started");
2941    }
2942
2943    // ═══════════════════════════════════════════════════════════════
2944    // UNIT TESTS FOR RUNNER INTERNAL METHODS
2945    // ═══════════════════════════════════════════════════════════════
2946
2947    #[test]
2948    fn get_ready_tasks_returns_tasks_with_no_deps() {
2949        // Two independent tasks - both should be ready
2950        let workflow = create_exec_workflow(
2951            vec![("a", "echo A"), ("b", "echo B")],
2952            vec![], // No flows = no dependencies
2953        );
2954        let runner = Runner::new(workflow).unwrap();
2955
2956        let ready = runner.get_ready_tasks();
2957        assert_eq!(ready.len(), 2, "Both tasks should be ready");
2958
2959        let names: Vec<&str> = ready.iter().map(|t| t.name.as_str()).collect();
2960        assert!(names.contains(&"a"), "Task 'a' should be ready");
2961        assert!(names.contains(&"b"), "Task 'b' should be ready");
2962    }
2963
2964    #[test]
2965    fn get_ready_tasks_respects_dependencies() {
2966        // Chain: a -> b -> c
2967        let workflow = create_exec_workflow(
2968            vec![("a", "echo A"), ("b", "echo B"), ("c", "echo C")],
2969            vec![("a", "b"), ("b", "c")],
2970        );
2971        let runner = Runner::new(workflow).unwrap();
2972
2973        let ready = runner.get_ready_tasks();
2974        assert_eq!(ready.len(), 1, "Only first task should be ready");
2975        assert_eq!(ready[0].name, "a", "Task 'a' should be ready");
2976    }
2977
2978    #[test]
2979    fn get_ready_tasks_excludes_completed_tasks() {
2980        let workflow = create_exec_workflow(vec![("only", "echo x")], vec![]);
2981        let runner = Runner::new(workflow).unwrap();
2982
2983        // Initially task is ready
2984        let ready = runner.get_ready_tasks();
2985        assert_eq!(ready.len(), 1);
2986
2987        // Mark task as done
2988        runner.datastore.insert(
2989            intern("only"),
2990            TaskResult::success_str("done", std::time::Duration::ZERO),
2991        );
2992
2993        // Now no tasks should be ready
2994        let ready = runner.get_ready_tasks();
2995        assert_eq!(ready.len(), 0, "Completed task should not be ready");
2996    }
2997
2998    #[test]
2999    fn all_done_returns_false_when_tasks_pending() {
3000        let workflow = create_exec_workflow(vec![("a", "echo A"), ("b", "echo B")], vec![]);
3001        let runner = Runner::new(workflow).unwrap();
3002
3003        assert!(!runner.all_done(), "Not all tasks are done initially");
3004    }
3005
3006    #[test]
3007    fn all_done_returns_true_when_all_completed() {
3008        let workflow = create_exec_workflow(vec![("a", "echo A"), ("b", "echo B")], vec![]);
3009        let runner = Runner::new(workflow).unwrap();
3010
3011        // Mark all tasks as done
3012        runner.datastore.insert(
3013            intern("a"),
3014            TaskResult::success_str("A", std::time::Duration::ZERO),
3015        );
3016        runner.datastore.insert(
3017            intern("b"),
3018            TaskResult::success_str("B", std::time::Duration::ZERO),
3019        );
3020
3021        assert!(runner.all_done(), "All tasks should be done");
3022    }
3023
3024    #[test]
3025    fn get_final_output_returns_output_from_final_task() {
3026        // Chain: a -> b (b is final)
3027        let workflow =
3028            create_exec_workflow(vec![("a", "echo A"), ("b", "echo B")], vec![("a", "b")]);
3029        let runner = Runner::new(workflow).unwrap();
3030
3031        // Mark tasks as done
3032        runner.datastore.insert(
3033            intern("a"),
3034            TaskResult::success_str("A", std::time::Duration::ZERO),
3035        );
3036        runner.datastore.insert(
3037            intern("b"),
3038            TaskResult::success_str("final output", std::time::Duration::ZERO),
3039        );
3040
3041        let output = runner.get_final_output();
3042        assert!(output.is_some());
3043        assert_eq!(output.unwrap(), "final output");
3044    }
3045
3046    #[test]
3047    fn get_final_output_returns_none_when_no_results() {
3048        let workflow = create_exec_workflow(vec![("only", "echo x")], vec![]);
3049        let runner = Runner::new(workflow).unwrap();
3050
3051        let output = runner.get_final_output();
3052        assert!(output.is_none(), "No output when tasks not complete");
3053    }
3054
3055    #[test]
3056    fn get_final_output_skips_failed_tasks() {
3057        let workflow = create_exec_workflow(
3058            vec![("a", "echo A"), ("b", "exit 1")],
3059            vec![], // Both are final tasks (no successors)
3060        );
3061        let runner = Runner::new(workflow).unwrap();
3062
3063        // a succeeds, b fails
3064        runner.datastore.insert(
3065            intern("a"),
3066            TaskResult::success_str("success", std::time::Duration::ZERO),
3067        );
3068        runner.datastore.insert(
3069            intern("b"),
3070            TaskResult::failed("error", std::time::Duration::ZERO),
3071        );
3072
3073        let output = runner.get_final_output();
3074        assert!(output.is_some());
3075        assert_eq!(
3076            output.unwrap(),
3077            "success",
3078            "Should return successful task output"
3079        );
3080    }
3081
3082    // ═══════════════════════════════════════════════════════════════
3083    // FOR_EACH CONCURRENCY AND FAIL_FAST TESTS
3084    // ═══════════════════════════════════════════════════════════════
3085
3086    #[tokio::test]
3087    async fn for_each_with_explicit_concurrency() {
3088        let workflow = create_for_each_workflow(
3089            "concurrent",
3090            r#"["a", "b", "c", "d"]"#,
3091            "item",
3092            "echo {{with.item}}",
3093            Some(2), // Limit to 2 concurrent
3094            true,
3095            false,
3096        );
3097
3098        let mut runner = Runner::new(workflow).unwrap();
3099        let result = runner.run().await;
3100        assert!(
3101            result.is_ok(),
3102            "Workflow should complete: {:?}",
3103            result.err()
3104        );
3105
3106        let parent_result = runner.datastore.get("concurrent");
3107        assert!(parent_result.is_some(), "Parent task result should exist");
3108
3109        let result = parent_result.unwrap();
3110        let output = result.output_str();
3111        assert!(output.contains("a") || output.contains("\"a\""));
3112        assert!(output.contains("d") || output.contains("\"d\""));
3113    }
3114
3115    #[tokio::test]
3116    async fn for_each_fail_fast_stops_on_first_error() {
3117        let workflow = create_for_each_workflow(
3118            "failfast",
3119            r#"["ok1", "FAIL", "ok2", "ok3"]"#,
3120            "item",
3121            "test '{{with.item}}' != 'FAIL' && echo {{with.item}}",
3122            Some(1), // Sequential to make failure predictable
3123            true,    // fail_fast
3124            false,
3125        );
3126
3127        let mut runner = Runner::new(workflow).unwrap();
3128        let result = runner.run().await;
3129        // runner.run() now returns Err when tasks fail (NIKA-084)
3130        assert!(
3131            result.is_err(),
3132            "Workflow should fail when fail_fast triggers"
3133        );
3134    }
3135
3136    #[tokio::test]
3137    async fn for_each_fail_fast_false_continues_on_error() {
3138        let workflow = create_for_each_workflow(
3139            "continue",
3140            r#"["ok1", "ok2"]"#,
3141            "item",
3142            "echo {{with.item}}",
3143            None,
3144            false, // Explicitly disable fail_fast
3145            false,
3146        );
3147
3148        let mut runner = Runner::new(workflow).unwrap();
3149        let result = runner.run().await;
3150        assert!(result.is_ok(), "Workflow should complete");
3151
3152        let parent_result = runner.datastore.get("continue");
3153        assert!(parent_result.is_some());
3154    }
3155
3156    // ═══════════════════════════════════════════════════════════════
3157    // FOR_EACH WITH INPUTS.* SUPPORT
3158    // ═══════════════════════════════════════════════════════════════
3159
3160    /// Helper to create a for_each workflow with inputs
3161    fn create_for_each_with_inputs(
3162        task_id: &str,
3163        items_expr: &str,
3164        as_var: &str,
3165        command: &str,
3166        inputs: IndexMap<String, serde_json::Value>,
3167        concurrency: Option<u32>,
3168    ) -> AnalyzedWorkflow {
3169        let mut workflow = create_for_each_workflow(
3170            task_id,
3171            items_expr,
3172            as_var,
3173            command,
3174            concurrency,
3175            true,
3176            false,
3177        );
3178        workflow.inputs = inputs;
3179        workflow
3180    }
3181
3182    #[tokio::test]
3183    async fn for_each_with_dollar_inputs_array() {
3184        let mut inputs = IndexMap::new();
3185        inputs.insert(
3186            "items".to_string(),
3187            json!({
3188                "type": "array",
3189                "default": ["alpha", "beta", "gamma"]
3190            }),
3191        );
3192        let workflow = create_for_each_with_inputs(
3193            "process_items",
3194            "$inputs.items",
3195            "item",
3196            "echo {{with.item}}",
3197            inputs,
3198            None,
3199        );
3200
3201        let mut runner = Runner::new(workflow).unwrap();
3202        let result = runner.run().await;
3203        assert!(
3204            result.is_ok(),
3205            "Workflow should complete: {:?}",
3206            result.err()
3207        );
3208
3209        let task_result = runner.datastore.get("process_items");
3210        assert!(task_result.is_some(), "Task result should exist");
3211        assert!(task_result.unwrap().is_success(), "Task should succeed");
3212    }
3213
3214    #[tokio::test]
3215    async fn for_each_with_template_inputs() {
3216        let mut inputs = IndexMap::new();
3217        inputs.insert(
3218            "locales".to_string(),
3219            json!({
3220                "type": "array",
3221                "default": ["fr-FR", "en-US"]
3222            }),
3223        );
3224        let workflow = create_for_each_with_inputs(
3225            "translate",
3226            "{{inputs.locales}}",
3227            "locale",
3228            "echo Translating to {{with.locale}}",
3229            inputs,
3230            Some(2),
3231        );
3232
3233        let mut runner = Runner::new(workflow).unwrap();
3234        let result = runner.run().await;
3235        assert!(
3236            result.is_ok(),
3237            "Workflow should complete: {:?}",
3238            result.err()
3239        );
3240
3241        let task_result = runner.datastore.get("translate");
3242        assert!(task_result.is_some(), "Task result should exist");
3243        assert!(task_result.unwrap().is_success(), "Task should succeed");
3244    }
3245
3246    #[tokio::test]
3247    async fn for_each_with_inputs_missing_fails_gracefully() {
3248        let mut inputs = IndexMap::new();
3249        inputs.insert(
3250            "other_param".to_string(),
3251            json!({
3252                "type": "string",
3253                "default": "test"
3254            }),
3255        );
3256        let workflow = create_for_each_with_inputs(
3257            "missing_input",
3258            "$inputs.nonexistent",
3259            "item",
3260            "echo {{with.item}}",
3261            inputs,
3262            None,
3263        );
3264
3265        let mut runner = Runner::new(workflow).unwrap();
3266        let result = runner.run().await;
3267        // Now returns Err(DependencyChainFailed) when task fails (NIKA-084)
3268        assert!(result.is_err(), "Workflow should fail when task fails");
3269
3270        let task_result = runner.datastore.get("missing_input");
3271        assert!(task_result.is_some(), "Task result should exist");
3272        let tr = task_result.unwrap();
3273        assert!(!tr.is_success(), "Task should fail due to missing input");
3274        let error_msg = tr.error().expect("Failed task should have error message");
3275        assert!(
3276            error_msg.contains("not found"),
3277            "Error should mention 'not found': {}",
3278            error_msg
3279        );
3280    }
3281
3282    #[tokio::test]
3283    async fn for_each_with_inputs_nested_path() {
3284        let mut inputs = IndexMap::new();
3285        inputs.insert(
3286            "data".to_string(),
3287            json!({
3288                "type": "object",
3289                "default": {
3290                    "items": ["one", "two", "three"]
3291                }
3292            }),
3293        );
3294        let workflow = create_for_each_with_inputs(
3295            "nested",
3296            "$inputs.data.items",
3297            "n",
3298            "echo {{with.n}}",
3299            inputs,
3300            None,
3301        );
3302
3303        let mut runner = Runner::new(workflow).unwrap();
3304        let result = runner.run().await;
3305        assert!(
3306            result.is_ok(),
3307            "Workflow should complete: {:?}",
3308            result.err()
3309        );
3310
3311        let task_result = runner.datastore.get("nested");
3312        assert!(task_result.is_some(), "Task result should exist");
3313        assert!(task_result.unwrap().is_success(), "Task should succeed");
3314    }
3315
3316    // ═══════════════════════════════════════════════════════════════
3317    // for_each Pattern 2 ($alias) — nested paths + error
3318    // ═══════════════════════════════════════════════════════════════
3319
3320    /// Helper to create a 2-step workflow where step1 produces output, step2 iterates with for_each
3321    fn create_two_step_for_each_workflow(
3322        step1_cmd: &str,
3323        step1_shell: bool,
3324        for_each_items: &str,
3325        step2_cmd: &str,
3326    ) -> AnalyzedWorkflow {
3327        let mut task_table = TaskTable::new();
3328        task_table.insert("step1");
3329        task_table.insert("step2");
3330        let tid1 = task_table.get_id("step1").unwrap();
3331        let tid2 = task_table.get_id("step2").unwrap();
3332
3333        let step1 = AnalyzedTask {
3334            id: tid1,
3335            name: "step1".to_string(),
3336            description: None,
3337            action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
3338                command: step1_cmd.to_string(),
3339                shell: step1_shell,
3340                cwd: None,
3341                env: IndexMap::new(),
3342                timeout_ms: None,
3343                span: Span::dummy(),
3344            }),
3345            provider: None,
3346            model: None,
3347            with_spec: Default::default(),
3348            depends_on: vec![],
3349            implicit_deps: vec![],
3350            output: None,
3351            for_each: None,
3352            retry: None,
3353            decompose: None,
3354            concurrency: None,
3355            fail_fast: None,
3356            artifact: None,
3357            log: None,
3358            structured: None,
3359            span: Span::dummy(),
3360        };
3361
3362        let mut with_spec = WithSpec::default();
3363        with_spec.insert(
3364            "step1".to_string(),
3365            WithEntry::simple(BindingPath {
3366                source: BindingSource::Task(intern("step1")),
3367                segments: vec![],
3368            }),
3369        );
3370
3371        let step2 = AnalyzedTask {
3372            id: tid2,
3373            name: "step2".to_string(),
3374            description: None,
3375            action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
3376                command: step2_cmd.to_string(),
3377                shell: false,
3378                cwd: None,
3379                env: IndexMap::new(),
3380                timeout_ms: None,
3381                span: Span::dummy(),
3382            }),
3383            provider: None,
3384            model: None,
3385            with_spec,
3386            depends_on: vec![tid1],
3387            implicit_deps: vec![],
3388            output: None,
3389            for_each: Some(AnalyzedForEach {
3390                items: for_each_items.to_string(),
3391                as_var: "item".to_string(),
3392                concurrency: None,
3393                fail_fast: true,
3394                span: Span::dummy(),
3395            }),
3396            retry: None,
3397            decompose: None,
3398            concurrency: None,
3399            fail_fast: None,
3400            artifact: None,
3401            log: None,
3402            structured: None,
3403            span: Span::dummy(),
3404        };
3405
3406        AnalyzedWorkflow {
3407            schema_version: SchemaVersion::V03,
3408            name: None,
3409            description: None,
3410            provider: Some("mock".to_string()),
3411            model: None,
3412            task_table,
3413            tasks: vec![step1, step2],
3414            mcp_servers: IndexMap::new(),
3415            context_files: vec![],
3416            imports: vec![],
3417            inputs: IndexMap::new(),
3418            artifacts: None,
3419            log: None,
3420            agents: None,
3421            skills_map: std::collections::HashMap::new(),
3422            span: Span::dummy(),
3423        }
3424    }
3425
3426    #[tokio::test]
3427    async fn for_each_dollar_binding_nested_path() {
3428        let workflow = create_two_step_for_each_workflow(
3429            r#"echo '{"items": ["alpha", "beta", "gamma"], "count": 3}'"#,
3430            true,
3431            "$step1.items",
3432            "echo {{with.item}}",
3433        );
3434
3435        let mut runner = Runner::new(workflow).unwrap();
3436        let result = runner.run().await;
3437        assert!(
3438            result.is_ok(),
3439            "Workflow should complete: {:?}",
3440            result.err()
3441        );
3442
3443        let task_result = runner.datastore.get("step2");
3444        assert!(task_result.is_some(), "step2 result should exist");
3445        assert!(
3446            task_result.unwrap().is_success(),
3447            "step2 should succeed with 3 items from nested path"
3448        );
3449    }
3450
3451    #[tokio::test]
3452    async fn for_each_dollar_binding_non_array_errors() {
3453        let workflow = create_two_step_for_each_workflow(
3454            "echo not_an_array",
3455            false,
3456            "$step1",
3457            "echo {{with.item}}",
3458        );
3459
3460        let mut runner = Runner::new(workflow).unwrap();
3461        let _ = runner.run().await;
3462
3463        let task_result = runner.datastore.get("step2");
3464        assert!(task_result.is_some(), "step2 result should exist");
3465
3466        let result = task_result.unwrap();
3467        assert!(
3468            !result.is_success(),
3469            "step2 should FAIL when for_each binding resolves to non-array"
3470        );
3471        let error_msg = result.error().expect("should have error message");
3472        assert!(
3473            error_msg.contains("non-array"),
3474            "Error should mention 'non-array', got: {}",
3475            error_msg
3476        );
3477    }
3478
3479    #[tokio::test]
3480    async fn for_each_dollar_binding_json_string_array() {
3481        let workflow = create_two_step_for_each_workflow(
3482            r#"echo '["x","y","z"]'"#,
3483            true,
3484            "$step1",
3485            "echo {{with.item}}",
3486        );
3487
3488        let mut runner = Runner::new(workflow).unwrap();
3489        let result = runner.run().await;
3490        assert!(
3491            result.is_ok(),
3492            "Workflow should complete: {:?}",
3493            result.err()
3494        );
3495
3496        let task_result = runner.datastore.get("step2");
3497        assert!(task_result.is_some(), "step2 result should exist");
3498        assert!(
3499            task_result.unwrap().is_success(),
3500            "step2 should succeed — JSON string array should be parsed"
3501        );
3502    }
3503
3504    #[tokio::test]
3505    async fn for_each_dollar_binding_nested_path_not_found() {
3506        let workflow = create_two_step_for_each_workflow(
3507            r#"echo '{"data": {"count": 5}}'"#,
3508            true,
3509            "$step1.data.nonexistent",
3510            "echo {{with.item}}",
3511        );
3512
3513        let mut runner = Runner::new(workflow).unwrap();
3514        let _ = runner.run().await;
3515
3516        let task_result = runner.datastore.get("step2");
3517        assert!(task_result.is_some(), "step2 result should exist");
3518
3519        let result = task_result.unwrap();
3520        assert!(
3521            !result.is_success(),
3522            "step2 should FAIL when nested path segment doesn't exist"
3523        );
3524        let error_msg = result.error().expect("should have error message");
3525        assert!(
3526            error_msg.contains("not found"),
3527            "Error should mention path segment not found, got: {}",
3528            error_msg
3529        );
3530    }
3531
3532    // ═══════════════════════════════════════════════════════════════
3533    // FOR_EACH EMPTY ARRAY EDGE CASE
3534    // ═══════════════════════════════════════════════════════════════
3535
3536    #[tokio::test]
3537    async fn for_each_empty_array_completes_with_empty_result() {
3538        let workflow = create_for_each_workflow(
3539            "empty_loop",
3540            "[]", // empty JSON array
3541            "item",
3542            "echo {{with.item}}",
3543            None,
3544            true,
3545            false,
3546        );
3547
3548        let mut runner = Runner::new(workflow).unwrap();
3549        let result = runner.run().await;
3550        assert!(
3551            result.is_ok(),
3552            "Workflow with empty for_each should succeed, got: {:?}",
3553            result.err()
3554        );
3555
3556        // Parent task should have a result (empty array, not missing)
3557        let parent_result = runner.datastore.get("empty_loop");
3558        assert!(
3559            parent_result.is_some(),
3560            "for_each with empty array should store a result"
3561        );
3562
3563        let result = parent_result.unwrap();
3564        assert!(
3565            result.is_success(),
3566            "for_each with empty array should be success"
3567        );
3568
3569        // Output should be an empty array "[]"
3570        let output = result.output_str();
3571        assert_eq!(
3572            output.trim(),
3573            "[]",
3574            "for_each with empty array should produce empty array, got: {}",
3575            output
3576        );
3577    }
3578
3579    // ═══════════════════════════════════════════════════════════════
3580    // CONSTRUCTOR AND EVENT LOG TESTS
3581    // ═══════════════════════════════════════════════════════════════
3582
3583    #[test]
3584    fn with_event_log_uses_provided_event_log() {
3585        let workflow = create_exec_workflow(vec![("a", "echo A")], vec![]);
3586        let custom_log = EventLog::new();
3587        let runner = Runner::with_event_log(workflow, custom_log).unwrap();
3588
3589        // The runner should use the provided event log
3590        assert!(runner.event_log().events().is_empty());
3591    }
3592
3593    #[test]
3594    fn new_and_with_event_log_return_result() {
3595        // Valid workflow should return Ok
3596        let workflow = create_exec_workflow(vec![("a", "echo A")], vec![]);
3597        let result = Runner::new(workflow);
3598        assert!(
3599            result.is_ok(),
3600            "Runner::new should return Ok for a valid workflow"
3601        );
3602
3603        // Valid workflow with custom event log should return Ok
3604        let workflow = create_exec_workflow(vec![("a", "echo A")], vec![]);
3605        let event_log = EventLog::new();
3606        let result = Runner::with_event_log(workflow, event_log);
3607        assert!(
3608            result.is_ok(),
3609            "Runner::with_event_log should return Ok for a valid workflow"
3610        );
3611    }
3612
3613    #[tokio::test]
3614    async fn workflow_completed_event_has_duration() {
3615        let workflow = create_exec_workflow(vec![("quick", "echo fast")], vec![]);
3616        let mut runner = Runner::new(workflow).unwrap();
3617
3618        runner.run().await.unwrap();
3619
3620        let events = runner.event_log().events();
3621        let completed = events
3622            .iter()
3623            .find(|e| matches!(&e.kind, EventKind::WorkflowCompleted { .. }));
3624
3625        assert!(completed.is_some());
3626        // Verify the event has a duration field (u64 is inherently non-negative)
3627        assert!(matches!(
3628            &completed.unwrap().kind,
3629            EventKind::WorkflowCompleted {
3630                total_duration_ms: _,
3631                ..
3632            }
3633        ));
3634    }
3635
3636    #[tokio::test]
3637    async fn workflow_started_event_has_generation_id() {
3638        let workflow = create_exec_workflow(vec![("a", "echo A")], vec![]);
3639        let mut runner = Runner::new(workflow).unwrap();
3640
3641        runner.run().await.unwrap();
3642
3643        let events = runner.event_log().events();
3644        let started = events
3645            .iter()
3646            .find(|e| matches!(&e.kind, EventKind::WorkflowStarted { .. }));
3647
3648        assert!(started.is_some());
3649        if let EventKind::WorkflowStarted { generation_id, .. } = &started.unwrap().kind {
3650            assert!(
3651                generation_id.starts_with("gen-"),
3652                "Generation ID should have prefix"
3653            );
3654            assert!(
3655                generation_id.len() > 10,
3656                "Generation ID should include UUID"
3657            );
3658        }
3659    }
3660
3661    // ═══════════════════════════════════════════════════════════════
3662    // CANCELLATION TESTS
3663    // ═══════════════════════════════════════════════════════════════
3664
3665    #[test]
3666    fn test_cancel_token_default() {
3667        let workflow = make_empty_workflow();
3668        let runner = Runner::new(workflow).unwrap();
3669
3670        // Should not be cancelled by default
3671        assert!(
3672            !runner.is_cancelled(),
3673            "Runner should not be cancelled by default"
3674        );
3675    }
3676
3677    #[test]
3678    fn test_cancel_token_can_be_set() {
3679        let workflow = make_empty_workflow();
3680        let token = CancellationToken::new();
3681        let token_clone = token.clone();
3682
3683        let runner = Runner::new(workflow).unwrap().with_cancel_token(token);
3684
3685        // Cancelling the original token should be reflected
3686        token_clone.cancel();
3687        assert!(runner.is_cancelled(), "Runner should detect cancellation");
3688    }
3689
3690    #[test]
3691    fn test_cancel_token_cloning() {
3692        let workflow = make_empty_workflow();
3693        let runner = Runner::new(workflow).unwrap();
3694
3695        let token1 = runner.cancel_token();
3696        let token2 = runner.cancel_token();
3697
3698        // Both tokens should be clones of the same underlying token
3699        token1.cancel();
3700        assert!(token2.is_cancelled(), "Cloned tokens should share state");
3701        assert!(runner.is_cancelled(), "Runner should detect cancellation");
3702    }
3703
3704    #[tokio::test]
3705    async fn test_cancellation_before_start_returns_aborted() {
3706        // Create a slow workflow
3707        let workflow = create_exec_workflow(vec![("slow", "sleep 10")], vec![]);
3708        let token = CancellationToken::new();
3709
3710        let mut runner = Runner::new(workflow)
3711            .unwrap()
3712            .with_cancel_token(token.clone());
3713
3714        // Cancel before starting
3715        token.cancel();
3716
3717        let result = runner.run().await;
3718        assert!(result.is_err(), "Cancelled workflow should return error");
3719
3720        let err = result.unwrap_err();
3721        assert!(
3722            err.to_string().contains("cancelled") || err.to_string().contains("aborted"),
3723            "Error should mention cancellation: {}",
3724            err
3725        );
3726
3727        // Should emit WorkflowAborted event
3728        let events = runner.event_log().events();
3729        let aborted = events
3730            .iter()
3731            .find(|e| matches!(&e.kind, EventKind::WorkflowAborted { .. }));
3732        assert!(aborted.is_some(), "WorkflowAborted event should be emitted");
3733    }
3734
3735    #[tokio::test]
3736    async fn test_cancellation_during_execution_aborts_workflow() {
3737        use std::time::Duration;
3738
3739        // Create a workflow with a slow task
3740        let workflow = create_exec_workflow(vec![("slow", "sleep 5")], vec![]);
3741        let token = CancellationToken::new();
3742        let token_clone = token.clone();
3743
3744        let mut runner = Runner::new(workflow).unwrap().with_cancel_token(token);
3745
3746        // Spawn the workflow run in background
3747        let handle = tokio::spawn(async move { runner.run().await });
3748
3749        // Wait a bit then cancel
3750        tokio::time::sleep(Duration::from_millis(100)).await;
3751        token_clone.cancel();
3752
3753        // Should complete with error (not take 5 seconds)
3754        let result = tokio::time::timeout(Duration::from_secs(2), handle).await;
3755        assert!(
3756            result.is_ok(),
3757            "Cancellation should complete within 2 seconds"
3758        );
3759
3760        let workflow_result = result.unwrap().unwrap();
3761        assert!(
3762            workflow_result.is_err(),
3763            "Cancelled workflow should return error"
3764        );
3765    }
3766
3767    #[tokio::test]
3768    async fn test_workflow_aborted_event_has_running_tasks() {
3769        use std::time::Duration;
3770
3771        // Create workflow with parallel slow tasks
3772        let workflow = create_exec_workflow(
3773            vec![("slow1", "sleep 5"), ("slow2", "sleep 5")],
3774            vec![], // No deps = parallel
3775        );
3776        let token = CancellationToken::new();
3777        let token_clone = token.clone();
3778
3779        let event_log = EventLog::new();
3780        let event_log_clone = event_log.clone();
3781        let mut runner = Runner::with_event_log(workflow, event_log)
3782            .unwrap()
3783            .with_cancel_token(token);
3784
3785        // Spawn the workflow
3786        let run_handle = tokio::spawn(async move { runner.run().await });
3787
3788        // Wait for tasks to start, then cancel
3789        tokio::time::sleep(Duration::from_millis(100)).await;
3790        token_clone.cancel();
3791
3792        // Wait for abort
3793        let result = run_handle.await.unwrap();
3794        assert!(result.is_err(), "Cancelled workflow should return error");
3795
3796        // Check that WorkflowAborted event was emitted with running tasks
3797        let events = event_log_clone.events();
3798        let aborted = events
3799            .iter()
3800            .find(|e| matches!(&e.kind, EventKind::WorkflowAborted { .. }));
3801        assert!(aborted.is_some(), "WorkflowAborted event should be emitted");
3802
3803        if let EventKind::WorkflowAborted { running_tasks, .. } = &aborted.unwrap().kind {
3804            // At least one task should have been running
3805            assert!(
3806                !running_tasks.is_empty() || running_tasks.len() <= 2,
3807                "Should have captured running tasks (0-2 expected)"
3808            );
3809        }
3810    }
3811
3812    // ═══════════════════════════════════════════════════════════════
3813    // PAUSE/RESUME TESTS
3814    // ═══════════════════════════════════════════════════════════════
3815
3816    #[test]
3817    fn test_pause_state_default() {
3818        let workflow = make_empty_workflow();
3819        let runner = Runner::new(workflow).unwrap();
3820
3821        // Should not be paused by default
3822        assert!(
3823            !runner.is_paused(),
3824            "Runner should not be paused by default"
3825        );
3826    }
3827
3828    #[test]
3829    fn test_pause_and_resume() {
3830        let workflow = make_empty_workflow();
3831        let runner = Runner::new(workflow).unwrap();
3832
3833        // Initially not paused
3834        assert!(!runner.is_paused());
3835
3836        // Pause
3837        runner.pause();
3838        assert!(runner.is_paused(), "Runner should be paused after pause()");
3839
3840        // Resume
3841        runner.resume();
3842        assert!(
3843            !runner.is_paused(),
3844            "Runner should not be paused after resume()"
3845        );
3846    }
3847
3848    #[test]
3849    fn test_pause_handles_cloning() {
3850        let workflow = make_empty_workflow();
3851        let runner = Runner::new(workflow).unwrap();
3852
3853        let (paused1, notify1) = runner.pause_handles();
3854        let (paused2, _notify2) = runner.pause_handles();
3855
3856        // Both should share the same underlying state
3857        runner.pause();
3858        assert!(
3859            paused1.load(Ordering::SeqCst),
3860            "First handle should see paused state"
3861        );
3862        assert!(
3863            paused2.load(Ordering::SeqCst),
3864            "Second handle should see paused state"
3865        );
3866
3867        // Resume via runner
3868        runner.resume();
3869        assert!(
3870            !paused1.load(Ordering::SeqCst),
3871            "First handle should see resumed state"
3872        );
3873        assert!(
3874            !paused2.load(Ordering::SeqCst),
3875            "Second handle should see resumed state"
3876        );
3877
3878        // Verify notify exists (just access it to prove it's valid)
3879        notify1.notify_one();
3880    }
3881
3882    #[test]
3883    fn test_pause_emits_events() {
3884        let workflow = make_empty_workflow();
3885        let event_log = EventLog::new();
3886        let runner = Runner::with_event_log(workflow, event_log.clone()).unwrap();
3887
3888        // Pause and resume
3889        runner.pause();
3890        runner.resume();
3891
3892        // Check events
3893        let events = event_log.events();
3894        let paused = events
3895            .iter()
3896            .find(|e| matches!(&e.kind, EventKind::WorkflowPaused));
3897        let resumed = events
3898            .iter()
3899            .find(|e| matches!(&e.kind, EventKind::WorkflowResumed));
3900
3901        assert!(paused.is_some(), "WorkflowPaused event should be emitted");
3902        assert!(resumed.is_some(), "WorkflowResumed event should be emitted");
3903    }
3904
3905    #[tokio::test]
3906    async fn test_pause_waits_for_resume() {
3907        use std::sync::atomic::AtomicUsize;
3908        use std::time::Duration;
3909
3910        // Create a simple workflow
3911        let workflow = create_exec_workflow(vec![("task1", "echo done")], vec![]);
3912        let event_log = EventLog::new();
3913        let event_log_clone = event_log.clone();
3914        let mut runner = Runner::with_event_log(workflow, event_log).unwrap();
3915
3916        // Pause before running
3917        runner.pause();
3918
3919        let (paused, notify) = runner.pause_handles();
3920        let resume_count = Arc::new(AtomicUsize::new(0));
3921        let resume_count_clone = Arc::clone(&resume_count);
3922
3923        // Spawn the workflow
3924        let handle = tokio::spawn(async move { runner.run().await });
3925
3926        // Wait a bit - workflow should be waiting
3927        tokio::time::sleep(Duration::from_millis(100)).await;
3928
3929        // Check events - should not have completed yet
3930        {
3931            let events = event_log_clone.events();
3932            let completed = events
3933                .iter()
3934                .find(|e| matches!(&e.kind, EventKind::WorkflowCompleted { .. }));
3935            assert!(
3936                completed.is_none(),
3937                "Workflow should be paused, not completed"
3938            );
3939        }
3940
3941        // Resume
3942        paused.store(false, Ordering::SeqCst);
3943        notify.notify_one();
3944        resume_count_clone.fetch_add(1, Ordering::SeqCst);
3945
3946        // Now it should complete
3947        let result = tokio::time::timeout(Duration::from_secs(5), handle).await;
3948        assert!(result.is_ok(), "Workflow should complete after resume");
3949
3950        let inner_result = result.unwrap().unwrap();
3951        assert!(inner_result.is_ok(), "Workflow should succeed");
3952    }
3953
3954    // ═══════════════════════════════════════════════════════════════
3955    // VALUE_TO_ARRAY HELPER TESTS
3956    // ═══════════════════════════════════════════════════════════════
3957
3958    #[test]
3959    fn test_value_to_array_direct_array() {
3960        use serde_json::json;
3961
3962        let value = json!(["a", "b", "c"]);
3963        let result = value_to_array(&value);
3964        assert!(result.is_some());
3965        assert_eq!(result.unwrap().len(), 3);
3966    }
3967
3968    #[test]
3969    fn test_value_to_array_json_string() {
3970        use serde_json::json;
3971
3972        // String containing JSON array (common case from exec output)
3973        let value = json!(r#"["x","y","z"]"#);
3974        let result = value_to_array(&value);
3975        assert!(result.is_some(), "Should parse JSON array string");
3976        let arr = result.unwrap();
3977        assert_eq!(arr.len(), 3);
3978        assert_eq!(arr[0], "x");
3979        assert_eq!(arr[1], "y");
3980        assert_eq!(arr[2], "z");
3981    }
3982
3983    #[test]
3984    fn test_value_to_array_json_string_with_whitespace() {
3985        use serde_json::json;
3986
3987        // String with leading/trailing whitespace
3988        let value = json!("  [1, 2, 3]  ");
3989        let result = value_to_array(&value);
3990        assert!(result.is_some(), "Should handle whitespace");
3991        assert_eq!(result.unwrap().len(), 3);
3992    }
3993
3994    #[test]
3995    fn test_value_to_array_not_array_string() {
3996        use serde_json::json;
3997
3998        // String that's not a JSON array
3999        let value = json!("hello world");
4000        let result = value_to_array(&value);
4001        assert!(result.is_none(), "Should return None for non-array string");
4002    }
4003
4004    #[test]
4005    fn test_value_to_array_object() {
4006        use serde_json::json;
4007
4008        // Object should return None
4009        let value = json!({"key": "value"});
4010        let result = value_to_array(&value);
4011        assert!(result.is_none(), "Should return None for object");
4012    }
4013
4014    #[test]
4015    fn test_value_to_array_number() {
4016        use serde_json::json;
4017
4018        // Number should return None
4019        let value = json!(42);
4020        let result = value_to_array(&value);
4021        assert!(result.is_none(), "Should return None for number");
4022    }
4023
4024    #[test]
4025    fn test_value_to_array_nested_json_string() {
4026        use serde_json::json;
4027
4028        // Complex JSON array as string
4029        let value = json!(r#"[{"id": 1}, {"id": 2}]"#);
4030        let result = value_to_array(&value);
4031        assert!(result.is_some(), "Should parse complex JSON array string");
4032        let arr = result.unwrap();
4033        assert_eq!(arr.len(), 2);
4034        assert_eq!(arr[0]["id"], 1);
4035        assert_eq!(arr[1]["id"], 2);
4036    }
4037
4038    #[test]
4039    fn test_value_to_array_invalid_json_string() {
4040        use serde_json::json;
4041
4042        // Invalid JSON that looks like an array
4043        let value = json!("[not valid json");
4044        let result = value_to_array(&value);
4045        assert!(result.is_none(), "Should return None for invalid JSON");
4046    }
4047
4048    #[test]
4049    fn test_value_to_array_markdown_fenced_json_array() {
4050        let value = Value::String("```json\n[\"a\", \"b\", \"c\"]\n```".to_string());
4051        let result = value_to_array(&value);
4052        assert!(
4053            result.is_some(),
4054            "Should parse JSON array from markdown fence"
4055        );
4056        let arr = result.unwrap();
4057        assert_eq!(arr.len(), 3);
4058        assert_eq!(arr[0], json!("a"));
4059        assert_eq!(arr[1], json!("b"));
4060        assert_eq!(arr[2], json!("c"));
4061    }
4062
4063    #[test]
4064    fn test_value_to_array_plain_fenced_json_array() {
4065        let value = Value::String("```\n[1, 2, 3]\n```".to_string());
4066        let result = value_to_array(&value);
4067        assert!(result.is_some(), "Should parse JSON array from plain fence");
4068        let arr = result.unwrap();
4069        assert_eq!(arr.len(), 3);
4070        assert_eq!(arr[0], json!(1));
4071        assert_eq!(arr[1], json!(2));
4072        assert_eq!(arr[2], json!(3));
4073    }
4074
4075    #[test]
4076    fn test_value_to_array_bare_json_string_still_works() {
4077        let value = Value::String("[\"x\", \"y\"]".to_string());
4078        let result = value_to_array(&value);
4079        assert!(result.is_some(), "Bare JSON array string should still work");
4080        let arr = result.unwrap();
4081        assert_eq!(arr.len(), 2);
4082        assert_eq!(arr[0], json!("x"));
4083        assert_eq!(arr[1], json!("y"));
4084    }
4085
4086    #[test]
4087    fn test_value_to_array_direct_array_value_still_works() {
4088        let value = json!(["a", "b"]);
4089        let result = value_to_array(&value);
4090        assert!(result.is_some(), "Direct array value should still work");
4091        let arr = result.unwrap();
4092        assert_eq!(arr.len(), 2);
4093        assert_eq!(arr[0], json!("a"));
4094        assert_eq!(arr[1], json!("b"));
4095    }
4096
4097    #[test]
4098    fn test_value_to_array_non_array_string_returns_none() {
4099        let value = Value::String("just a string".to_string());
4100        let result = value_to_array(&value);
4101        assert!(result.is_none(), "Non-array string should return None");
4102    }
4103
4104    #[test]
4105    fn test_value_to_array_empty_array() {
4106        use serde_json::json;
4107
4108        // Empty array (direct)
4109        let value = json!([]);
4110        let result = value_to_array(&value);
4111        assert!(result.is_some());
4112        assert_eq!(result.unwrap().len(), 0);
4113
4114        // Empty array (as string)
4115        let value = json!("[]");
4116        let result = value_to_array(&value);
4117        assert!(result.is_some());
4118        assert_eq!(result.unwrap().len(), 0);
4119    }
4120
4121    // ═══════════════════════════════════════════════════════════════
4122    // GET_RETRY_CONFIG TESTS
4123    // ═══════════════════════════════════════════════════════════════
4124
4125    /// Helper to create an AnalyzedTask with an infer action.
4126    ///
4127    /// `output` controls the AnalyzedOutput (format + schema).
4128    /// `structured` controls the StructuredOutputSpec (max_retries).
4129    /// Both must be Some with matching values for retry to qualify.
4130    fn make_infer_task(
4131        name: &str,
4132        output: Option<AnalyzedOutput>,
4133        structured: Option<StructuredOutputSpec>,
4134    ) -> AnalyzedTask {
4135        AnalyzedTask {
4136            id: TaskId(0),
4137            name: name.to_string(),
4138            description: None,
4139            action: AnalyzedTaskAction::Infer(AnalyzedInferAction {
4140                prompt: "test prompt".to_string(),
4141                system: None,
4142                temperature: None,
4143                max_tokens: None,
4144                ..Default::default()
4145            }),
4146            provider: None,
4147            model: None,
4148            with_spec: Default::default(),
4149            depends_on: vec![],
4150            implicit_deps: vec![],
4151            output,
4152            for_each: None,
4153            retry: None,
4154            decompose: None,
4155            concurrency: None,
4156            fail_fast: None,
4157            artifact: None,
4158            log: None,
4159            structured,
4160            span: Span::dummy(),
4161        }
4162    }
4163
4164    #[test]
4165    fn test_get_retry_config_none_for_exec_task() {
4166        let task = AnalyzedTask {
4167            id: TaskId(0),
4168            name: "exec_task".to_string(),
4169            description: None,
4170            action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
4171                command: "echo hi".to_string(),
4172                shell: false,
4173                cwd: None,
4174                env: IndexMap::new(),
4175                timeout_ms: None,
4176                span: Span::dummy(),
4177            }),
4178            provider: None,
4179            model: None,
4180            with_spec: Default::default(),
4181            depends_on: vec![],
4182            implicit_deps: vec![],
4183            output: Some(AnalyzedOutput {
4184                format: AnalyzedOutputFormat::Json,
4185                schema: Some(json!({"type": "object"})),
4186                schema_ref: None,
4187                max_retries: None,
4188                span: Span::dummy(),
4189            }),
4190            for_each: None,
4191            retry: None,
4192            decompose: None,
4193            concurrency: None,
4194            fail_fast: None,
4195            artifact: None,
4196            log: None,
4197            structured: Some(StructuredOutputSpec::with_inline_schema(
4198                json!({"type": "object"}),
4199            )),
4200            span: Span::dummy(),
4201        };
4202        assert!(
4203            Runner::get_retry_config(&task).is_none(),
4204            "Exec tasks should never qualify for retry"
4205        );
4206    }
4207
4208    #[test]
4209    fn test_get_retry_config_none_for_no_output() {
4210        let task = make_infer_task("no_output", None, None);
4211        assert!(
4212            Runner::get_retry_config(&task).is_none(),
4213            "No output means no retry"
4214        );
4215    }
4216
4217    #[test]
4218    fn test_get_retry_config_none_for_text_format() {
4219        let task = make_infer_task(
4220            "text_format",
4221            Some(AnalyzedOutput {
4222                format: AnalyzedOutputFormat::Text,
4223                schema: Some(json!({"type": "object"})),
4224                schema_ref: None,
4225                max_retries: None,
4226                span: Span::dummy(),
4227            }),
4228            Some(StructuredOutputSpec::with_inline_schema(
4229                json!({"type": "object"}),
4230            )),
4231        );
4232        assert!(
4233            Runner::get_retry_config(&task).is_none(),
4234            "Text format should not qualify for retry"
4235        );
4236    }
4237
4238    #[test]
4239    fn test_get_retry_config_none_for_json_no_schema() {
4240        let task = make_infer_task(
4241            "json_no_schema",
4242            Some(AnalyzedOutput {
4243                format: AnalyzedOutputFormat::Json,
4244                schema: None,
4245                schema_ref: None,
4246                max_retries: None,
4247                span: Span::dummy(),
4248            }),
4249            Some(StructuredOutputSpec::with_inline_schema(
4250                json!({"type": "object"}),
4251            )),
4252        );
4253        assert!(
4254            Runner::get_retry_config(&task).is_none(),
4255            "JSON without schema should not qualify"
4256        );
4257    }
4258
4259    #[test]
4260    fn test_get_retry_config_none_for_no_structured() {
4261        let task = make_infer_task(
4262            "no_structured",
4263            Some(AnalyzedOutput {
4264                format: AnalyzedOutputFormat::Json,
4265                schema: Some(json!({"type": "object"})),
4266                schema_ref: None,
4267                max_retries: None,
4268                span: Span::dummy(),
4269            }),
4270            None, // No structured spec → no max_retries
4271        );
4272        assert!(
4273            Runner::get_retry_config(&task).is_none(),
4274            "No structured spec means no retry"
4275        );
4276    }
4277
4278    #[test]
4279    fn test_get_retry_config_none_for_zero_retries() {
4280        let mut structured = StructuredOutputSpec::with_inline_schema(json!({"type": "object"}));
4281        structured.max_retries = Some(0);
4282        let task = make_infer_task(
4283            "zero_retries",
4284            Some(AnalyzedOutput {
4285                format: AnalyzedOutputFormat::Json,
4286                schema: Some(json!({"type": "object"})),
4287                schema_ref: None,
4288                max_retries: None,
4289                span: Span::dummy(),
4290            }),
4291            Some(structured),
4292        );
4293        assert!(
4294            Runner::get_retry_config(&task).is_none(),
4295            "Zero retries means no retry"
4296        );
4297    }
4298
4299    #[test]
4300    fn test_get_retry_config_none_for_default_retries() {
4301        let mut structured = StructuredOutputSpec::with_inline_schema(json!({"type": "object"}));
4302        structured.max_retries = None; // defaults to 0 via unwrap_or(0)
4303        let task = make_infer_task(
4304            "default_retries",
4305            Some(AnalyzedOutput {
4306                format: AnalyzedOutputFormat::Json,
4307                schema: Some(json!({"type": "object"})),
4308                schema_ref: None,
4309                max_retries: None,
4310                span: Span::dummy(),
4311            }),
4312            Some(structured),
4313        );
4314        assert!(
4315            Runner::get_retry_config(&task).is_none(),
4316            "Default retries (None → 0) means no retry"
4317        );
4318    }
4319
4320    #[test]
4321    fn test_get_retry_config_some_for_valid_config() {
4322        let schema = json!({"type": "object", "properties": {"name": {"type": "string"}}});
4323        let mut structured = StructuredOutputSpec::with_inline_schema(schema.clone());
4324        structured.max_retries = Some(3);
4325        let task = make_infer_task(
4326            "valid_retry",
4327            Some(AnalyzedOutput {
4328                format: AnalyzedOutputFormat::Json,
4329                schema: Some(schema.clone()),
4330                schema_ref: None,
4331                max_retries: None,
4332                span: Span::dummy(),
4333            }),
4334            Some(structured),
4335        );
4336        let result = Runner::get_retry_config(&task);
4337        assert!(result.is_some(), "Valid config should return Some");
4338
4339        let (ret_schema, max_retries, infer) = result.unwrap();
4340        assert_eq!(ret_schema, schema);
4341        assert_eq!(max_retries, 3);
4342        assert_eq!(infer.prompt, "test prompt");
4343    }
4344
4345    // ═══════════════════════════════════════════════════════════════
4346    // FIND_ROOT_FAILURE TESTS
4347    // ═══════════════════════════════════════════════════════════════
4348
4349    #[test]
4350    fn test_find_root_failure_none_when_empty() {
4351        let runner = Runner::new(make_empty_workflow()).unwrap();
4352        assert!(
4353            runner.find_root_failure().is_none(),
4354            "Empty workflow has no failures"
4355        );
4356    }
4357
4358    #[test]
4359    fn test_find_root_failure_none_when_all_succeed() {
4360        let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4361        let runner = Runner::new(workflow).unwrap();
4362
4363        // Simulate successful completions
4364        runner.datastore.insert(
4365            intern("a"),
4366            TaskResult::success(json!("ok"), Duration::from_millis(10)),
4367        );
4368        runner.datastore.insert(
4369            intern("b"),
4370            TaskResult::success(json!("ok"), Duration::from_millis(20)),
4371        );
4372
4373        assert!(
4374            runner.find_root_failure().is_none(),
4375            "All-success should return None"
4376        );
4377    }
4378
4379    #[test]
4380    fn test_find_root_failure_returns_first_failed() {
4381        let workflow = create_exec_workflow(
4382            vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4383            vec![],
4384        );
4385        let runner = Runner::new(workflow).unwrap();
4386
4387        runner.datastore.insert(
4388            intern("a"),
4389            TaskResult::success(json!("ok"), Duration::from_millis(10)),
4390        );
4391        runner.datastore.insert(
4392            intern("b"),
4393            TaskResult::failed("something broke".to_string(), Duration::from_millis(20)),
4394        );
4395        runner.datastore.insert(
4396            intern("c"),
4397            TaskResult::failed("also broken".to_string(), Duration::from_millis(30)),
4398        );
4399
4400        assert_eq!(
4401            runner.find_root_failure(),
4402            Some("b".to_string()),
4403            "Should return first failed task in workflow order"
4404        );
4405    }
4406
4407    #[test]
4408    fn test_find_root_failure_skips_dependency_failed() {
4409        let workflow = create_exec_workflow(
4410            vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4411            vec![("a", "b"), ("b", "c")],
4412        );
4413        let runner = Runner::new(workflow).unwrap();
4414
4415        runner.datastore.insert(
4416            intern("a"),
4417            TaskResult::failed("root cause".to_string(), Duration::from_millis(10)),
4418        );
4419        runner
4420            .datastore
4421            .insert(intern("b"), TaskResult::dependency_failed("a"));
4422        runner
4423            .datastore
4424            .insert(intern("c"), TaskResult::dependency_failed("b"));
4425
4426        assert_eq!(
4427            runner.find_root_failure(),
4428            Some("a".to_string()),
4429            "Should skip DependencyFailed and return the actual failure"
4430        );
4431    }
4432
4433    // ═══════════════════════════════════════════════════════════════
4434    // GET_PENDING_TASKS TESTS
4435    // ═══════════════════════════════════════════════════════════════
4436
4437    #[test]
4438    fn test_get_pending_tasks_all_pending() {
4439        let workflow = create_exec_workflow(
4440            vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4441            vec![],
4442        );
4443        let runner = Runner::new(workflow).unwrap();
4444
4445        let pending = runner.get_pending_tasks();
4446        assert_eq!(pending.len(), 3);
4447        assert!(pending.contains(&"a".to_string()));
4448        assert!(pending.contains(&"b".to_string()));
4449        assert!(pending.contains(&"c".to_string()));
4450    }
4451
4452    #[test]
4453    fn test_get_pending_tasks_excludes_completed() {
4454        let workflow = create_exec_workflow(
4455            vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4456            vec![],
4457        );
4458        let runner = Runner::new(workflow).unwrap();
4459
4460        runner.datastore.insert(
4461            intern("a"),
4462            TaskResult::success(json!("ok"), Duration::from_millis(10)),
4463        );
4464        runner.datastore.insert(
4465            intern("c"),
4466            TaskResult::success(json!("ok"), Duration::from_millis(20)),
4467        );
4468
4469        let pending = runner.get_pending_tasks();
4470        assert_eq!(pending, vec!["b".to_string()]);
4471    }
4472
4473    #[test]
4474    fn test_get_pending_tasks_empty_when_all_done() {
4475        let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4476        let runner = Runner::new(workflow).unwrap();
4477
4478        runner.datastore.insert(
4479            intern("a"),
4480            TaskResult::success(json!("ok"), Duration::from_millis(10)),
4481        );
4482        runner.datastore.insert(
4483            intern("b"),
4484            TaskResult::success(json!("ok"), Duration::from_millis(20)),
4485        );
4486
4487        let pending = runner.get_pending_tasks();
4488        assert!(pending.is_empty());
4489    }
4490
4491    #[test]
4492    fn test_get_pending_tasks_excludes_failed() {
4493        let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4494        let runner = Runner::new(workflow).unwrap();
4495
4496        runner.datastore.insert(
4497            intern("a"),
4498            TaskResult::failed("error".to_string(), Duration::from_millis(10)),
4499        );
4500
4501        let pending = runner.get_pending_tasks();
4502        assert_eq!(pending, vec!["b".to_string()]);
4503    }
4504
4505    // ═══════════════════════════════════════════════════════════════
4506    // GET_READY_TASKS + DEPENDENCY FAILURE PROPAGATION TESTS
4507    // ═══════════════════════════════════════════════════════════════
4508
4509    #[test]
4510    fn test_get_ready_tasks_no_deps() {
4511        let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4512        let runner = Runner::new(workflow).unwrap();
4513
4514        let ready = runner.get_ready_tasks();
4515        assert_eq!(ready.len(), 2, "Tasks with no deps should all be ready");
4516    }
4517
4518    #[test]
4519    fn test_get_ready_tasks_blocked_by_incomplete_dep() {
4520        let workflow =
4521            create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![("a", "b")]);
4522        let runner = Runner::new(workflow).unwrap();
4523
4524        let ready = runner.get_ready_tasks();
4525        assert_eq!(ready.len(), 1);
4526        assert_eq!(ready[0].name, "a", "Only root task should be ready");
4527    }
4528
4529    #[test]
4530    fn test_get_ready_tasks_unblocked_after_dep_success() {
4531        let workflow =
4532            create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![("a", "b")]);
4533        let runner = Runner::new(workflow).unwrap();
4534
4535        runner.datastore.insert(
4536            intern("a"),
4537            TaskResult::success(json!("ok"), Duration::from_millis(10)),
4538        );
4539
4540        let ready = runner.get_ready_tasks();
4541        assert_eq!(ready.len(), 1);
4542        assert_eq!(ready[0].name, "b", "b should be ready after a succeeds");
4543    }
4544
4545    #[test]
4546    fn test_get_ready_tasks_skips_already_done() {
4547        let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4548        let runner = Runner::new(workflow).unwrap();
4549
4550        runner.datastore.insert(
4551            intern("a"),
4552            TaskResult::success(json!("ok"), Duration::from_millis(10)),
4553        );
4554
4555        let ready = runner.get_ready_tasks();
4556        assert_eq!(ready.len(), 1);
4557        assert_eq!(ready[0].name, "b", "Completed task should not be returned");
4558    }
4559
4560    #[test]
4561    fn test_dependency_failure_propagation() {
4562        // a → b → c: if a fails, b and c should get DependencyFailed
4563        let workflow = create_exec_workflow(
4564            vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4565            vec![("a", "b"), ("b", "c")],
4566        );
4567        let runner = Runner::new(workflow).unwrap();
4568
4569        // Mark a as failed
4570        runner.datastore.insert(
4571            intern("a"),
4572            TaskResult::failed("boom".to_string(), Duration::from_millis(10)),
4573        );
4574
4575        // First call: b should be marked as DependencyFailed
4576        let ready = runner.get_ready_tasks();
4577        assert!(ready.is_empty(), "No tasks should be ready when dep failed");
4578
4579        // Verify b was stored as DependencyFailed
4580        let b_result = runner.datastore.get("b").expect("b should be in store");
4581        assert!(
4582            b_result.is_dependency_failed(),
4583            "b should be DependencyFailed"
4584        );
4585        assert_eq!(
4586            b_result.failed_dependency(),
4587            Some("a"),
4588            "b should record a as the failed dependency"
4589        );
4590
4591        // Second call: c should now also be marked as DependencyFailed
4592        let ready = runner.get_ready_tasks();
4593        assert!(ready.is_empty());
4594
4595        let c_result = runner.datastore.get("c").expect("c should be in store");
4596        assert!(
4597            c_result.is_dependency_failed(),
4598            "c should be DependencyFailed"
4599        );
4600        assert_eq!(
4601            c_result.failed_dependency(),
4602            Some("b"),
4603            "c should record b as the failed dependency"
4604        );
4605    }
4606
4607    #[test]
4608    fn test_dependency_failure_does_not_affect_parallel_tasks() {
4609        // a → b, a → c (parallel), d (independent)
4610        // If a fails, b and c get DependencyFailed, but d remains pending
4611        let workflow = create_exec_workflow(
4612            vec![
4613                ("a", "echo a"),
4614                ("b", "echo b"),
4615                ("c", "echo c"),
4616                ("d", "echo d"),
4617            ],
4618            vec![("a", "b"), ("a", "c")],
4619        );
4620        let runner = Runner::new(workflow).unwrap();
4621
4622        // Mark a as failed
4623        runner.datastore.insert(
4624            intern("a"),
4625            TaskResult::failed("oops".to_string(), Duration::from_millis(10)),
4626        );
4627
4628        let ready = runner.get_ready_tasks();
4629        assert_eq!(ready.len(), 1, "Only d should be ready");
4630        assert_eq!(ready[0].name, "d");
4631
4632        // b and c should be dependency-failed
4633        assert!(runner.datastore.get("b").unwrap().is_dependency_failed());
4634        assert!(runner.datastore.get("c").unwrap().is_dependency_failed());
4635    }
4636
4637    #[test]
4638    fn test_dependency_failure_emits_events() {
4639        let workflow =
4640            create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![("a", "b")]);
4641        let runner = Runner::new(workflow).unwrap();
4642
4643        runner.datastore.insert(
4644            intern("a"),
4645            TaskResult::failed("crash".to_string(), Duration::from_millis(10)),
4646        );
4647
4648        // Trigger dependency failure propagation
4649        let _ = runner.get_ready_tasks();
4650
4651        // Check that a TaskSkipped event was emitted for b
4652        let events = runner.event_log.events();
4653        let skip_events: Vec<_> = events
4654            .iter()
4655            .filter(|e| {
4656                matches!(
4657                    &e.kind,
4658                    EventKind::TaskSkipped { task_id, .. } if task_id.as_ref() == "b"
4659                )
4660            })
4661            .collect();
4662        assert_eq!(
4663            skip_events.len(),
4664            1,
4665            "Should emit exactly one TaskSkipped event for b"
4666        );
4667    }
4668
4669    // ═══════════════════════════════════════════════════════════════
4670    // ALL_DONE TESTS
4671    // ═══════════════════════════════════════════════════════════════
4672
4673    #[test]
4674    fn test_all_done_empty_workflow() {
4675        let runner = Runner::new(make_empty_workflow()).unwrap();
4676        assert!(runner.all_done(), "Empty workflow is trivially done");
4677    }
4678
4679    #[test]
4680    fn test_all_done_false_when_pending() {
4681        let workflow = create_exec_workflow(vec![("a", "echo a")], vec![]);
4682        let runner = Runner::new(workflow).unwrap();
4683        assert!(!runner.all_done());
4684    }
4685
4686    #[test]
4687    fn test_all_done_true_with_mixed_outcomes() {
4688        let workflow = create_exec_workflow(
4689            vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4690            vec![],
4691        );
4692        let runner = Runner::new(workflow).unwrap();
4693
4694        runner.datastore.insert(
4695            intern("a"),
4696            TaskResult::success(json!("ok"), Duration::from_millis(10)),
4697        );
4698        runner.datastore.insert(
4699            intern("b"),
4700            TaskResult::failed("err".to_string(), Duration::from_millis(20)),
4701        );
4702        runner
4703            .datastore
4704            .insert(intern("c"), TaskResult::dependency_failed("b"));
4705
4706        assert!(
4707            runner.all_done(),
4708            "All tasks have results (success, failed, or dep-failed)"
4709        );
4710    }
4711
4712    // ═══════════════════════════════════════════════════════════════
4713    // AUDIT: FOR_EACH EDGE CASES
4714    // ═══════════════════════════════════════════════════════════════
4715
4716    #[tokio::test]
4717    async fn audit_for_each_empty_array_produces_empty_result() {
4718        let workflow = create_for_each_workflow(
4719            "empty_loop",
4720            "[]",
4721            "item",
4722            "echo {{with.item}}",
4723            None,
4724            true,
4725            false,
4726        );
4727
4728        let mut runner = Runner::new(workflow).unwrap();
4729        let result = runner.run().await;
4730        assert!(
4731            result.is_ok(),
4732            "Empty for_each should complete successfully: {:?}",
4733            result.err()
4734        );
4735
4736        let task_result = runner.datastore.get("empty_loop");
4737        assert!(task_result.is_some(), "Task result should exist");
4738
4739        let tr = task_result.unwrap();
4740        assert!(tr.is_success(), "Empty for_each should succeed");
4741
4742        // The output should be an empty array [], NOT the output of running
4743        // the task body once as a regular task.
4744        let output = tr.output_str();
4745        let parsed: Result<Vec<Value>, _> = serde_json::from_str(&output);
4746        assert!(
4747            parsed.is_ok(),
4748            "Output should be valid JSON array, got: {}",
4749            output
4750        );
4751        assert_eq!(
4752            parsed.unwrap().len(),
4753            0,
4754            "Empty for_each should produce empty array, got: {}",
4755            output
4756        );
4757    }
4758
4759    #[tokio::test]
4760    async fn audit_for_each_single_item_array_works() {
4761        let workflow = create_for_each_workflow(
4762            "single",
4763            r#"["only_one"]"#,
4764            "item",
4765            "echo {{with.item}}",
4766            None,
4767            true,
4768            false,
4769        );
4770
4771        let mut runner = Runner::new(workflow).unwrap();
4772        let result = runner.run().await;
4773        assert!(
4774            result.is_ok(),
4775            "Single-item for_each should complete: {:?}",
4776            result.err()
4777        );
4778
4779        let task_result = runner.datastore.get("single");
4780        assert!(task_result.is_some(), "Task result should exist");
4781
4782        let tr = task_result.unwrap();
4783        assert!(tr.is_success(), "Task should succeed");
4784
4785        let output = tr.output_str();
4786        let parsed: Vec<Value> = serde_json::from_str(&output)
4787            .unwrap_or_else(|_| panic!("Output should be JSON array, got: {}", output));
4788        assert_eq!(parsed.len(), 1, "Should have exactly one result");
4789        let first_str = parsed[0].as_str().unwrap_or("");
4790        assert!(
4791            first_str.contains("only_one"),
4792            "Single result should contain 'only_one', got: {}",
4793            first_str
4794        );
4795    }
4796
4797    #[tokio::test]
4798    async fn audit_for_each_nested_json_items_bound_correctly() {
4799        let workflow = create_for_each_workflow(
4800            "nested_items",
4801            r#"[{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]"#,
4802            "person",
4803            "echo {{with.person}}",
4804            None,
4805            true,
4806            true,
4807        );
4808
4809        let mut runner = Runner::new(workflow).unwrap();
4810        let result = runner.run().await;
4811        assert!(
4812            result.is_ok(),
4813            "Nested JSON for_each should complete: {:?}",
4814            result.err()
4815        );
4816
4817        let task_result = runner.datastore.get("nested_items");
4818        assert!(task_result.is_some(), "Task result should exist");
4819
4820        let tr = task_result.unwrap();
4821        assert!(tr.is_success(), "Task should succeed");
4822
4823        let output = tr.output_str();
4824        assert!(
4825            output.contains("Alice") && output.contains("Bob"),
4826            "Output should contain both names from nested JSON items, got: {}",
4827            output
4828        );
4829    }
4830
4831    #[tokio::test]
4832    async fn audit_for_each_fail_fast_false_continues_after_failure() {
4833        // BUG TEST: With fail_fast=false, ALL iterations should run even when
4834        // one fails. Previously, abort_all was called unconditionally.
4835        let workflow = create_for_each_workflow(
4836            "continue_on_fail",
4837            r#"["ok1", "FAIL", "ok2"]"#,
4838            "item",
4839            "test '{{with.item}}' != 'FAIL' && echo {{with.item}}",
4840            Some(1),
4841            false, // fail_fast = false
4842            true,  // shell = true (command uses shell operators)
4843        );
4844
4845        let mut runner = Runner::new(workflow).unwrap();
4846        let _result = runner.run().await;
4847
4848        let task_result = runner.datastore.get("continue_on_fail");
4849        assert!(task_result.is_some(), "Parent task result should exist");
4850
4851        let ok1_result = runner.datastore.get("continue_on_fail[0]");
4852        let fail_result = runner.datastore.get("continue_on_fail[1]");
4853        let ok2_result = runner.datastore.get("continue_on_fail[2]");
4854
4855        assert!(ok1_result.is_some(), "First iteration result should exist");
4856        assert!(
4857            fail_result.is_some(),
4858            "Second iteration result should exist"
4859        );
4860        assert!(
4861            ok2_result.is_some(),
4862            "Third iteration result should exist (fail_fast=false)"
4863        );
4864
4865        assert!(
4866            ok1_result.unwrap().is_success(),
4867            "First iteration should succeed"
4868        );
4869        assert!(
4870            !fail_result.unwrap().is_success(),
4871            "Second iteration should fail"
4872        );
4873        assert!(
4874            ok2_result.unwrap().is_success(),
4875            "Third iteration should succeed (not aborted by fail_fast=false)"
4876        );
4877    }
4878
4879    #[tokio::test]
4880    async fn audit_for_each_with_depends_on_runs_after_dependency() {
4881        let workflow = create_two_step_for_each_workflow(
4882            r#"echo '["red", "green", "blue"]'"#,
4883            true,
4884            "$step1",
4885            "echo color={{with.item}}",
4886        );
4887
4888        let mut runner = Runner::new(workflow).unwrap();
4889        let result = runner.run().await;
4890        assert!(
4891            result.is_ok(),
4892            "for_each with depends_on should complete: {:?}",
4893            result.err()
4894        );
4895
4896        let step1_result = runner.datastore.get("step1");
4897        assert!(step1_result.is_some());
4898        assert!(step1_result.unwrap().is_success());
4899
4900        let step2_result = runner.datastore.get("step2");
4901        assert!(step2_result.is_some());
4902        let tr = step2_result.unwrap();
4903        assert!(
4904            tr.is_success(),
4905            "step2 should succeed, got error: {:?}",
4906            tr.error()
4907        );
4908
4909        let output = tr.output_str();
4910        assert!(
4911            output.contains("red") && output.contains("green") && output.contains("blue"),
4912            "for_each with depends_on should produce all 3 colors, got: {}",
4913            output
4914        );
4915    }
4916
4917    #[tokio::test]
4918    async fn audit_for_each_items_non_array_non_string_errors() {
4919        let workflow =
4920            create_two_step_for_each_workflow("echo 42", false, "$step1", "echo {{with.item}}");
4921
4922        let mut runner = Runner::new(workflow).unwrap();
4923        let _ = runner.run().await;
4924
4925        let task_result = runner.datastore.get("step2");
4926        assert!(task_result.is_some(), "step2 result should exist");
4927
4928        let tr = task_result.unwrap();
4929        assert!(
4930            !tr.is_success(),
4931            "for_each with non-array binding should fail"
4932        );
4933        let error_msg = tr.error().expect("should have error");
4934        assert!(
4935            error_msg.contains("non-array"),
4936            "Error should mention non-array, got: {}",
4937            error_msg
4938        );
4939    }
4940
4941    #[tokio::test]
4942    async fn audit_for_each_large_array_with_concurrency() {
4943        let items: Vec<String> = (0..20).map(|i| format!("\"item{}\"", i)).collect();
4944        let items_json = format!("[{}]", items.join(", "));
4945
4946        let workflow = create_for_each_workflow(
4947            "large_batch",
4948            &items_json,
4949            "x",
4950            "echo {{with.x}}",
4951            Some(4),
4952            true,
4953            false,
4954        );
4955
4956        let mut runner = Runner::new(workflow).unwrap();
4957        let result = runner.run().await;
4958        assert!(
4959            result.is_ok(),
4960            "Large for_each should complete: {:?}",
4961            result.err()
4962        );
4963
4964        let task_result = runner.datastore.get("large_batch");
4965        assert!(task_result.is_some());
4966        let tr = task_result.unwrap();
4967        assert!(tr.is_success(), "Large batch should succeed");
4968
4969        let output = tr.output_str();
4970        let parsed: Vec<Value> = serde_json::from_str(&output)
4971            .unwrap_or_else(|_| panic!("Should be JSON array, got: {}", output));
4972        assert_eq!(
4973            parsed.len(),
4974            20,
4975            "Should have 20 results from large batch, got: {}",
4976            parsed.len()
4977        );
4978    }
4979
4980    // ═══════════════════════════════════════════════════════════════
4981    // AUDIT: STRUCTURED OUTPUT ENGINE EDGE CASES
4982    // ═══════════════════════════════════════════════════════════════
4983
4984    #[tokio::test]
4985    async fn audit_structured_output_layer3_with_mock_callback_succeeds() {
4986        use crate::runtime::structured_output::{InferCallback, StructuredOutputEngine};
4987
4988        let log = Arc::new(EventLog::new());
4989        let schema = json!({
4990            "type": "object",
4991            "properties": {
4992                "name": { "type": "string" },
4993                "age": { "type": "integer" }
4994            },
4995            "required": ["name", "age"]
4996        });
4997        let mut spec = StructuredOutputSpec::with_inline_schema(schema);
4998        spec.max_retries = Some(2);
4999        spec.enable_retry = Some(true);
5000
5001        let callback: InferCallback = Arc::new(move |_prompt: String| {
5002            Box::pin(async move { Ok(r#"{"name": "Fixed", "age": 42}"#.to_string()) })
5003        });
5004
5005        let mut engine = StructuredOutputEngine::new(spec, log.clone())
5006            .with_infer_callback(callback)
5007            .with_original_prompt("Generate a user".to_string());
5008
5009        let result = engine
5010            .validate("retry-test", r#"{"name": "Incomplete"}"#)
5011            .await;
5012
5013        assert!(
5014            result.is_ok(),
5015            "Layer 3 should succeed with mock callback: {:?}",
5016            result.err()
5017        );
5018        let r = result.unwrap();
5019        assert_eq!(r.layer, 3, "Should have succeeded at Layer 3");
5020        assert_eq!(r.value["name"], "Fixed");
5021        assert_eq!(r.value["age"], 42);
5022    }
5023
5024    #[tokio::test]
5025    async fn audit_structured_output_layer3_exhausts_retries() {
5026        use crate::runtime::structured_output::{InferCallback, StructuredOutputEngine};
5027
5028        let log = Arc::new(EventLog::new());
5029        let schema = json!({
5030            "type": "object",
5031            "properties": {
5032                "name": { "type": "string" },
5033                "score": { "type": "number" }
5034            },
5035            "required": ["name", "score"]
5036        });
5037        let mut spec = StructuredOutputSpec::with_inline_schema(schema);
5038        spec.max_retries = Some(2);
5039        spec.enable_retry = Some(true);
5040        spec.enable_repair = Some(false);
5041
5042        let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
5043        let call_count_clone = Arc::clone(&call_count);
5044        let callback: InferCallback = Arc::new(move |_prompt: String| {
5045            call_count_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
5046            Box::pin(async move { Ok(r#"{"name": "Still Wrong"}"#.to_string()) })
5047        });
5048
5049        let mut engine = StructuredOutputEngine::new(spec, log.clone())
5050            .with_infer_callback(callback)
5051            .with_original_prompt("test".to_string());
5052
5053        let result = engine
5054            .validate("exhaust-test", r#"{"name": "Invalid"}"#)
5055            .await;
5056
5057        assert!(result.is_err(), "Should fail after exhausting all retries");
5058
5059        let calls = call_count.load(std::sync::atomic::Ordering::SeqCst);
5060        assert_eq!(
5061            calls, 2,
5062            "Layer 3 should call LLM exactly max_retries times, got: {}",
5063            calls
5064        );
5065    }
5066
5067    #[tokio::test]
5068    async fn audit_structured_output_layer4_repair_succeeds() {
5069        use crate::runtime::structured_output::{InferCallback, StructuredOutputEngine};
5070
5071        let log = Arc::new(EventLog::new());
5072        let schema = json!({
5073            "type": "object",
5074            "properties": {
5075                "valid": { "type": "boolean" }
5076            },
5077            "required": ["valid"]
5078        });
5079        let mut spec = StructuredOutputSpec::with_inline_schema(schema);
5080        spec.enable_retry = Some(false);
5081        spec.enable_repair = Some(true);
5082
5083        let callback: InferCallback = Arc::new(move |_prompt: String| {
5084            Box::pin(async move { Ok(r#"{"valid": true}"#.to_string()) })
5085        });
5086
5087        let mut engine =
5088            StructuredOutputEngine::new(spec, log.clone()).with_infer_callback(callback);
5089
5090        let result = engine
5091            .validate("repair-test", r#"{"invalid_field": 123}"#)
5092            .await;
5093
5094        assert!(
5095            result.is_ok(),
5096            "Layer 4 repair should succeed: {:?}",
5097            result.err()
5098        );
5099        let r = result.unwrap();
5100        assert_eq!(r.layer, 4, "Should have succeeded at Layer 4");
5101        assert_eq!(r.value["valid"], true);
5102    }
5103
5104    #[tokio::test]
5105    async fn audit_structured_output_validates_array_schema() {
5106        use crate::runtime::structured_output::StructuredOutputEngine;
5107
5108        let log = Arc::new(EventLog::new());
5109        let schema = json!({
5110            "type": "array",
5111            "items": { "type": "string" },
5112            "minItems": 1
5113        });
5114        let spec = StructuredOutputSpec::with_inline_schema(schema);
5115        let mut engine = StructuredOutputEngine::new(spec, log);
5116
5117        let result = engine.validate("arr-ok", r#"["hello", "world"]"#).await;
5118        assert!(result.is_ok(), "String array should validate");
5119
5120        let mut engine2 = StructuredOutputEngine::new(
5121            StructuredOutputSpec::with_inline_schema(json!({
5122                "type": "array",
5123                "items": { "type": "string" },
5124                "minItems": 1
5125            })),
5126            Arc::new(EventLog::new()),
5127        );
5128        let result = engine2.validate("arr-empty", "[]").await;
5129        assert!(result.is_err(), "Empty array should fail minItems check");
5130    }
5131
5132    #[tokio::test]
5133    async fn audit_structured_output_validates_additional_properties_false() {
5134        use crate::runtime::structured_output::StructuredOutputEngine;
5135
5136        let log = Arc::new(EventLog::new());
5137        let spec = StructuredOutputSpec::with_inline_schema(json!({
5138            "type": "object",
5139            "properties": {
5140                "name": { "type": "string" }
5141            },
5142            "required": ["name"],
5143            "additionalProperties": false
5144        }));
5145        let mut engine = StructuredOutputEngine::new(spec, log);
5146
5147        let result = engine.validate("addl-ok", r#"{"name": "test"}"#).await;
5148        assert!(result.is_ok(), "Known properties only should validate");
5149
5150        let mut engine2 = StructuredOutputEngine::new(
5151            StructuredOutputSpec::with_inline_schema(json!({
5152                "type": "object",
5153                "properties": {
5154                    "name": { "type": "string" }
5155                },
5156                "required": ["name"],
5157                "additionalProperties": false
5158            })),
5159            Arc::new(EventLog::new()),
5160        );
5161        let result = engine2
5162            .validate("addl-bad", r#"{"name": "test", "extra": true}"#)
5163            .await;
5164        assert!(
5165            result.is_err(),
5166            "Extra properties should fail when additionalProperties=false"
5167        );
5168    }
5169
5170    #[tokio::test]
5171    async fn audit_structured_output_validates_deeply_nested_schema() {
5172        use crate::runtime::structured_output::StructuredOutputEngine;
5173
5174        let log = Arc::new(EventLog::new());
5175        let schema = json!({
5176            "type": "object",
5177            "properties": {
5178                "level1": {
5179                    "type": "object",
5180                    "properties": {
5181                        "level2": {
5182                            "type": "object",
5183                            "properties": {
5184                                "value": { "type": "integer" }
5185                            },
5186                            "required": ["value"]
5187                        }
5188                    },
5189                    "required": ["level2"]
5190                }
5191            },
5192            "required": ["level1"]
5193        });
5194        let spec = StructuredOutputSpec::with_inline_schema(schema);
5195        let mut engine = StructuredOutputEngine::new(spec, log);
5196
5197        let result = engine
5198            .validate("deep-ok", r#"{"level1": {"level2": {"value": 42}}}"#)
5199            .await;
5200        assert!(result.is_ok(), "Deeply nested valid should pass");
5201
5202        let mut engine2 = StructuredOutputEngine::new(
5203            StructuredOutputSpec::with_inline_schema(json!({
5204                "type": "object",
5205                "properties": {
5206                    "level1": {
5207                        "type": "object",
5208                        "properties": {
5209                            "level2": {
5210                                "type": "object",
5211                                "properties": {
5212                                    "value": { "type": "integer" }
5213                                },
5214                                "required": ["value"]
5215                            }
5216                        },
5217                        "required": ["level2"]
5218                    }
5219                },
5220                "required": ["level1"]
5221            })),
5222            Arc::new(EventLog::new()),
5223        );
5224        let result = engine2
5225            .validate(
5226                "deep-bad",
5227                r#"{"level1": {"level2": {"value": "not_a_number"}}}"#,
5228            )
5229            .await;
5230        assert!(
5231            result.is_err(),
5232            "Wrong type at deep level should fail validation"
5233        );
5234    }
5235
5236    #[tokio::test]
5237    async fn audit_structured_output_validates_primitive_types() {
5238        use crate::runtime::structured_output::StructuredOutputEngine;
5239
5240        // String schema
5241        let spec = StructuredOutputSpec::with_inline_schema(json!({"type": "string"}));
5242        let mut engine = StructuredOutputEngine::new(spec, Arc::new(EventLog::new()));
5243        let result = engine.validate("str-ok", r#""hello""#).await;
5244        assert!(
5245            result.is_ok(),
5246            "Quoted string should validate as string type"
5247        );
5248
5249        // Number schema
5250        let spec = StructuredOutputSpec::with_inline_schema(json!({"type": "number"}));
5251        let mut engine = StructuredOutputEngine::new(spec, Arc::new(EventLog::new()));
5252        let result = engine.validate("num-ok", "42.5").await;
5253        assert!(result.is_ok(), "Number should validate as number type");
5254
5255        // Boolean schema
5256        let spec = StructuredOutputSpec::with_inline_schema(json!({"type": "boolean"}));
5257        let mut engine = StructuredOutputEngine::new(spec, Arc::new(EventLog::new()));
5258        let result = engine.validate("bool-ok", "true").await;
5259        assert!(result.is_ok(), "Boolean should validate as boolean type");
5260
5261        // Null schema
5262        let spec = StructuredOutputSpec::with_inline_schema(json!({"type": "null"}));
5263        let mut engine = StructuredOutputEngine::new(spec, Arc::new(EventLog::new()));
5264        let result = engine.validate("null-ok", "null").await;
5265        assert!(result.is_ok(), "null should validate as null type");
5266    }
5267
5268    // ═══════════════════════════════════════════════════════════════
5269    // AUDIT: BUILD_RETRY_PROMPT TESTS
5270    // ═══════════════════════════════════════════════════════════════
5271
5272    #[test]
5273    fn audit_build_retry_prompt_includes_all_components() {
5274        let schema = json!({"type": "object", "required": ["name"]});
5275        let prompt = Runner::build_retry_prompt(
5276            "Generate a user",
5277            &schema,
5278            r#"{"broken": true}"#,
5279            "missing required field: name",
5280        );
5281
5282        assert!(
5283            prompt.contains("Generate a user"),
5284            "Should contain original prompt"
5285        );
5286        assert!(
5287            prompt.contains(r#"{"broken": true}"#),
5288            "Should contain the actual previous output"
5289        );
5290        assert!(
5291            prompt.contains("missing required field: name"),
5292            "Should contain validation errors"
5293        );
5294    }
5295
5296    // ═══════════════════════════════════════════════════════════════
5297    // AUDIT: LOWERING OF STRUCTURED FIELD
5298    // ═══════════════════════════════════════════════════════════════
5299
5300    #[test]
5301    fn audit_to_output_policy_preserves_structured_spec() {
5302        let schema = json!({"type": "object"});
5303        let mut spec = StructuredOutputSpec::with_inline_schema(schema.clone());
5304        spec.max_retries = Some(5);
5305        spec.enable_repair = Some(false);
5306
5307        let policy = spec.to_output_policy();
5308
5309        assert_eq!(policy.format, crate::ast::output::OutputFormat::Json,);
5310
5311        assert!(policy.schema.is_some());
5312
5313        assert_eq!(policy.max_retries, Some(5));
5314
5315        assert!(policy.source_structured_spec.is_some());
5316        let roundtripped = policy.source_structured_spec.unwrap();
5317        assert_eq!(roundtripped.max_retries, Some(5));
5318        assert_eq!(roundtripped.enable_repair, Some(false));
5319    }
5320
5321    // ═══════════════════════════════════════════════════════════════
5322    // LOCKFILE GUARD RAII TESTS
5323    // ═══════════════════════════════════════════════════════════════
5324
5325    #[test]
5326    fn lockfile_guard_creates_and_removes_on_drop() {
5327        let dir = tempfile::tempdir().unwrap();
5328        let lock_path = dir.path().join("store").join(".nika-run.lock");
5329
5330        {
5331            let _guard = LockfileGuard::create(lock_path.clone());
5332            assert!(
5333                lock_path.exists(),
5334                "Lockfile should exist while guard is alive"
5335            );
5336
5337            let content = std::fs::read_to_string(&lock_path).unwrap();
5338            assert!(
5339                content.starts_with("pid:"),
5340                "Lockfile should contain pid, got: {content}"
5341            );
5342        }
5343
5344        assert!(
5345            !lock_path.exists(),
5346            "Lockfile should be removed after guard is dropped"
5347        );
5348    }
5349
5350    #[test]
5351    fn lockfile_guard_removes_on_panic_unwind() {
5352        let dir = tempfile::tempdir().unwrap();
5353        let lock_path = dir.path().join("store").join(".nika-run.lock");
5354
5355        let result = std::panic::catch_unwind(|| {
5356            let _guard = LockfileGuard::create(lock_path.clone());
5357            assert!(lock_path.exists(), "Lockfile should exist before panic");
5358            panic!("simulated runner panic");
5359        });
5360
5361        assert!(result.is_err(), "Should have caught the panic");
5362        assert!(
5363            !lock_path.exists(),
5364            "Lockfile should be removed even after panic unwind"
5365        );
5366    }
5367
5368    #[test]
5369    fn lockfile_guard_removes_on_early_return() {
5370        let dir = tempfile::tempdir().unwrap();
5371        let lock_path = dir.path().join("store").join(".nika-run.lock");
5372
5373        fn simulate_early_return(path: &std::path::Path) -> Result<(), &'static str> {
5374            let _guard = LockfileGuard::create(path.to_path_buf());
5375            assert!(path.exists(), "Lockfile should exist before early return");
5376            Err("simulated ? operator bail-out")?;
5377            unreachable!()
5378        }
5379
5380        let result = simulate_early_return(&lock_path);
5381        assert!(result.is_err());
5382        assert!(
5383            !lock_path.exists(),
5384            "Lockfile should be removed after early return via ?"
5385        );
5386    }
5387
5388    #[test]
5389    fn lockfile_guard_tolerates_missing_file() {
5390        // If someone manually deletes the lockfile during a run,
5391        // Drop should not panic.
5392        let dir = tempfile::tempdir().unwrap();
5393        let lock_path = dir.path().join("store").join(".nika-run.lock");
5394
5395        let guard = LockfileGuard::create(lock_path.clone());
5396        assert!(lock_path.exists());
5397
5398        // Simulate external deletion
5399        std::fs::remove_file(&lock_path).unwrap();
5400        assert!(!lock_path.exists());
5401
5402        // Drop should not panic
5403        drop(guard);
5404    }
5405
5406    // ═══════════════════════════════════════════════════════════════
5407    // Bug #24: for_each {{with.alias.path}} traversal failure must
5408    // record a failed result, not silently run as a regular task.
5409    // ═══════════════════════════════════════════════════════════════
5410
5411    /// Build a two-step workflow where step2 uses {{with.step1.path}} for_each.
5412    fn create_with_template_for_each_workflow(
5413        step1_cmd: &str,
5414        step1_shell: bool,
5415        for_each_template: &str,
5416        step2_cmd: &str,
5417    ) -> AnalyzedWorkflow {
5418        let mut task_table = TaskTable::new();
5419        task_table.insert("step1");
5420        task_table.insert("step2");
5421        let tid1 = task_table.get_id("step1").unwrap();
5422        let tid2 = task_table.get_id("step2").unwrap();
5423
5424        let step1 = AnalyzedTask {
5425            id: tid1,
5426            name: "step1".to_string(),
5427            description: None,
5428            action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
5429                command: step1_cmd.to_string(),
5430                shell: step1_shell,
5431                cwd: None,
5432                env: IndexMap::new(),
5433                timeout_ms: None,
5434                span: Span::dummy(),
5435            }),
5436            provider: None,
5437            model: None,
5438            with_spec: Default::default(),
5439            depends_on: vec![],
5440            implicit_deps: vec![],
5441            output: None,
5442            for_each: None,
5443            retry: None,
5444            decompose: None,
5445            concurrency: None,
5446            fail_fast: None,
5447            artifact: None,
5448            log: None,
5449            structured: None,
5450            span: Span::dummy(),
5451        };
5452
5453        let mut with_spec = WithSpec::default();
5454        with_spec.insert(
5455            "step1".to_string(),
5456            WithEntry::simple(BindingPath {
5457                source: BindingSource::Task(intern("step1")),
5458                segments: vec![],
5459            }),
5460        );
5461
5462        let step2 = AnalyzedTask {
5463            id: tid2,
5464            name: "step2".to_string(),
5465            description: None,
5466            action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
5467                command: step2_cmd.to_string(),
5468                shell: false,
5469                cwd: None,
5470                env: IndexMap::new(),
5471                timeout_ms: None,
5472                span: Span::dummy(),
5473            }),
5474            provider: None,
5475            model: None,
5476            with_spec,
5477            depends_on: vec![tid1],
5478            implicit_deps: vec![],
5479            output: None,
5480            for_each: Some(AnalyzedForEach {
5481                items: for_each_template.to_string(),
5482                as_var: "item".to_string(),
5483                concurrency: None,
5484                fail_fast: true,
5485                span: Span::dummy(),
5486            }),
5487            retry: None,
5488            decompose: None,
5489            concurrency: None,
5490            fail_fast: None,
5491            artifact: None,
5492            log: None,
5493            structured: None,
5494            span: Span::dummy(),
5495        };
5496
5497        AnalyzedWorkflow {
5498            schema_version: SchemaVersion::V03,
5499            name: None,
5500            description: None,
5501            provider: Some("mock".to_string()),
5502            model: None,
5503            task_table,
5504            tasks: vec![step1, step2],
5505            mcp_servers: IndexMap::new(),
5506            context_files: vec![],
5507            imports: vec![],
5508            inputs: IndexMap::new(),
5509            artifacts: None,
5510            log: None,
5511            agents: None,
5512            skills_map: std::collections::HashMap::new(),
5513            span: Span::dummy(),
5514        }
5515    }
5516
5517    #[tokio::test]
5518    async fn bug24_for_each_with_template_traversal_failure_records_error() {
5519        // step1 outputs JSON object, step2 references nonexistent nested path
5520        let workflow = create_with_template_for_each_workflow(
5521            r#"echo '{"items": ["a","b"]}'"#,
5522            true,
5523            "{{with.step1.nonexistent}}",
5524            "echo {{with.item}}",
5525        );
5526
5527        let mut runner = Runner::new(workflow).unwrap().quiet();
5528        let _ = runner.run().await;
5529
5530        let task_result = runner.datastore.get("step2");
5531        assert!(task_result.is_some(), "step2 result should exist");
5532
5533        let result = task_result.unwrap();
5534        assert!(
5535            !result.is_success(),
5536            "step2 should FAIL when path traversal fails, not run as regular task"
5537        );
5538        let error_msg = result.error().expect("should have error message");
5539        assert!(
5540            error_msg.contains("traversal failed"),
5541            "Error should mention path traversal failure, got: {}",
5542            error_msg
5543        );
5544    }
5545
5546    // ═══════════════════════════════════════════════════════════════
5547    // Bug #25: for_each {{with.alias}} non-array must record error,
5548    // not silently run as a regular task.
5549    // ═══════════════════════════════════════════════════════════════
5550
5551    #[tokio::test]
5552    async fn bug25_for_each_with_template_non_array_records_error() {
5553        // step1 outputs a plain string (not an array), step2 tries to iterate over it
5554        let workflow = create_with_template_for_each_workflow(
5555            "echo not_an_array",
5556            false,
5557            "{{with.step1}}",
5558            "echo {{with.item}}",
5559        );
5560
5561        let mut runner = Runner::new(workflow).unwrap().quiet();
5562        let _ = runner.run().await;
5563
5564        let task_result = runner.datastore.get("step2");
5565        assert!(task_result.is_some(), "step2 result should exist");
5566
5567        let result = task_result.unwrap();
5568        assert!(
5569            !result.is_success(),
5570            "step2 should FAIL when for_each binding resolves to non-array"
5571        );
5572        let error_msg = result.error().expect("should have error message");
5573        assert!(
5574            error_msg.contains("non-array"),
5575            "Error should mention 'non-array', got: {}",
5576            error_msg
5577        );
5578    }
5579
5580    // ═══════════════════════════════════════════════════════════════
5581    // FOR_EACH CONCURRENT FAIL_FAST CANCELLATION
5582    // ═══════════════════════════════════════════════════════════════
5583
5584    #[tokio::test]
5585    async fn for_each_concurrent_fail_fast_cancels_remaining_iterations() {
5586        // Verify that with concurrency > 1 and fail_fast = true, when one
5587        // iteration fails, iterations waiting on the semaphore are cancelled
5588        // (returned as Skipped) rather than being allowed to proceed.
5589        //
5590        // Cancellation semantics: the CancellationToken fires when an iteration
5591        // fails. This cancels iterations that are:
5592        //   (a) waiting to acquire the semaphore (via tokio::select!)
5593        //   (b) not yet spawned (checked in the spawn loop)
5594        //   (c) checked again after acquiring the permit
5595        //
5596        // Iterations that already acquired a permit and started executing will
5597        // run to completion — the token does not abort running shell processes.
5598        //
5599        // Strategy:
5600        //   - 6 items, concurrency=2, fail_fast=true
5601        //   - Item 0 fails immediately (exit 1)
5602        //   - Item 1 succeeds quickly (echo)
5603        //   - Items 2-5 would run if allowed, but should be cancelled at the
5604        //     semaphore gate since the cancel token fires before they acquire.
5605
5606        let workflow = create_for_each_workflow(
5607            "cancel_test",
5608            r#"["FAIL", "ok1", "wait2", "wait3", "wait4", "wait5"]"#,
5609            "item",
5610            "if [ '{{with.item}}' = 'FAIL' ]; then exit 1; else echo {{with.item}}; fi",
5611            Some(2), // concurrency = 2
5612            true,    // fail_fast = true
5613            true,    // shell = true
5614        );
5615
5616        let mut runner = Runner::new(workflow).unwrap().quiet();
5617        let result = runner.run().await;
5618
5619        // Workflow should fail (fail_fast propagates the error)
5620        assert!(
5621            result.is_err(),
5622            "Workflow should fail when fail_fast triggers on concurrent for_each"
5623        );
5624
5625        // The failing iteration (index 0) should exist and be a failure
5626        let fail_iter = runner.datastore.get("cancel_test[0]");
5627        assert!(
5628            fail_iter.is_some(),
5629            "Failing iteration [0] result should exist"
5630        );
5631        assert!(
5632            !fail_iter.unwrap().is_success(),
5633            "Iteration [0] should have failed"
5634        );
5635
5636        // Item 1 may have completed (it was in the same concurrency batch as
5637        // item 0) or may have been skipped — either is acceptable.
5638        // But items 2-5 were waiting on the semaphore and MUST be skipped.
5639        let mut skipped_count = 0;
5640        let mut total_stored = 0;
5641        for idx in 2..6 {
5642            let key = format!("cancel_test[{}]", idx);
5643            if let Some(iter_result) = runner.datastore.get(&key) {
5644                total_stored += 1;
5645                if iter_result.is_skipped() {
5646                    skipped_count += 1;
5647                }
5648            }
5649            // Iterations that were never spawned (spawn loop saw cancellation)
5650            // won't have a datastore entry at all — that's also valid cancellation.
5651        }
5652
5653        // Items 2-5 were queued behind the semaphore. They should either be
5654        // skipped (cancel fired while waiting) or never spawned (cancel fired
5655        // before the spawn loop reached them). Either way, they should NOT
5656        // have succeeded.
5657        let succeeded_after_cancel: Vec<usize> = (2..6)
5658            .filter(|idx| {
5659                let key = format!("cancel_test[{}]", idx);
5660                runner
5661                    .datastore
5662                    .get(&key)
5663                    .map(|r| r.is_success())
5664                    .unwrap_or(false)
5665            })
5666            .collect();
5667
5668        assert!(
5669            succeeded_after_cancel.is_empty(),
5670            "Iterations behind the semaphore should not succeed after fail_fast cancellation, \
5671            but these succeeded: {:?}",
5672            succeeded_after_cancel
5673        );
5674
5675        // At least some of the queued iterations should have a skipped result
5676        // (others may not have been spawned at all).
5677        let not_spawned = 4 - total_stored;
5678        assert!(
5679            skipped_count + not_spawned >= 1,
5680            "At least one iteration should be cancelled (skipped={}, not_spawned={}). \
5681            This suggests cancellation tokens are not working for concurrent for_each.",
5682            skipped_count,
5683            not_spawned
5684        );
5685    }
5686
5687    // ═══════════════════════════════════════════════════════════════
5688    // Bug #26: fail_fast should only cancel sibling iterations,
5689    // not unrelated tasks in the same JoinSet.
5690    // ═══════════════════════════════════════════════════════════════
5691
5692    #[tokio::test]
5693    async fn bug26_fail_fast_does_not_abort_unrelated_sibling_tasks() {
5694        // Two independent for_each parents: one fails fast, the other should still complete.
5695        // "failing_parent" has fail_fast=true and one failing item.
5696        // "passing_parent" has fail_fast=true but all items succeed.
5697        // Both run in parallel (no depends_on between them).
5698        //
5699        // Before the fix, abort_all() would kill BOTH parents' tasks.
5700        // After the fix, only failing_parent's iterations are cancelled.
5701
5702        let mut task_table = TaskTable::new();
5703        task_table.insert("failing_parent");
5704        task_table.insert("passing_parent");
5705        let tid_fail = task_table.get_id("failing_parent").unwrap();
5706        let tid_pass = task_table.get_id("passing_parent").unwrap();
5707
5708        let failing_parent = AnalyzedTask {
5709            id: tid_fail,
5710            name: "failing_parent".to_string(),
5711            description: None,
5712            action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
5713                command: "test '{{with.item}}' != 'FAIL' && echo {{with.item}}".to_string(),
5714                shell: true,
5715                cwd: None,
5716                env: IndexMap::new(),
5717                timeout_ms: None,
5718                span: Span::dummy(),
5719            }),
5720            provider: None,
5721            model: None,
5722            with_spec: Default::default(),
5723            depends_on: vec![],
5724            implicit_deps: vec![],
5725            output: None,
5726            for_each: Some(AnalyzedForEach {
5727                items: r#"["ok", "FAIL", "ok2"]"#.to_string(),
5728                as_var: "item".to_string(),
5729                concurrency: Some(3),
5730                fail_fast: true,
5731                span: Span::dummy(),
5732            }),
5733            retry: None,
5734            decompose: None,
5735            concurrency: None,
5736            fail_fast: None,
5737            artifact: None,
5738            log: None,
5739            structured: None,
5740            span: Span::dummy(),
5741        };
5742
5743        let passing_parent = AnalyzedTask {
5744            id: tid_pass,
5745            name: "passing_parent".to_string(),
5746            description: None,
5747            action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
5748                command: "echo {{with.item}}".to_string(),
5749                shell: true,
5750                cwd: None,
5751                env: IndexMap::new(),
5752                timeout_ms: None,
5753                span: Span::dummy(),
5754            }),
5755            provider: None,
5756            model: None,
5757            with_spec: Default::default(),
5758            depends_on: vec![],
5759            implicit_deps: vec![],
5760            output: None,
5761            for_each: Some(AnalyzedForEach {
5762                items: r#"["a", "b", "c"]"#.to_string(),
5763                as_var: "item".to_string(),
5764                concurrency: Some(3),
5765                fail_fast: true,
5766                span: Span::dummy(),
5767            }),
5768            retry: None,
5769            decompose: None,
5770            concurrency: None,
5771            fail_fast: None,
5772            artifact: None,
5773            log: None,
5774            structured: None,
5775            span: Span::dummy(),
5776        };
5777
5778        let workflow = AnalyzedWorkflow {
5779            schema_version: SchemaVersion::V03,
5780            name: None,
5781            description: None,
5782            provider: Some("mock".to_string()),
5783            model: None,
5784            task_table,
5785            tasks: vec![failing_parent, passing_parent],
5786            mcp_servers: IndexMap::new(),
5787            context_files: vec![],
5788            imports: vec![],
5789            inputs: IndexMap::new(),
5790            artifacts: None,
5791            log: None,
5792            agents: None,
5793            skills_map: std::collections::HashMap::new(),
5794            span: Span::dummy(),
5795        };
5796
5797        let mut runner = Runner::new(workflow).unwrap().quiet();
5798        let _ = runner.run().await;
5799
5800        // The failing parent should have a result (aggregated, with at least one failure)
5801        let fail_result = runner.datastore.get("failing_parent");
5802        assert!(fail_result.is_some(), "failing_parent result should exist");
5803
5804        // The passing parent should ALSO have a result — it should NOT be aborted
5805        let pass_result = runner.datastore.get("passing_parent");
5806        assert!(
5807            pass_result.is_some(),
5808            "passing_parent result should exist (fail_fast of sibling should not abort it)"
5809        );
5810
5811        let pass_tr = pass_result.unwrap();
5812        assert!(
5813            pass_tr.is_success(),
5814            "passing_parent should succeed — its iterations were all OK. \
5815            Error: {:?}",
5816            pass_tr.error()
5817        );
5818    }
5819}