1use indexmap::IndexMap;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::Instant;
12
13use colored::Colorize;
14use serde_json::Value;
15use tokio::sync::{Notify, Semaphore};
16use tokio::task::JoinSet;
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, info, instrument};
19
20use crate::ast::analyzed::{
21 AnalyzedOutput, AnalyzedTask, AnalyzedTaskAction, AnalyzedWorkflow,
22 OutputFormat as AnalyzedOutputFormat,
23};
24use crate::ast::lower::{lower_action, lower_mcp_servers, lower_output};
25use crate::ast::output::OutputPolicy;
26use crate::ast::{InferParams, TaskAction};
27use crate::binding::ResolvedBindings;
28use crate::dag::Dag;
29use crate::error::NikaError;
30use crate::event::{prune_traces, EventKind, EventLog, TraceWriter};
31use crate::runtime::boot::TraceConfig;
32use crate::store::{RunContext, TaskResult};
33use crate::util::{intern, DECOMPOSE_TIMEOUT};
34
35use super::artifact_processor::process_task_artifacts;
36use super::context_loader::load_context_analyzed;
37use super::executor::TaskExecutor;
38use super::output::{extract_json, format_validation_errors, make_task_result};
39use super::resolver::{resolve_assets_analyzed, ResolvedAssets};
40use super::structured_output::StructuredOutputEngine;
41
42use crate::ast::artifact::ArtifactsConfig;
43use std::path::PathBuf;
44
/// Guard for an on-disk lockfile.
///
/// The file at `path` is written by [`LockfileGuard::create`] and removed
/// again when the guard is dropped.
struct LockfileGuard {
    /// Location of the lockfile on disk.
    path: PathBuf,
}
62
63impl LockfileGuard {
64 fn create(path: PathBuf) -> Self {
73 if path.exists() {
75 if let Ok(metadata) = path.metadata() {
76 if let Ok(modified) = metadata.modified() {
77 if modified.elapsed().unwrap_or_default() > std::time::Duration::from_secs(600)
78 {
79 tracing::warn!("Removing stale lockfile (>10min old)");
80 let _ = std::fs::remove_file(&path);
81 }
82 }
83 }
84 }
85
86 if let Some(parent) = path.parent() {
87 if let Err(e) = std::fs::create_dir_all(parent) {
88 tracing::warn!(path = %parent.display(), error = %e, "Failed to create lockfile directory");
89 }
90 }
91 if let Err(e) = std::fs::write(&path, format!("pid:{}", std::process::id())) {
92 tracing::warn!(path = %path.display(), error = %e, "Failed to write lockfile — concurrent runs may conflict");
93 }
94 Self { path }
95 }
96}
97
/// Removes the lockfile when the guard goes out of scope.
impl Drop for LockfileGuard {
    fn drop(&mut self) {
        // Best-effort cleanup: the removal result is deliberately ignored.
        let _ = std::fs::remove_file(&self.path);
    }
}
103
104fn value_to_array(value: &Value) -> Option<Vec<Value>> {
118 if let Some(arr) = value.as_array() {
120 return Some(arr.clone());
121 }
122
123 if let Some(s) = value.as_str() {
125 if let Ok(extracted) = extract_json(s) {
126 if let Some(arr) = extracted.as_array() {
127 return Some(arr.clone());
128 }
129 }
130 }
131
132 None
133}
134
/// Outcome of a single `execute_task_iteration` call.
struct IterationResult {
    /// Id the iteration was executed under (the `task_id` passed to
    /// `execute_task_iteration`).
    store_id: Arc<str>,
    /// Final result of the iteration (success or failure).
    result: TaskResult,
    /// `(parent task id, item index)` when the iteration ran with a
    /// for-each binding; `None` otherwise.
    for_each_info: Option<(Arc<str>, usize)>,
    // Not read anywhere in the visible part of this file, hence the allow.
    #[allow(dead_code)]
    /// Paths reported by artifact processing for this iteration; empty when no
    /// artifacts were processed.
    artifact_paths: Vec<PathBuf>,
}
148
/// Executes an analyzed workflow: schedules tasks according to the dependency
/// graph, records results in a [`RunContext`] and emits events to an
/// [`EventLog`].
pub struct Runner {
    /// The workflow being executed.
    workflow: AnalyzedWorkflow,
    /// Dependency graph built from `workflow` at construction time.
    flow_graph: Dag,
    /// Per-run store of task results, inputs and loaded context.
    datastore: RunContext,
    /// Executes individual task actions.
    executor: TaskExecutor,
    /// Sink for workflow/task events.
    event_log: EventLog,
    /// Identifier of this run, formatted as `gen-<uuid v4>`.
    generation_id: String,
    /// When set, suppresses console output printed by the runner and forces
    /// renderers configured afterwards to `DetailLevel::Min`.
    quiet: bool,
    /// Token checked by the run loop to abort the workflow.
    cancel_token: CancellationToken,
    /// Pause flag checked by the run loop.
    paused: Arc<AtomicBool>,
    /// Signalled by `resume` to wake a paused run loop.
    resume_notify: Arc<Notify>,
    /// Assets resolved during `run` when the workflow declares agents.
    resolved_assets: ResolvedAssets,
    /// Trace retention settings handed to `prune_traces`.
    trace_config: TraceConfig,
    /// Optional CLI renderer; `None` until one of the `with_*_renderer` /
    /// `with_detail_level` builders is called.
    cli_renderer: Option<crate::display::RunRenderer>,
}
178
179impl Runner {
    /// Builds a runner for `workflow` with a freshly created [`EventLog`].
    ///
    /// # Errors
    /// Propagates any error returned by [`Runner::with_event_log`].
    pub fn new(workflow: AnalyzedWorkflow) -> Result<Self, NikaError> {
        Self::with_event_log(workflow, EventLog::new())
    }
183
184 pub fn with_event_log(
194 workflow: AnalyzedWorkflow,
195 event_log: EventLog,
196 ) -> Result<Self, NikaError> {
197 let flow_graph = Dag::from_analyzed(&workflow).map_err(|e| NikaError::ValidationError {
198 reason: format!("DAG construction failed: {e}"),
199 })?;
200 flow_graph.detect_cycles()?;
201 let datastore = RunContext::new();
202
203 let mcp_configs = lower_mcp_servers(workflow.mcp_servers.clone());
205 let provider = workflow.provider.as_deref().unwrap_or("claude");
206
207 let executor = TaskExecutor::new(
208 provider,
209 workflow.model.as_deref(),
210 mcp_configs,
211 event_log.clone(),
212 )?;
213
214 let generation_id = format!("gen-{}", uuid::Uuid::new_v4());
216
217 Ok(Self {
218 workflow,
219 flow_graph,
220 datastore,
221 executor,
222 event_log,
223 generation_id,
224 quiet: false,
225 cancel_token: CancellationToken::new(),
226 paused: Arc::new(AtomicBool::new(false)),
227 resume_notify: Arc::new(Notify::new()),
228 resolved_assets: ResolvedAssets::default(),
229 trace_config: TraceConfig::default(),
230 cli_renderer: None,
231 })
232 }
233
    /// Builder: turns on quiet mode.
    ///
    /// Quiet mode only affects renderers configured *after* this call (see the
    /// `self.quiet` check in the renderer builders) and the console output
    /// printed by `run`.
    pub fn quiet(mut self) -> Self {
        self.quiet = true;
        self
    }
242
243 pub fn with_detail_level(mut self, detail: crate::display::DetailLevel) -> Self {
248 let effective_detail = if self.quiet {
249 crate::display::DetailLevel::Min
250 } else {
251 detail
252 };
253 self.cli_renderer = Some(crate::display::RunRenderer::auto(effective_detail));
254 self
255 }
256
257 pub fn with_classic_renderer(mut self, detail: crate::display::DetailLevel) -> Self {
259 let effective_detail = if self.quiet {
260 crate::display::DetailLevel::Min
261 } else {
262 detail
263 };
264 self.cli_renderer = Some(crate::display::RunRenderer::classic(effective_detail));
265 self
266 }
267
    /// Borrows the run context holding this runner's task results.
    pub fn datastore(&self) -> &RunContext {
        &self.datastore
    }
274
275 pub fn with_initial_context(self, key: &str, context: Value) -> Self {
292 use crate::store::TaskResult;
293 use crate::util::intern;
294
295 self.datastore.insert(
296 intern(key),
297 TaskResult::success(context, std::time::Duration::ZERO),
298 );
299 self
300 }
301
    /// Builder: forwards `mode` to the executor via `set_permission_mode`.
    pub fn with_permission_mode(self, mode: crate::tools::PermissionMode) -> Self {
        self.executor.set_permission_mode(mode);
        self
    }
310
    /// Builder: replaces the runner's cancellation token with `token`.
    ///
    /// A clone of the token is also handed to the executor so that both
    /// observe the same cancellation signal.
    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
        self.executor = self.executor.with_cancel_token(token.clone());
        self.cancel_token = token;
        self
    }
322
    /// Returns a clone of the runner's cancellation token.
    pub fn cancel_token(&self) -> CancellationToken {
        self.cancel_token.clone()
    }
329
    /// Reports whether the runner's cancellation token has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancel_token.is_cancelled()
    }
334
    /// Sets the pause flag and emits `EventKind::WorkflowPaused`.
    ///
    /// The event is emitted on every call, whether or not the runner was
    /// already paused.
    pub fn pause(&self) {
        self.paused.store(true, Ordering::SeqCst);
        self.event_log.emit(EventKind::WorkflowPaused);
    }
343
    /// Clears the pause flag, signals `resume_notify` and emits
    /// `EventKind::WorkflowResumed`.
    pub fn resume(&self) {
        // Clear the flag before notifying so a woken waiter re-checking
        // `paused` sees `false`.
        self.paused.store(false, Ordering::SeqCst);
        self.resume_notify.notify_one();
        self.event_log.emit(EventKind::WorkflowResumed);
    }
350
    /// Reports whether the pause flag is currently set.
    pub fn is_paused(&self) -> bool {
        self.paused.load(Ordering::SeqCst)
    }
355
    /// Returns shared handles to the pause flag and the resume notifier.
    ///
    /// Both are `Arc` clones of the runner's own fields, so changes made
    /// through them are observed by the runner.
    pub fn pause_handles(&self) -> (Arc<AtomicBool>, Arc<Notify>) {
        (Arc::clone(&self.paused), Arc::clone(&self.resume_notify))
    }
363
    /// Borrows the event log this runner emits into.
    pub fn event_log(&self) -> &EventLog {
        &self.event_log
    }
368
369 fn get_ready_tasks(&self) -> Vec<&AnalyzedTask> {
374 self.workflow
375 .tasks
376 .iter()
377 .filter(|task| {
378 if self.datastore.contains(&task.name) {
380 return false;
381 }
382
383 let deps = self.flow_graph.get_dependencies(&task.name);
385 for dep in deps.iter() {
386 if let Some(succeeded) = self.datastore.is_completed_successfully(dep.as_ref())
388 {
389 if !succeeded {
391 self.datastore.insert(
393 intern(&task.name),
394 TaskResult::dependency_failed(dep.as_ref()),
395 );
396
397 self.event_log.emit(EventKind::TaskSkipped {
399 task_id: Arc::from(task.name.as_str()),
400 reason: format!("dependency '{}' failed", dep.as_ref()),
401 });
402
403 debug!(
404 task_id = %task.name,
405 dependency = %dep.as_ref(),
406 "Task blocked due to failed dependency"
407 );
408
409 return false;
410 }
411 } else {
412 return false;
414 }
415 }
416
417 true
419 })
420 .collect()
421 }
422
423 fn all_done(&self) -> bool {
425 self.workflow
426 .tasks
427 .iter()
428 .all(|t| self.datastore.contains(&t.name))
429 }
430
431 fn get_pending_tasks(&self) -> Vec<String> {
435 self.workflow
436 .tasks
437 .iter()
438 .filter(|task| !self.datastore.contains(&task.name))
439 .map(|t| t.name.clone())
440 .collect()
441 }
442
443 fn find_root_failure(&self) -> Option<String> {
445 for task in &self.workflow.tasks {
446 if let Some(result) = self.datastore.get(&task.name) {
447 if matches!(result.status, crate::store::TaskOutcome::Failed(_)) {
449 return Some(task.name.clone());
450 }
451 }
452 }
453 None
454 }
455
456 fn get_final_output(&self) -> Option<String> {
462 if let Some(deepest_task) = self.flow_graph.get_deepest_final_task() {
464 if let Some(result) = self.datastore.get(deepest_task.as_ref()) {
465 if result.is_success() {
466 return Some(result.output_str().into_owned());
467 }
468 }
469 }
470
471 let final_tasks = self.flow_graph.get_final_tasks();
473 for task_id in final_tasks {
474 if let Some(result) = self.datastore.get(&task_id) {
475 if result.is_success() {
476 return Some(result.output_str().into_owned());
477 }
478 }
479 }
480 None
481 }
482
483 fn write_trace(&self) -> Option<String> {
488 let trace_path = match TraceWriter::new(&self.generation_id) {
489 Ok(trace_writer) => {
490 if let Err(e) = trace_writer.write_all(&self.event_log) {
491 tracing::warn!(error = %e, "Failed to write trace");
492 None
493 } else {
494 let path = trace_writer.path().display().to_string();
495 tracing::info!(path = %path, "Trace written");
496 Some(path)
497 }
498 }
499 Err(e) => {
500 tracing::warn!(error = %e, "Failed to create trace writer — traces disabled for this run");
501 None
502 }
503 };
504
505 prune_traces(
507 self.trace_config.max_traces,
508 self.trace_config.retention_days,
509 );
510 trace_path
511 }
512
513 fn verify_media_integrity(&self) -> usize {
519 let mut warnings = 0;
520 let mut checked: u64 = 0;
521 for (task_id, result) in self.datastore.iter_results() {
522 for media_ref in &result.media {
523 checked += 1;
524 if !media_ref.path.exists() {
525 tracing::warn!(
526 task_id = %task_id,
527 hash = %media_ref.hash,
528 path = %media_ref.path.display(),
529 "Media integrity: CAS file missing"
530 );
531 warnings += 1;
532 continue;
533 }
534 match std::fs::metadata(&media_ref.path) {
535 Ok(meta) => {
536 if meta.len() != media_ref.size_bytes {
537 tracing::warn!(
538 task_id = %task_id,
539 hash = %media_ref.hash,
540 expected = media_ref.size_bytes,
541 actual = meta.len(),
542 "Media integrity: size mismatch"
543 );
544 warnings += 1;
545 }
546 }
547 Err(e) => {
548 tracing::warn!(
549 task_id = %task_id,
550 hash = %media_ref.hash,
551 error = %e,
552 "Media integrity: failed to stat CAS file"
553 );
554 warnings += 1;
555 }
556 }
557 }
558 }
559
560 if checked > 0 {
562 self.event_log.emit(EventKind::MediaIntegrityCheck {
563 checked,
564 warnings: warnings as u64,
565 });
566 }
567
568 warnings
569 }
570
571 fn get_retry_config(task: &AnalyzedTask) -> Option<(Value, u8, InferParams)> {
579 let infer_action = match &task.action {
581 AnalyzedTaskAction::Infer(infer) => infer,
582 _ => return None,
583 };
584
585 let output = task.output.as_ref()?;
587 if output.format != AnalyzedOutputFormat::Json {
588 return None;
589 }
590
591 let schema = output.schema.as_ref()?.clone();
593
594 let structured = task.structured.as_ref()?;
596 let max_retries = structured.max_retries.unwrap_or(0);
597 if max_retries == 0 {
598 return None;
599 }
600
601 let infer_params = InferParams {
603 prompt: infer_action.prompt.clone(),
604 provider: task.provider.clone(),
605 model: task.model.clone(),
606 temperature: infer_action.temperature,
607 max_tokens: infer_action.max_tokens,
608 system: infer_action.system.clone(),
609 response_format: None,
610 extended_thinking: None,
611 thinking_budget: None,
612 content: infer_action
613 .content
614 .as_ref()
615 .map(|parts| parts.iter().cloned().map(Into::into).collect()),
616 guardrails: Vec::new(),
617 };
618
619 Some((schema, max_retries, infer_params))
620 }
621
622 #[allow(clippy::too_many_arguments)]
632 async fn execute_with_retry(
633 task_id: &Arc<str>,
634 original_infer: InferParams,
635 schema: &Value,
636 max_retries: u8,
637 bindings: &ResolvedBindings,
638 datastore: &RunContext,
639 executor: &TaskExecutor,
640 event_log: &EventLog,
641 start: Instant,
642 output_policy: Option<&OutputPolicy>,
643 ) -> TaskResult {
644 let mut current_infer = original_infer;
645 let original_prompt = current_infer.prompt.clone();
646 let mut attempts = 0u8;
647
648 let compiled_validator = jsonschema::validator_for(schema).ok();
651
652 loop {
653 if executor.is_cancelled() {
655 let reason = "cancelled during structured output retry".to_string();
656 event_log.emit(EventKind::TaskFailed {
657 task_id: Arc::clone(task_id),
658 error: reason.clone(),
659 error_code: Some("NIKA-097".to_string()),
660 duration_ms: start.elapsed().as_millis() as u64,
661 });
662 return TaskResult::failed(reason, start.elapsed());
663 }
664 attempts += 1;
665
666 let action = TaskAction::Infer {
668 infer: current_infer.clone(),
669 };
670
671 let result = executor
673 .execute(task_id, &action, bindings, datastore, output_policy)
674 .await;
675 let duration = start.elapsed();
676
677 match result {
678 Ok(output) => {
679 let json_value = match extract_json(&output) {
681 Ok(v) => v,
682 Err(e) => {
683 if attempts > max_retries {
684 event_log.emit(EventKind::TaskFailed {
686 task_id: Arc::clone(task_id),
687 error: format!(
688 "NIKA-060: Invalid JSON after {} attempts: {}",
689 attempts, e
690 ),
691 duration_ms: duration.as_millis() as u64,
692 error_code: Some("NIKA-060".to_string()),
693 });
694 return TaskResult::failed(
695 format!(
696 "NIKA-060: Invalid JSON output after {} attempts: {}",
697 attempts, e
698 ),
699 duration,
700 );
701 }
702
703 tracing::debug!(
705 task_id = %task_id,
706 attempt = attempts,
707 "JSON parsing failed, retrying"
708 );
709 current_infer.prompt = Self::build_retry_prompt(
710 &original_prompt,
711 schema,
712 &output,
713 &format!("JSON parsing failed: {}", e),
714 );
715 continue;
716 }
717 };
718
719 let compiled = match compiled_validator.as_ref() {
721 Some(c) => c,
722 None => {
723 event_log.emit(EventKind::TaskFailed {
724 task_id: Arc::clone(task_id),
725 error: "Invalid schema (failed to compile)".to_string(),
726 duration_ms: duration.as_millis() as u64,
727 error_code: Some("NIKA-061".to_string()),
728 });
729 return TaskResult::failed(
730 "Invalid inline schema (compilation failed)".to_string(),
731 duration,
732 );
733 }
734 };
735
736 let errors: Vec<_> = compiled.iter_errors(&json_value).collect();
737 if errors.is_empty() {
738 event_log.emit(EventKind::TaskCompleted {
740 task_id: Arc::clone(task_id),
741 output: Arc::new(json_value.clone()),
742 duration_ms: duration.as_millis() as u64,
743 });
744 return TaskResult::success(json_value, duration);
745 }
746
747 if attempts > max_retries {
749 let error_feedback = format_validation_errors(&json_value, schema);
750 event_log.emit(EventKind::TaskFailed {
751 task_id: Arc::clone(task_id),
752 error: format!(
753 "Schema validation failed after {} attempts:\n{}",
754 attempts, error_feedback
755 ),
756 duration_ms: duration.as_millis() as u64,
757 error_code: Some("NIKA-061".to_string()),
758 });
759 return TaskResult::failed(
760 format!(
761 "NIKA-061: Schema validation failed after {} attempts:\n{}",
762 attempts, error_feedback
763 ),
764 duration,
765 );
766 }
767
768 let error_feedback = format_validation_errors(&json_value, schema);
770 tracing::debug!(
771 task_id = %task_id,
772 attempt = attempts,
773 errors = %error_feedback,
774 "Schema validation failed, retrying"
775 );
776 current_infer.prompt = Self::build_retry_prompt(
777 &original_prompt,
778 schema,
779 &output,
780 &error_feedback,
781 );
782 }
783 Err(e) => {
784 event_log.emit(EventKind::TaskFailed {
786 task_id: Arc::clone(task_id),
787 error: e.to_string(),
788 duration_ms: duration.as_millis() as u64,
789 error_code: Some(e.code().to_string()),
790 });
791 return TaskResult::failed(e.to_string(), duration);
792 }
793 }
794 }
795 }
796
797 fn build_retry_prompt(
799 original_prompt: &str,
800 schema: &Value,
801 previous_output: &str,
802 error_feedback: &str,
803 ) -> String {
804 format!(
805 r#"{original_prompt}
806
807---
808RETRY: Your previous response did not match the required JSON schema.
809
810REQUIRED SCHEMA:
811{schema}
812
813YOUR PREVIOUS OUTPUT:
814{previous_output}
815
816VALIDATION ERRORS:
817{error_feedback}
818
819Please provide a corrected JSON response that strictly matches the schema."#,
820 original_prompt = original_prompt,
821 schema = serde_json::to_string_pretty(schema).unwrap_or_else(|_| schema.to_string()),
822 previous_output = previous_output,
823 error_feedback = error_feedback
824 )
825 }
826
    /// Executes one task (or one for-each item of a task) end to end and
    /// returns its [`IterationResult`].
    ///
    /// Steps:
    /// 1. Resolve the task's `with` bindings against `datastore`; on failure a
    ///    `TaskFailed` event is emitted and a failed result is returned.
    /// 2. Inject the for-each variable, if any, and emit `TaskStarted`.
    /// 3. Run the action, either through `execute_with_retry` (when
    ///    `get_retry_config` yields a configuration) or directly through the
    ///    executor, emitting `TaskCompleted` / `TaskFailed` accordingly.
    /// 4. On success, process the task's artifacts; any artifact error turns
    ///    the result into a failure (`NIKA-281`).
    ///
    /// `task_id` is the id events and the result are reported under;
    /// `parent_task_id` is only used to fill `for_each_info`.
    #[allow(clippy::too_many_arguments)]
    async fn execute_task_iteration(
        task: Arc<AnalyzedTask>,
        task_id: Arc<str>,
        parent_task_id: Arc<str>,
        datastore: RunContext,
        executor: TaskExecutor,
        event_log: EventLog,
        for_each_binding: Option<(String, Value, usize)>,
        workflow_artifacts: Option<ArtifactsConfig>,
        base_path: PathBuf,
    ) -> IterationResult {
        let start = Instant::now();

        // (parent id, item index) — only present for for-each iterations.
        let for_each_info = for_each_binding
            .as_ref()
            .map(|(_, _, idx)| (Arc::clone(&parent_task_id), *idx));

        let (mut bindings, binding_events) = match ResolvedBindings::from_with_spec_traced(
            Some(&task.with_spec),
            &datastore,
            &task_id,
        ) {
            Ok(result) => result,
            Err(e) => {
                let duration = start.elapsed();
                event_log.emit(EventKind::TaskFailed {
                    task_id: Arc::clone(&task_id),
                    error: e.to_string(),
                    duration_ms: duration.as_millis() as u64,
                    error_code: Some(e.code().to_string()),
                });
                return IterationResult {
                    store_id: task_id,
                    result: TaskResult::failed(e.to_string(), duration),
                    for_each_info,
                    artifact_paths: vec![],
                };
            }
        };
        // Forward the events produced while resolving bindings.
        for event in binding_events {
            event_log.emit(event);
        }

        // Expose the current for-each item under its loop variable name.
        if let Some((var_name, value, _idx)) = for_each_binding {
            bindings.set(&var_name, value);
        }

        event_log.emit(EventKind::TaskStarted {
            task_id: Arc::clone(&task_id),
            verb: Arc::from(task.action.verb_name()),
            inputs: bindings.to_value(),
        });

        let lowered_action = lower_action(&task.action, &task.provider, &task.model, &task.retry);
        let lowered_output = task
            .output
            .as_ref()
            .map(|o: &AnalyzedOutput| lower_output(o.clone()));

        // An explicit output spec wins; otherwise derive a policy from the
        // structured spec, if the task has one.
        let effective_output = if lowered_output.is_some() {
            lowered_output
        } else {
            task.structured.as_ref().map(|spec| spec.to_output_policy())
        };

        let retry_config = Self::get_retry_config(&task);

        let mut task_result = if let Some((schema, max_retries, original_infer)) = retry_config {
            // Retry path: event emission is handled inside `execute_with_retry`.
            // NOTE(review): unlike the direct path below, this path does not call
            // `datastore.take_media` — confirm whether that is intentional.
            Self::execute_with_retry(
                &task_id,
                original_infer,
                &schema,
                max_retries,
                &bindings,
                &datastore,
                &executor,
                &event_log,
                start,
                effective_output.as_ref(),
            )
            .await
        } else {
            let result = executor
                .execute(
                    &task_id,
                    &lowered_action,
                    &bindings,
                    &datastore,
                    effective_output.as_ref(),
                )
                .await;
            let duration = start.elapsed();

            match result {
                Ok(output) => {
                    let executor_already_validated = effective_output.is_some();
                    // NOTE(review): `effective_output` is `Some` whenever
                    // `task.structured` is `Some` (see its construction above), so
                    // the `if let Some(..) = task.structured` arm below looks
                    // unreachable — confirm before relying on this fallback.
                    let final_output = if !executor_already_validated {
                        if let Some(ref structured_spec) = task.structured {
                            let mut engine = StructuredOutputEngine::new(
                                structured_spec.clone(),
                                Arc::new(event_log.clone()),
                            );
                            match engine.validate(&task_id, &output).await {
                                Ok(result) => {
                                    debug!(
                                        task_id = %task_id,
                                        layer = result.layer,
                                        layer_name = %result.layer_name,
                                        total_attempts = result.total_attempts,
                                        "Structured output validation succeeded (runner fallback)"
                                    );
                                    result.value.to_string()
                                }
                                Err(e) => {
                                    // Take (and drop) media collected for this task.
                                    let _ = datastore.take_media(&task_id);
                                    event_log.emit(EventKind::TaskFailed {
                                        task_id: Arc::clone(&task_id),
                                        error: e.to_string(),
                                        duration_ms: duration.as_millis() as u64,
                                        error_code: Some(e.code().to_string()),
                                    });
                                    return IterationResult {
                                        store_id: task_id,
                                        result: TaskResult::failed(e.to_string(), duration),
                                        for_each_info,
                                        artifact_paths: vec![],
                                    };
                                }
                            }
                        } else {
                            output
                        }
                    } else {
                        output
                    };

                    let tr =
                        make_task_result(final_output, effective_output.as_ref(), duration).await;
                    // Attach the media collected for this task to its result.
                    let tr = tr.with_media(datastore.take_media(&task_id));
                    if tr.is_success() {
                        event_log.emit(EventKind::TaskCompleted {
                            task_id: Arc::clone(&task_id),
                            output: Arc::clone(&tr.output),
                            duration_ms: duration.as_millis() as u64,
                        });
                    } else {
                        event_log.emit(EventKind::TaskFailed {
                            task_id: Arc::clone(&task_id),
                            error: tr.error().unwrap_or("Unknown error").to_string(),
                            duration_ms: duration.as_millis() as u64,
                            error_code: Some("NIKA-060".to_string()),
                        });
                    }
                    tr
                }
                Err(e) => {
                    // Take (and drop) media collected for this task.
                    let _ = datastore.take_media(&task_id);
                    event_log.emit(EventKind::TaskFailed {
                        task_id: Arc::clone(&task_id),
                        error: e.to_string(),
                        duration_ms: duration.as_millis() as u64,
                        error_code: Some(e.code().to_string()),
                    });
                    TaskResult::failed(e.to_string(), duration)
                }
            }
        };

        let mut artifact_paths = Vec::new();
        // Artifacts are only processed for successful results.
        if task_result.is_success() {
            if let Some(ref artifact_spec) = task.artifact {
                let output_content = task_result.output_str().into_owned();

                let artifact_result = process_task_artifacts(
                    &task_id,
                    &output_content,
                    artifact_spec,
                    workflow_artifacts.as_ref(),
                    &base_path,
                    Some(&event_log),
                    &bindings,
                    &datastore,
                    task_result.media.as_slice(),
                )
                .await;

                if artifact_result.written > 0 {
                    debug!(
                        task_id = %task_id,
                        artifacts_written = artifact_result.written,
                        "Artifacts written"
                    );
                }
                artifact_paths = artifact_result.paths;

                if !artifact_result.errors.is_empty() {
                    // Log and emit one `ArtifactFailed` per error, then fail the
                    // task as a whole with the combined message.
                    let error_msgs: Vec<String> = artifact_result
                        .errors
                        .iter()
                        .map(|err| {
                            tracing::error!(
                                task_id = %task_id,
                                error = %err,
                                "Artifact write failed"
                            );
                            event_log.emit(EventKind::ArtifactFailed {
                                task_id: Arc::clone(&task_id),
                                path: String::new(),
                                reason: err.to_string(),
                            });
                            err.to_string()
                        })
                        .collect();
                    let error = format!("Artifact write errors: {}", error_msgs.join("; "));
                    event_log.emit(EventKind::TaskFailed {
                        task_id: Arc::clone(&task_id),
                        error: error.clone(),
                        error_code: Some("NIKA-281".to_string()),
                        duration_ms: start.elapsed().as_millis() as u64,
                    });
                    // The previously successful result (including its media) is
                    // replaced by the failure.
                    task_result = TaskResult::failed(error, start.elapsed());
                }
            }
        }

        IterationResult {
            store_id: task_id,
            result: task_result,
            for_each_info,
            artifact_paths,
        }
    }
1101
1102 #[instrument(skip(self), fields(workflow_tasks = self.workflow.tasks.len()))]
1104 pub async fn run(&mut self) -> Result<String, NikaError> {
1105 let workflow_start = Instant::now();
1106 info!("Starting workflow execution");
1107
1108 if self.cancel_token.is_cancelled() {
1110 let duration = workflow_start.elapsed();
1111 self.event_log.emit(EventKind::WorkflowAborted {
1112 reason: "Workflow cancelled before start".to_string(),
1113 duration_ms: duration.as_millis() as u64,
1114 running_tasks: vec![],
1115 });
1116 self.write_trace();
1117 return Err(NikaError::WorkflowCancelled {
1118 phase: "before start".to_string(),
1119 });
1120 }
1121
1122 let base_path = std::env::current_dir().unwrap_or_else(|e| {
1124 tracing::warn!(error = %e, "Failed to get current directory, using '.'");
1125 std::path::PathBuf::from(".")
1126 });
1127
1128 self.datastore.set_workspace_root(base_path.clone());
1130
1131 let _lockfile_guard = LockfileGuard::create(
1133 base_path
1134 .join(".nika")
1135 .join("media")
1136 .join("store")
1137 .join(".nika-run.lock"),
1138 );
1139
1140 if !self.workflow.context_files.is_empty() {
1141 let loaded_context =
1142 load_context_analyzed(&self.workflow.context_files, &base_path).await?;
1143 self.datastore.set_context(loaded_context);
1144 debug!("Loaded {} context files", self.workflow.context_files.len());
1145 }
1146
1147 if !self.workflow.inputs.is_empty() {
1149 let inputs_map: rustc_hash::FxHashMap<String, serde_json::Value> = self
1150 .workflow
1151 .inputs
1152 .iter()
1153 .map(|(k, v)| (k.clone(), v.clone()))
1154 .collect();
1155 self.datastore.set_inputs(inputs_map);
1156 debug!("Loaded {} input parameters", self.workflow.inputs.len());
1157 }
1158
1159 if self.workflow.agents.is_some() {
1161 self.resolved_assets = resolve_assets_analyzed(&self.workflow, &base_path).await?;
1162 debug!(
1163 agents = self.resolved_assets.agents.len(),
1164 skills = self.resolved_assets.skills.len(),
1165 "Resolved workflow assets"
1166 );
1167 }
1168
1169 if !self.workflow.skills_map.is_empty() {
1172 self.executor = self
1173 .executor
1174 .clone()
1175 .with_skills(self.workflow.skills_map.clone(), base_path.clone());
1176 debug!(
1177 skills_count = self.workflow.skills_map.len(),
1178 "Wired skills mapping into executor"
1179 );
1180 }
1181
1182 let total_tasks = self.workflow.tasks.len();
1183 let mut _completed = 0;
1184
1185 self.event_log.emit(EventKind::WorkflowStarted {
1187 task_count: total_tasks,
1188 generation_id: self.generation_id.clone(),
1189 workflow_hash: self.workflow.compute_hash(),
1190 nika_version: env!("CARGO_PKG_VERSION").to_string(),
1191 });
1192
1193 if !self.quiet {
1194 println!();
1195
1196 let has_observable_output = self
1198 .workflow
1199 .tasks
1200 .iter()
1201 .any(|t| t.output.is_some() || t.artifact.is_some());
1202 if !has_observable_output && total_tasks > 1 {
1203 println!(
1204 " {} {}\n",
1205 "⚠".yellow(),
1206 "No tasks have output: or artifact: config — results won't be persisted"
1207 .yellow()
1208 );
1209 }
1210 }
1211
1212 let cached_depths = if total_tasks > 1 {
1215 let nodes: Vec<&str> = self
1216 .workflow
1217 .tasks
1218 .iter()
1219 .map(|t| t.name.as_str())
1220 .collect();
1221 let edges: Vec<(&str, &str)> = self
1222 .workflow
1223 .tasks
1224 .iter()
1225 .flat_map(|task| {
1226 task.depends_on.iter().filter_map(|dep_id| {
1227 self.workflow
1228 .task_table
1229 .get_name(*dep_id)
1230 .map(|dep_name| (dep_name, task.name.as_str()))
1231 })
1232 })
1233 .collect();
1234 Some(crate::dag::flow::compute_layers(&nodes, &edges))
1235 } else {
1236 None
1237 };
1238
1239 if let Some(ref mut renderer) = self.cli_renderer {
1241 if let Some(ref depths) = cached_depths {
1242 use crate::display::dag::{StaticDagEdge, StaticDagTask};
1243 let dag_tasks: Vec<StaticDagTask> = self
1244 .workflow
1245 .tasks
1246 .iter()
1247 .map(|t| StaticDagTask {
1248 id: t.name.clone(),
1249 verb: t.action.verb_name().to_string(),
1250 layer: depths[t.name.as_str()],
1251 })
1252 .collect();
1253 let total_deps: usize =
1254 self.workflow.tasks.iter().map(|t| t.depends_on.len()).sum();
1255 let mut dag_edges = Vec::with_capacity(total_deps);
1256 for task in &self.workflow.tasks {
1257 for dep_id in &task.depends_on {
1258 if let Some(dep_name) = self.workflow.task_table.get_name(*dep_id) {
1259 dag_edges.push(StaticDagEdge {
1260 from: dep_name.to_string(),
1261 to: task.name.clone(),
1262 });
1263 }
1264 }
1265 }
1266 crate::display::dag::print_static_dag(&dag_tasks, &dag_edges);
1267 println!("{}", "\u{254C}".repeat(69).dimmed());
1268 println!();
1269 let task_layers: std::collections::HashMap<Arc<str>, usize> = self
1270 .workflow
1271 .tasks
1272 .iter()
1273 .map(|t| (Arc::from(t.name.as_str()), depths[t.name.as_str()]))
1274 .collect();
1275 renderer.set_task_layers(task_layers);
1276 }
1277
1278 let task_ids: Vec<String> =
1280 self.workflow.tasks.iter().map(|t| t.name.clone()).collect();
1281 let task_deps: std::collections::HashMap<String, Vec<String>> = self
1282 .workflow
1283 .tasks
1284 .iter()
1285 .map(|t| {
1286 let deps: Vec<String> = t
1287 .depends_on
1288 .iter()
1289 .filter_map(|dep_id| {
1290 self.workflow
1291 .task_table
1292 .get_name(*dep_id)
1293 .map(|s| s.to_string())
1294 })
1295 .collect();
1296 (t.name.clone(), deps)
1297 })
1298 .collect();
1299 renderer.init_tasks(&task_ids, &task_deps);
1300 }
1301
1302 loop {
1303 if self.cancel_token.is_cancelled() {
1305 let duration = workflow_start.elapsed();
1306 let running_tasks: Vec<Arc<str>> = self
1308 .workflow
1309 .tasks
1310 .iter()
1311 .filter(|t| !self.datastore.contains(&t.name))
1312 .map(|t| Arc::from(t.name.as_str()))
1313 .collect();
1314
1315 self.event_log.emit(EventKind::WorkflowAborted {
1316 reason: "Workflow cancelled by user".to_string(),
1317 duration_ms: duration.as_millis() as u64,
1318 running_tasks,
1319 });
1320 self.write_trace();
1321 return Err(NikaError::WorkflowCancelled {
1322 phase: "by user".to_string(),
1323 });
1324 }
1325
1326 while self.paused.load(Ordering::SeqCst) {
1329 tokio::select! {
1330 biased;
1331 _ = self.cancel_token.cancelled() => {
1332 let duration = workflow_start.elapsed();
1334 let running_tasks: Vec<Arc<str>> = self
1335 .workflow
1336 .tasks
1337 .iter()
1338 .filter(|t| !self.datastore.contains(&t.name))
1339 .map(|t| Arc::from(t.name.as_str()))
1340 .collect();
1341
1342 self.event_log.emit(EventKind::WorkflowAborted {
1343 reason: "Workflow cancelled while paused".to_string(),
1344 duration_ms: duration.as_millis() as u64,
1345 running_tasks,
1346 });
1347 self.write_trace();
1348 return Err(NikaError::WorkflowCancelled {
1349 phase: "while paused".to_string(),
1350 });
1351 }
1352 _ = self.resume_notify.notified() => {
1353 }
1355 }
1356 }
1357
1358 let mut renderer = self.cli_renderer.take();
1359
1360 let ready = self.get_ready_tasks();
1361
1362 if ready.is_empty() {
1364 self.cli_renderer = renderer;
1365
1366 if self.all_done() {
1367 let failed_tasks: Vec<String> = self
1371 .workflow
1372 .tasks
1373 .iter()
1374 .filter(|t| self.datastore.is_failed(&t.name))
1375 .map(|t| t.name.clone())
1376 .collect();
1377
1378 if !failed_tasks.is_empty() {
1379 let root_failure = self.find_root_failure();
1380 let dep_failed_count = failed_tasks
1381 .iter()
1382 .filter(|t| self.datastore.is_dependency_failed(t))
1383 .count();
1384
1385 self.event_log.emit(EventKind::WorkflowFailed {
1386 error: format!(
1387 "{} task(s) failed ({} direct, {} from dependency chain)",
1388 failed_tasks.len(),
1389 failed_tasks.len() - dep_failed_count,
1390 dep_failed_count,
1391 ),
1392 failed_task: root_failure.clone().map(Arc::from),
1393 });
1394 self.write_trace();
1395 return Err(NikaError::DependencyChainFailed {
1396 count: failed_tasks.len(),
1397 blocked_tasks: failed_tasks,
1398 root_failure,
1399 });
1400 }
1401 break;
1402 }
1403
1404 let pending = self.get_pending_tasks();
1406 if pending.is_empty() {
1407 break;
1409 }
1410
1411 let blocked_by_dep_failure: Vec<String> = self
1413 .workflow
1414 .tasks
1415 .iter()
1416 .filter(|t| self.datastore.is_dependency_failed(&t.name))
1417 .map(|t| t.name.clone())
1418 .collect();
1419
1420 if !blocked_by_dep_failure.is_empty() {
1421 let root_failure = self.find_root_failure();
1423
1424 self.event_log.emit(EventKind::WorkflowFailed {
1425 error: format!(
1426 "Dependency chain failed: {} task(s) blocked by failed dependencies",
1427 blocked_by_dep_failure.len()
1428 ),
1429 failed_task: root_failure.clone().map(Arc::from),
1430 });
1431 self.write_trace();
1432 return Err(NikaError::DependencyChainFailed {
1433 count: blocked_by_dep_failure.len(),
1434 blocked_tasks: blocked_by_dep_failure,
1435 root_failure,
1436 });
1437 }
1438
1439 self.event_log.emit(EventKind::WorkflowFailed {
1442 error: "Deadlock: no tasks ready but workflow not complete".to_string(),
1443 failed_task: None,
1444 });
1445 self.write_trace();
1446 return Err(NikaError::RuntimeDeadlock {
1447 details:
1448 "no tasks ready but workflow not complete. Check for circular dependencies."
1449 .to_string(),
1450 });
1451 }
1452
1453 let mut join_set = JoinSet::new();
1455
1456 let mut for_each_cancel_tokens: rustc_hash::FxHashMap<Arc<str>, CancellationToken> =
1460 rustc_hash::FxHashMap::default();
1461
1462 let workflow_artifacts = self.workflow.artifacts.clone();
1464 let artifact_base_path = base_path.clone();
1465
1466 for task in ready {
1467 let task_id = intern(&task.name);
1468
1469 let deps = self.flow_graph.get_dependencies(&task.name);
1471 let sched_kind = EventKind::TaskScheduled {
1472 task_id: Arc::clone(&task_id),
1473 dependencies: deps.to_vec(),
1474 };
1475 if let Some(ref mut r) = renderer {
1476 r.render_kind(&sched_kind);
1477 }
1478 self.event_log.emit(sched_kind);
1479
1480 let for_each_items: Option<Vec<Value>> = if let Some(decompose) =
1483 task.decompose.as_ref()
1484 {
1485 debug!(
1486 task_id = %task.name,
1487 strategy = ?decompose.strategy,
1488 traverse = %decompose.traverse,
1489 "Expanding decompose modifier"
1490 );
1491 self.event_log.emit(EventKind::DecomposeStarted {
1492 task_id: Arc::from(task.name.as_str()),
1493 strategy: format!("{:?}", decompose.strategy).to_lowercase(),
1494 });
1495 let bindings = match ResolvedBindings::from_with_spec(
1497 Some(&task.with_spec),
1498 &self.datastore,
1499 ) {
1500 Ok(b) => b,
1501 Err(e) => {
1502 tracing::error!(
1503 task_id = %task.name,
1504 error = %e,
1505 "Failed to resolve bindings for decompose"
1506 );
1507 self.datastore.insert(
1508 intern(&task.name),
1509 TaskResult::failed(
1510 format!("Decompose binding resolution failed: {e}"),
1511 std::time::Duration::ZERO,
1512 ),
1513 );
1514 continue;
1515 }
1516 };
1517 let decompose_start = Instant::now();
1519 let decompose_result = tokio::time::timeout(
1520 DECOMPOSE_TIMEOUT,
1521 self.executor
1522 .expand_decompose(decompose, &bindings, &self.datastore),
1523 )
1524 .await;
1525
1526 match decompose_result {
1527 Ok(Ok(items)) => {
1528 self.event_log.emit(EventKind::DecomposeCompleted {
1529 task_id: Arc::from(task.name.as_str()),
1530 strategy: format!("{:?}", decompose.strategy).to_lowercase(),
1531 item_count: items.len(),
1532 duration_ms: decompose_start.elapsed().as_millis() as u64,
1533 });
1534 Some(items)
1535 }
1536 Ok(Err(e)) => {
1537 self.event_log.emit(EventKind::DecomposeCompleted {
1539 task_id: Arc::from(task.name.as_str()),
1540 strategy: format!("{:?}", decompose.strategy).to_lowercase(),
1541 item_count: 0,
1542 duration_ms: decompose_start.elapsed().as_millis() as u64,
1543 });
1544 self.datastore.insert(
1545 intern(&task.name),
1546 TaskResult::failed(e.to_string(), std::time::Duration::ZERO),
1547 );
1548 continue;
1549 }
1550 Err(_timeout) => {
1551 self.event_log.emit(EventKind::DecomposeCompleted {
1553 task_id: Arc::from(task.name.as_str()),
1554 strategy: format!("{:?}", decompose.strategy).to_lowercase(),
1555 item_count: 0,
1556 duration_ms: decompose_start.elapsed().as_millis() as u64,
1557 });
1558 let timeout_error = NikaError::DecomposeTimeout {
1559 task_id: task.name.clone(),
1560 timeout_secs: DECOMPOSE_TIMEOUT.as_secs(),
1561 };
1562 self.datastore.insert(
1563 intern(&task.name),
1564 TaskResult::failed(timeout_error.to_string(), DECOMPOSE_TIMEOUT),
1565 );
1566 continue;
1567 }
1568 }
1569 } else if let Some(ref for_each) = task.for_each {
1570 let items_str = &for_each.items;
1572
1573 if for_each.is_binding() {
1574 let bindings = match ResolvedBindings::from_with_spec(
1576 Some(&task.with_spec),
1577 &self.datastore,
1578 ) {
1579 Ok(b) => b,
1580 Err(e) => {
1581 tracing::error!(
1582 task_id = %task.name,
1583 error = %e,
1584 "Failed to resolve bindings for for_each"
1585 );
1586 self.datastore.insert(
1587 intern(&task.name),
1588 TaskResult::failed(
1589 format!("for_each binding resolution failed: {e}"),
1590 std::time::Duration::ZERO,
1591 ),
1592 );
1593 continue;
1594 }
1595 };
1596
1597 if let Some(alias) = items_str.strip_prefix('$') {
1598 if alias.starts_with("inputs.") {
1600 match self.datastore.resolve_input_path(alias) {
1601 Some(value) => match value_to_array(&value) {
1602 Some(items) => Some(items),
1603 None => {
1604 self.datastore.insert(
1605 intern(&task.name),
1606 TaskResult::failed(
1607 format!(
1608 "for_each binding '${}' resolved to non-array value",
1609 alias
1610 ),
1611 std::time::Duration::ZERO,
1612 ),
1613 );
1614 continue;
1615 }
1616 },
1617 None => {
1618 self.datastore.insert(
1619 intern(&task.name),
1620 TaskResult::failed(
1621 format!(
1622 "for_each input '{}' not found in workflow inputs",
1623 alias
1624 ),
1625 std::time::Duration::ZERO,
1626 ),
1627 );
1628 continue;
1629 }
1630 }
1631 } else {
1632 let mut segments = alias.split('.');
1634 let Some(base_alias) = segments.next() else {
1635 self.datastore.insert(
1636 intern(&task.name),
1637 TaskResult::failed(
1638 "for_each: empty alias after '$' prefix".to_string(),
1639 std::time::Duration::ZERO,
1640 ),
1641 );
1642 continue;
1643 };
1644
1645 let base_result = bindings
1647 .get_resolved(base_alias, &self.datastore)
1648 .or_else(|_| {
1649 self.datastore
1651 .get_output(base_alias)
1652 .map(|arc| arc.as_ref().clone())
1653 .ok_or_else(|| NikaError::BindingNotFound {
1654 alias: base_alias.to_string(),
1655 })
1656 });
1657 match base_result {
1658 Ok(base_value) => {
1659 let parsed_value;
1661 let working_value: &Value = if let Some(v) =
1662 crate::binding::jsonpath::try_parse_json_str(
1663 &base_value,
1664 ) {
1665 parsed_value = v;
1666 &parsed_value
1667 } else {
1668 &base_value
1669 };
1670
1671 let mut value_ref: &Value = working_value;
1673 let mut traversal_failed = false;
1674
1675 for segment in segments {
1676 let next = if let Ok(idx) = segment.parse::<usize>() {
1677 value_ref.get(idx)
1678 } else {
1679 value_ref.get(segment)
1680 };
1681
1682 match next {
1683 Some(v) => value_ref = v,
1684 None => {
1685 self.datastore.insert(
1686 intern(&task.name),
1687 TaskResult::failed(
1688 format!(
1689 "for_each binding '${}': nested path segment '{}' not found",
1690 alias, segment
1691 ),
1692 std::time::Duration::ZERO,
1693 ),
1694 );
1695 traversal_failed = true;
1696 break;
1697 }
1698 }
1699 }
1700
1701 if traversal_failed {
1702 continue;
1703 }
1704
1705 match value_to_array(value_ref) {
1706 Some(items) => Some(items),
1707 None => {
1708 self.datastore.insert(
1709 intern(&task.name),
1710 TaskResult::failed(
1711 format!(
1712 "for_each binding '${}' resolved to non-array value",
1713 alias
1714 ),
1715 std::time::Duration::ZERO,
1716 ),
1717 );
1718 continue;
1719 }
1720 }
1721 }
1722 Err(e) => {
1723 self.datastore.insert(
1724 intern(&task.name),
1725 TaskResult::failed(
1726 format!(
1727 "for_each binding '{}' not found: {}",
1728 base_alias, e
1729 ),
1730 std::time::Duration::ZERO,
1731 ),
1732 );
1733 continue;
1734 }
1735 }
1736 }
1737 } else if items_str.contains("{{inputs.") {
1738 if let Some(start) = items_str.find("{{inputs.") {
1740 let after = &items_str[start + 9..];
1741 if let Some(end) = after.find("}}") {
1742 let param_path = &after[..end];
1743 let full_path = format!("inputs.{}", param_path);
1744 match self.datastore.resolve_input_path(&full_path) {
1745 Some(value) => match value_to_array(&value) {
1746 Some(items) => Some(items),
1747 None => {
1748 self.datastore.insert(
1749 intern(&task.name),
1750 TaskResult::failed(
1751 format!(
1752 "for_each binding '{{{{inputs.{}}}}}' resolved to non-array value",
1753 param_path
1754 ),
1755 std::time::Duration::ZERO,
1756 ),
1757 );
1758 continue;
1759 }
1760 },
1761 None => {
1762 self.datastore.insert(
1763 intern(&task.name),
1764 TaskResult::failed(
1765 format!(
1766 "for_each input '{}' not found in workflow inputs",
1767 full_path
1768 ),
1769 std::time::Duration::ZERO,
1770 ),
1771 );
1772 continue;
1773 }
1774 }
1775 } else {
1776 None
1777 }
1778 } else {
1779 None
1780 }
1781 } else if items_str.contains("{{with.") {
1782 let prefix_info = items_str.find("{{with.").map(|s| (s, 7usize));
1784 if let Some((start, prefix_len)) = prefix_info {
1785 let after = &items_str[start + prefix_len..];
1786 if let Some(end) = after.find("}}") {
1787 let path = &after[..end];
1788 let mut parts = path.split('.');
1789 let Some(alias) = parts.next() else {
1790 continue;
1791 };
1792
1793 match bindings.get_resolved(alias, &self.datastore) {
1794 Ok(base_value) => {
1795 let parsed_value;
1797 let working_value: &Value = if let Some(v) =
1798 crate::binding::jsonpath::try_parse_json_str(
1799 &base_value,
1800 ) {
1801 parsed_value = v;
1802 &parsed_value
1803 } else {
1804 &base_value
1805 };
1806
1807 let mut value_ref: &Value = working_value;
1808 let mut traversal_failed = false;
1809
1810 for segment in parts {
1811 let next = if let Ok(idx) = segment.parse::<usize>()
1812 {
1813 value_ref.get(idx)
1814 } else {
1815 value_ref.get(segment)
1816 };
1817
1818 match next {
1819 Some(v) => value_ref = v,
1820 None => {
1821 tracing::warn!(
1822 task_id = %task.name,
1823 path = %path,
1824 segment = %segment,
1825 "for_each nested path segment not found"
1826 );
1827 traversal_failed = true;
1828 break;
1829 }
1830 }
1831 }
1832
1833 if traversal_failed {
1834 self.datastore.insert(
1835 intern(&task.name),
1836 TaskResult::failed(
1837 format!(
1838 "for_each items: path traversal failed for '{{{{with.{}}}}}'",
1839 path
1840 ),
1841 std::time::Duration::ZERO,
1842 ),
1843 );
1844 continue;
1845 } else {
1846 match value_to_array(value_ref) {
1847 Some(items) => Some(items),
1848 None => {
1849 self.datastore.insert(
1850 intern(&task.name),
1851 TaskResult::failed(
1852 format!(
1853 "for_each binding '{{{{with.{}}}}}' resolved to non-array value",
1854 path
1855 ),
1856 std::time::Duration::ZERO,
1857 ),
1858 );
1859 continue;
1860 }
1861 }
1862 }
1863 }
1864 Err(e) => {
1865 self.datastore.insert(
1866 intern(&task.name),
1867 TaskResult::failed(
1868 format!(
1869 "for_each binding '{}' not found: {}",
1870 alias, e
1871 ),
1872 std::time::Duration::ZERO,
1873 ),
1874 );
1875 continue;
1876 }
1877 }
1878 } else {
1879 None
1880 }
1881 } else {
1882 None
1883 }
1884 } else {
1885 None
1886 }
1887 } else if for_each.is_array() {
1888 for_each.parse_items()
1890 } else {
1891 None
1892 }
1893 } else {
1894 None
1895 };
1896
1897 if let Some(items) = for_each_items {
1899 if !items.is_empty() {
1900 let fe = task.for_each.as_ref();
1906 let concurrency = fe
1907 .and_then(|f| f.concurrency)
1908 .or(task.concurrency)
1909 .unwrap_or(1)
1910 .max(1) as usize;
1911 let fail_fast = fe.map(|f| f.fail_fast).or(task.fail_fast).unwrap_or(true);
1912
1913 debug!(
1914 task_id = %task.name,
1915 items = items.len(),
1916 concurrency = concurrency,
1917 fail_fast = fail_fast,
1918 "Starting for_each iteration"
1919 );
1920 self.event_log.emit(EventKind::ForEachStarted {
1921 task_id: Arc::from(task.name.as_str()),
1922 item_count: items.len(),
1923 concurrency,
1924 fail_fast,
1925 });
1926
1927 let semaphore = Arc::new(Semaphore::new(concurrency));
1929 let cancel = CancellationToken::new();
1931
1932 if fail_fast {
1935 for_each_cancel_tokens.insert(intern(&task.name), cancel.clone());
1936 }
1937
1938 let var_name = fe.map(|f| f.as_var.as_str()).unwrap_or("item").to_string();
1940 let task = Arc::new(task.clone());
1943 let mut task_id_buf = String::with_capacity(task.name.len() + 8);
1945 for (idx, item) in items.iter().enumerate() {
1946 if fail_fast && cancel.is_cancelled() {
1948 debug!(
1949 task_id = %task.name,
1950 idx = idx,
1951 "Skipping iteration due to fail_fast cancellation"
1952 );
1953 break;
1954 }
1955
1956 let task = Arc::clone(&task);
1957 task_id_buf.clear();
1958 use std::fmt::Write;
1959 let _ = write!(task_id_buf, "{}[{}]", task.name, idx);
1960 let task_id = intern(&task_id_buf);
1961 let parent_task_id = intern(&task.name);
1962 let datastore = self.datastore.clone();
1963 let executor = self.executor.clone();
1964 let event_log = self.event_log.clone();
1965 let item = item.clone();
1966 let var_name = var_name.clone();
1967 let semaphore = Arc::clone(&semaphore);
1968 let cancel = cancel.clone();
1969 let workflow_artifacts = workflow_artifacts.clone();
1970 let artifact_base_path = artifact_base_path.clone();
1971
1972 join_set.spawn(async move {
1973 if cancel.is_cancelled() {
1975 return IterationResult {
1976 store_id: task_id,
1977 result: TaskResult::skipped(
1978 "Cancelled due to fail_fast before semaphore acquire"
1979 .to_string(),
1980 ),
1981 for_each_info: Some((parent_task_id, idx)),
1982 artifact_paths: vec![],
1983 };
1984 }
1985
1986 let _permit = tokio::select! {
1988 biased;
1989
1990 _ = cancel.cancelled() => {
1991 return IterationResult {
1992 store_id: task_id,
1993 result: TaskResult::skipped(
1994 "Cancelled while waiting for semaphore".to_string(),
1995 ),
1996 for_each_info: Some((parent_task_id, idx)),
1997 artifact_paths: vec![],
1998 };
1999 }
2000
2001 permit = semaphore.acquire() => {
2002 match permit {
2003 Ok(p) => p,
2004 Err(_) => {
2005 return IterationResult {
2006 store_id: task_id,
2007 result: TaskResult::failed(
2008 "Semaphore closed unexpectedly".to_string(),
2009 std::time::Duration::ZERO,
2010 ),
2011 for_each_info: Some((parent_task_id, idx)),
2012 artifact_paths: vec![],
2013 };
2014 }
2015 }
2016 }
2017 };
2018
2019 if cancel.is_cancelled() {
2021 return IterationResult {
2022 store_id: task_id,
2023 result: TaskResult::skipped(
2024 "Cancelled after semaphore acquire".to_string(),
2025 ),
2026 for_each_info: Some((parent_task_id, idx)),
2027 artifact_paths: vec![],
2028 };
2029 }
2030
2031 let result = Self::execute_task_iteration(
2032 task,
2033 Arc::clone(&task_id),
2034 Arc::clone(&parent_task_id),
2035 datastore,
2036 executor,
2037 event_log,
2038 Some((var_name, item, idx)),
2039 workflow_artifacts,
2040 artifact_base_path,
2041 )
2042 .await;
2043
2044 if !result.result.is_success() && fail_fast {
2046 cancel.cancel();
2047 }
2048
2049 result
2050 });
2051 }
2052 } else {
2053 debug!(
2055 task_id = %task.name,
2056 "for_each items array is empty, storing empty result"
2057 );
2058 self.datastore.insert(
2059 intern(&task.name),
2060 TaskResult::success(Value::Array(vec![]), std::time::Duration::ZERO),
2061 );
2062 }
2063 } else if task.for_each.is_some() {
2064 self.datastore.insert(
2068 intern(&task.name),
2069 TaskResult::failed(
2070 format!(
2071 "for_each items could not be resolved for task '{}'. \
2072 Check the binding reference.",
2073 task.name
2074 ),
2075 std::time::Duration::ZERO,
2076 ),
2077 );
2078 continue;
2079 } else {
2080 let task = Arc::new(task.clone());
2082 let datastore = self.datastore.clone();
2083 let executor = self.executor.clone();
2084 let event_log = self.event_log.clone();
2085 let workflow_artifacts = workflow_artifacts.clone();
2086 let artifact_base_path = artifact_base_path.clone();
2087
2088 join_set.spawn(async move {
2089 Self::execute_task_iteration(
2090 task,
2091 Arc::clone(&task_id),
2092 task_id,
2093 datastore,
2094 executor,
2095 event_log,
2096 None,
2097 workflow_artifacts,
2098 artifact_base_path,
2099 )
2100 .await
2101 });
2102 }
2103 }
2104
2105 self.cli_renderer = renderer;
2106
2107 let mut for_each_results: IndexMap<Arc<str>, Vec<(usize, TaskResult)>> =
2110 IndexMap::new();
2111
2112 loop {
2114 tokio::select! {
2115 biased;
2116 _ = self.cancel_token.cancelled() => {
2118 join_set.abort_all();
2120
2121 let duration = workflow_start.elapsed();
2122 let running_tasks: Vec<Arc<str>> = self
2124 .workflow
2125 .tasks
2126 .iter()
2127 .filter(|t| !self.datastore.contains(&t.name))
2128 .map(|t| Arc::from(t.name.as_str()))
2129 .collect();
2130
2131 self.event_log.emit(EventKind::WorkflowAborted {
2132 reason: "Workflow cancelled during execution".to_string(),
2133 duration_ms: duration.as_millis() as u64,
2134 running_tasks,
2135 });
2136 self.write_trace();
2137 return Err(NikaError::WorkflowCancelled {
2138 phase: "during execution".to_string(),
2139 });
2140 }
2141 result = join_set.join_next() => {
2143 match result {
2144 Some(Ok(iteration_result)) => {
2145 let IterationResult {
2146 store_id,
2147 result: task_result,
2148 for_each_info,
2149 artifact_paths: _,
2150 } = iteration_result;
2151
2152 _completed += 1;
2153 let success = task_result.is_success();
2154 let skipped = task_result.is_skipped();
2155
2156 if let Some(ref mut r) = self.cli_renderer {
2158 self.event_log.with_events_since(r.last_rendered_id(), |events| {
2159 r.render_new_events(events);
2160 });
2161 }
2162
2163 self.datastore
2165 .insert(Arc::clone(&store_id), task_result.clone());
2166
2167 if !success && !skipped {
2172 if let Some((ref parent_id, _)) = for_each_info {
2173 if let Some(token) = for_each_cancel_tokens.get(parent_id) {
2174 if !token.is_cancelled() {
2175 debug!(
2176 store_id = %store_id,
2177 parent_id = %parent_id,
2178 "Triggering fail_fast cancellation for parent"
2179 );
2180 token.cancel();
2181 }
2182 }
2183 }
2184 }
2185
2186 if let Some((parent_id, idx)) = for_each_info {
2188 for_each_results
2189 .entry(parent_id)
2190 .or_default()
2191 .push((idx, task_result));
2192 }
2193 }
2194 Some(Err(e)) => {
2195 if e.is_cancelled() {
2197 debug!("Task cancelled (workflow abort or fail_fast)");
2199 } else {
2201 self.event_log.emit(EventKind::WorkflowFailed {
2203 error: format!("Task panicked: {}", e),
2204 failed_task: None,
2205 });
2206 self.write_trace();
2207 return Err(NikaError::TaskPanicked { reason: format!("{}", e) });
2208 }
2209 }
2210 None => {
2211 break;
2213 }
2214 }
2215 }
2216 }
2217 }
2218
2219 for (parent_id, mut results) in for_each_results {
2221 results.sort_by_key(|(idx, _)| *idx);
2223
2224 let outputs: Vec<Value> = results
2226 .iter()
2227 .map(|(_, r)| {
2228 let output_str = r.output_str();
2230 serde_json::from_str(&output_str)
2231 .unwrap_or(Value::String(output_str.into_owned()))
2232 })
2233 .collect();
2234
2235 let total_duration: std::time::Duration =
2237 results.iter().map(|(_, r)| r.duration).sum();
2238 let all_success = results.iter().all(|(_, r)| r.is_success());
2239
2240 let merged_media: Vec<crate::media::MediaRef> = results
2242 .iter()
2243 .filter(|(_, r)| r.is_success())
2244 .flat_map(|(_, r)| r.media.iter().cloned())
2245 .collect();
2246
2247 let aggregated_result = if all_success {
2249 TaskResult::success(Value::Array(outputs), total_duration)
2250 .with_media(merged_media)
2251 } else {
2252 let errors: Vec<String> = results
2254 .iter()
2255 .filter_map(|(idx, r)| r.error().map(|e| format!("[{}]: {}", idx, e)))
2256 .collect();
2257 let mut result = TaskResult::failed(errors.join("; "), total_duration)
2259 .with_media(merged_media);
2260 result.output = Arc::new(Value::Array(outputs));
2261 result
2262 };
2263
2264 self.event_log.emit(EventKind::ForEachCompleted {
2266 task_id: Arc::clone(&parent_id),
2267 total: results.len() as u32,
2268 succeeded: results.iter().filter(|(_, r)| r.is_success()).count() as u32,
2269 failed: results
2270 .iter()
2271 .filter(|(_, r)| !r.is_success() && !r.is_skipped())
2272 .count() as u32,
2273 skipped: results.iter().filter(|(_, r)| r.is_skipped()).count() as u32,
2274 duration_ms: total_duration.as_millis() as u64,
2275 });
2276
2277 self.datastore.insert(parent_id, aggregated_result);
2279 }
2280 }
2281
2282 let media_warnings = self.verify_media_integrity();
2284
2285 let output = self.get_final_output().unwrap_or_default();
2290
2291 self.event_log.emit(EventKind::WorkflowCompleted {
2293 final_output: Arc::new(Value::String(output.clone())),
2294 total_duration_ms: workflow_start.elapsed().as_millis() as u64,
2295 });
2296
2297 if media_warnings > 0 {
2298 tracing::warn!(
2299 warnings = media_warnings,
2300 "Media integrity check completed with warnings"
2301 );
2302 }
2303
2304 let trace_path = self.write_trace();
2306
2307 if let Some(ref mut renderer) = self.cli_renderer {
2308 self.event_log
2309 .with_events_since(renderer.last_rendered_id(), |events| {
2310 renderer.render_new_events(events);
2311 });
2312 let total_duration_ms = workflow_start.elapsed().as_millis() as u64;
2313 if self.quiet {
2314 renderer.render_quiet_summary(total_duration_ms);
2315 } else {
2316 renderer.render_summary(total_duration_ms, trace_path.as_deref());
2317 }
2318 } else if !self.quiet {
2319 let elapsed = workflow_start.elapsed();
2320 let elapsed_str = if elapsed.as_secs() >= 60 {
2321 format!(
2322 "{}m {:.1}s",
2323 elapsed.as_secs() / 60,
2324 elapsed.as_secs_f64() % 60.0
2325 )
2326 } else {
2327 format!("{:.1}s", elapsed.as_secs_f64())
2328 };
2329 let events = self.event_log.events();
2330 let (total_tokens, total_cost) =
2331 events.iter().fold((0u64, 0.0f64), |(tokens, cost), e| {
2332 if let EventKind::ProviderResponded {
2333 input_tokens,
2334 output_tokens,
2335 cost_usd,
2336 ..
2337 } = &e.kind
2338 {
2339 (tokens + input_tokens + output_tokens, cost + cost_usd)
2340 } else {
2341 (tokens, cost)
2342 }
2343 });
2344 let task_count = self.workflow.tasks.len();
2346 let parallel_count = if let Some(ref depths) = cached_depths {
2347 let max_layer = depths.values().copied().max().unwrap_or(0);
2348 let mut layers: Vec<Vec<&str>> = vec![Vec::new(); max_layer + 1];
2349 for task in &self.workflow.tasks {
2350 if let Some(&layer) = depths.get(task.name.as_str()) {
2351 layers[layer].push(task.name.as_str());
2352 }
2353 }
2354 layers
2355 .iter()
2356 .filter(|l| l.len() > 1)
2357 .flat_map(|l| l.iter())
2358 .count()
2359 } else {
2360 0
2361 };
2362 crate::display::print_done_summary(
2363 &elapsed_str,
2364 total_tokens,
2365 total_cost,
2366 trace_path.as_deref(),
2367 task_count,
2368 parallel_count,
2369 );
2370 }
2371
2372 self.executor.shutdown_mcp().await;
2374
2375 Ok(output)
2376 }
2377}
2378
2379#[cfg(test)]
2380mod tests {
2381 use super::*;
2382 use crate::ast::analyzed::{
2383 AnalyzedExecAction, AnalyzedForEach, AnalyzedInferAction, AnalyzedOutput, AnalyzedTask,
2384 AnalyzedTaskAction, AnalyzedWorkflow, OutputFormat as AnalyzedOutputFormat, TaskId,
2385 TaskTable,
2386 };
2387 use crate::ast::schema::SchemaVersion;
2388 use crate::ast::structured::StructuredOutputSpec;
2389 use crate::binding::types::{BindingPath, BindingSource};
2390 use crate::binding::{WithEntry, WithSpec};
2391 use crate::source::Span;
2392 use indexmap::IndexMap;
2393 use serde_json::json;
2394 use std::time::Duration;
2395
2396 fn make_empty_workflow() -> AnalyzedWorkflow {
2401 AnalyzedWorkflow {
2402 schema_version: SchemaVersion::V03,
2403 name: None,
2404 description: None,
2405 provider: Some("mock".to_string()),
2406 model: None,
2407 task_table: TaskTable::new(),
2408 tasks: vec![],
2409 mcp_servers: IndexMap::new(),
2410 context_files: vec![],
2411 imports: vec![],
2412 inputs: IndexMap::new(),
2413 artifacts: None,
2414 log: None,
2415 agents: None,
2416 skills_map: std::collections::HashMap::new(),
2417 span: Span::dummy(),
2418 }
2419 }
2420
/// Checks that `quiet` is off by default and is switched on by the `.quiet()`
/// builder, for runners created via `Runner::new` and `Runner::with_event_log`.
#[test]
fn test_runner_quiet_mode() {
    // A freshly constructed runner must not be quiet.
    let default_runner = Runner::new(make_empty_workflow()).unwrap();
    assert!(
        !default_runner.quiet,
        "Runner should not be quiet by default"
    );

    // Calling the builder flips the flag.
    let quiet_runner = Runner::new(make_empty_workflow()).unwrap().quiet();
    assert!(quiet_runner.quiet, "Runner should be quiet after .quiet()");

    // The builder also works on a runner created with an explicit event log.
    let log = crate::event::EventLog::new();
    let chained_runner = Runner::with_event_log(make_empty_workflow(), log)
        .unwrap()
        .quiet();
    assert!(chained_runner.quiet, "Runner should be quiet when chained");
}
2438
/// Checks that `with_initial_context` places the given JSON value in the
/// runner's datastore under the supplied key, recorded as a successful result.
#[test]
fn test_with_initial_context_stores_value() {
    use serde_json::json;

    let context = json!({"key": "value", "nested": {"deep": true}});
    let runner = Runner::new(make_empty_workflow())
        .unwrap()
        .with_initial_context("__parent_context__", context);

    let lookup = runner.datastore.get("__parent_context__");
    assert!(lookup.is_some(), "Context should be stored");

    let stored = lookup.unwrap();
    assert!(stored.is_success(), "Should be stored as success");

    // The rendered output must mention both the key and its value.
    let rendered = stored.output_str();
    assert!(rendered.contains("key"), "Should contain 'key'");
    assert!(rendered.contains("value"), "Should contain 'value'");
}
2464
/// Checks that `.quiet()` and `.with_initial_context(..)` can be chained on a
/// runner built with `Runner::with_event_log`, and that both take effect.
#[test]
fn test_with_initial_context_chaining() {
    use serde_json::json;

    let base = Runner::with_event_log(make_empty_workflow(), EventLog::new()).unwrap();
    let runner = base
        .quiet()
        .with_initial_context("test_ctx", json!({"test": 123}));

    assert!(runner.quiet, "Should be quiet");

    let has_context = runner.datastore.get("test_ctx").is_some();
    assert!(has_context, "Context should exist");
}
2483
2484 fn create_for_each_workflow(
2490 task_id: &str,
2491 items_json: &str,
2492 as_var: &str,
2493 command: &str,
2494 concurrency: Option<u32>,
2495 fail_fast: bool,
2496 shell: bool,
2497 ) -> AnalyzedWorkflow {
2498 let mut task_table = TaskTable::new();
2499 task_table.insert(task_id);
2500 let tid = task_table.get_id(task_id).unwrap();
2501
2502 let task = AnalyzedTask {
2503 id: tid,
2504 name: task_id.to_string(),
2505 description: None,
2506 action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
2507 command: command.to_string(),
2508 shell,
2509 cwd: None,
2510 env: IndexMap::new(),
2511 timeout_ms: None,
2512 span: Span::dummy(),
2513 }),
2514 provider: None,
2515 model: None,
2516 with_spec: Default::default(),
2517 depends_on: vec![],
2518 implicit_deps: vec![],
2519 output: None,
2520 for_each: Some(AnalyzedForEach {
2521 items: items_json.to_string(),
2522 as_var: as_var.to_string(),
2523 concurrency,
2524 fail_fast,
2525 span: Span::dummy(),
2526 }),
2527 retry: None,
2528 decompose: None,
2529 concurrency: None,
2530 fail_fast: None,
2531 artifact: None,
2532 log: None,
2533 structured: None,
2534 span: Span::dummy(),
2535 };
2536
2537 AnalyzedWorkflow {
2538 schema_version: SchemaVersion::V03,
2539 name: None,
2540 description: None,
2541 provider: Some("mock".to_string()),
2542 model: None,
2543 task_table,
2544 tasks: vec![task],
2545 mcp_servers: IndexMap::new(),
2546 context_files: vec![],
2547 imports: vec![],
2548 inputs: IndexMap::new(),
2549 artifacts: None,
2550 log: None,
2551 agents: None,
2552 skills_map: std::collections::HashMap::new(),
2553 span: Span::dummy(),
2554 }
2555 }
2556
2557 #[tokio::test]
2558 async fn test_for_each_collects_all_results() {
2559 let workflow = create_for_each_workflow(
2560 "echo_items",
2561 r#"["a", "b", "c"]"#,
2562 "item",
2563 "echo {{with.item}}",
2564 None, true, false, );
2568
2569 let mut runner = Runner::new(workflow).unwrap();
2570 let result = runner.run().await;
2571 assert!(
2572 result.is_ok(),
2573 "Workflow should complete: {:?}",
2574 result.err()
2575 );
2576
2577 let parent_result = runner.datastore.get("echo_items");
2578 assert!(parent_result.is_some(), "Parent task result should exist");
2579
2580 let result = parent_result.unwrap();
2581 let output = result.output_str();
2582 let has_a = output.contains("a") || output.contains("\"a\"");
2583 let has_b = output.contains("b") || output.contains("\"b\"");
2584 let has_c = output.contains("c") || output.contains("\"c\"");
2585
2586 assert!(
2587 has_a && has_b && has_c,
2588 "Output should contain all 3 results, got: {}",
2589 output
2590 );
2591 }
2592
2593 #[tokio::test]
2594 async fn test_for_each_preserves_order() {
2595 let workflow = create_for_each_workflow(
2596 "ordered",
2597 r#"["first", "second", "third"]"#,
2598 "x",
2599 "echo {{with.x}}",
2600 None,
2601 true,
2602 false,
2603 );
2604
2605 let mut runner = Runner::new(workflow).unwrap();
2606 runner.run().await.unwrap();
2607
2608 let parent_result = runner.datastore.get("ordered");
2609 assert!(parent_result.is_some(), "Parent task result should exist");
2610
2611 let result = parent_result.unwrap();
2612 let output = result.output_str();
2613 if let Ok(arr) = serde_json::from_str::<Vec<serde_json::Value>>(&output) {
2614 assert_eq!(arr.len(), 3, "Should have 3 results");
2615 let first = arr[0].as_str().unwrap_or("");
2616 let last = arr[2].as_str().unwrap_or("");
2617 assert!(
2618 first.contains("first"),
2619 "First element should contain 'first'"
2620 );
2621 assert!(
2622 last.contains("third"),
2623 "Last element should contain 'third'"
2624 );
2625 }
2626 }
2627
2628 fn create_exec_workflow(
2634 tasks: Vec<(&str, &str)>,
2635 edges: Vec<(&str, &str)>,
2636 ) -> AnalyzedWorkflow {
2637 let mut task_table = TaskTable::new();
2638 for (id, _) in &tasks {
2639 task_table.insert(id);
2640 }
2641
2642 let analyzed_tasks: Vec<AnalyzedTask> = tasks
2643 .into_iter()
2644 .map(|(id, cmd)| {
2645 let task_id = task_table.get_id(id).unwrap();
2646 let depends_on: Vec<_> = edges
2647 .iter()
2648 .filter(|(_, tgt)| *tgt == id)
2649 .filter_map(|(src, _)| task_table.get_id(src))
2650 .collect();
2651 AnalyzedTask {
2652 id: task_id,
2653 name: id.to_string(),
2654 description: None,
2655 action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
2656 command: cmd.to_string(),
2657 shell: false,
2658 cwd: None,
2659 env: IndexMap::new(),
2660 timeout_ms: None,
2661 span: Span::dummy(),
2662 }),
2663 provider: None,
2664 model: None,
2665 with_spec: Default::default(),
2666 depends_on,
2667 implicit_deps: vec![],
2668 output: None,
2669 for_each: None,
2670 retry: None,
2671 decompose: None,
2672 concurrency: None,
2673 fail_fast: None,
2674 artifact: None,
2675 log: None,
2676 structured: None,
2677 span: Span::dummy(),
2678 }
2679 })
2680 .collect();
2681
2682 AnalyzedWorkflow {
2683 schema_version: SchemaVersion::V01,
2684 name: None,
2685 description: None,
2686 provider: Some("mock".to_string()),
2687 model: None,
2688 task_table,
2689 tasks: analyzed_tasks,
2690 mcp_servers: IndexMap::new(),
2691 context_files: vec![],
2692 imports: vec![],
2693 inputs: IndexMap::new(),
2694 artifacts: None,
2695 log: None,
2696 agents: None,
2697 skills_map: std::collections::HashMap::new(),
2698 span: Span::dummy(),
2699 }
2700 }
2701
2702 #[tokio::test]
2703 async fn event_sequence_for_single_task() {
2704 let workflow = create_exec_workflow(vec![("greet", "echo hello")], vec![]);
2705 let mut runner = Runner::new(workflow).unwrap();
2706
2707 let result = runner.run().await.unwrap();
2708 assert_eq!(result, "hello");
2709
2710 let events = runner.event_log().events();
2712
2713 assert!(
2722 events.len() >= 5,
2723 "Expected at least 5 events, got {}",
2724 events.len()
2725 );
2726
2727 assert!(matches!(
2729 &events[0].kind,
2730 EventKind::WorkflowStarted { task_count: 1, .. }
2731 ));
2732
2733 let last = events.last().unwrap();
2735 assert!(matches!(&last.kind, EventKind::WorkflowCompleted { .. }));
2736
2737 let task_events = runner.event_log().filter_task("greet");
2739 assert!(task_events.len() >= 3, "Expected at least 3 task events");
2740
2741 let completed = task_events
2743 .iter()
2744 .find(|e| matches!(&e.kind, EventKind::TaskCompleted { .. }));
2745 assert!(completed.is_some(), "TaskCompleted event not found");
2746 }
2747
2748 #[tokio::test]
2749 async fn event_sequence_for_chained_tasks() {
2750 let workflow = create_exec_workflow(
2752 vec![("greet", "echo hello"), ("shout", "echo DONE")],
2753 vec![("greet", "shout")],
2754 );
2755 let mut runner = Runner::new(workflow).unwrap();
2756
2757 runner.run().await.unwrap();
2758
2759 let events = runner.event_log().events();
2760
2761 assert!(matches!(
2763 &events[0].kind,
2764 EventKind::WorkflowStarted { task_count: 2, .. }
2765 ));
2766
2767 let greet_events = runner.event_log().filter_task("greet");
2769 let shout_events = runner.event_log().filter_task("shout");
2770
2771 assert!(!greet_events.is_empty(), "greet task events missing");
2772 assert!(!shout_events.is_empty(), "shout task events missing");
2773
2774 let greet_completed_id = greet_events
2776 .iter()
2777 .find(|e| matches!(&e.kind, EventKind::TaskCompleted { .. }))
2778 .map(|e| e.id);
2779 let shout_started_id = shout_events
2780 .iter()
2781 .find(|e| matches!(&e.kind, EventKind::TaskStarted { .. }))
2782 .map(|e| e.id);
2783
2784 assert!(greet_completed_id.is_some());
2785 assert!(shout_started_id.is_some());
2786 assert!(
2787 greet_completed_id.unwrap() < shout_started_id.unwrap(),
2788 "greet should complete before shout starts"
2789 );
2790 }
2791
2792 #[tokio::test]
2793 async fn event_sequence_for_parallel_tasks() {
2794 let workflow = create_exec_workflow(
2796 vec![("task_a", "echo A"), ("task_b", "echo B")],
2797 vec![], );
2799 let mut runner = Runner::new(workflow).unwrap();
2800
2801 runner.run().await.unwrap();
2802
2803 let events = runner.event_log().events();
2804
2805 assert!(matches!(
2807 &events[0].kind,
2808 EventKind::WorkflowStarted { task_count: 2, .. }
2809 ));
2810
2811 let scheduled: Vec<_> = events
2813 .iter()
2814 .filter(|e| matches!(&e.kind, EventKind::TaskScheduled { .. }))
2815 .collect();
2816 assert_eq!(scheduled.len(), 2, "Both tasks should be scheduled");
2817
2818 let completed: Vec<_> = events
2820 .iter()
2821 .filter(|e| matches!(&e.kind, EventKind::TaskCompleted { .. }))
2822 .collect();
2823 assert_eq!(completed.len(), 2, "Both tasks should complete");
2824
2825 let last = events.last().unwrap();
2827 assert!(matches!(&last.kind, EventKind::WorkflowCompleted { .. }));
2828 }
2829
2830 #[tokio::test]
2831 async fn event_ids_are_monotonic() {
2832 let workflow = create_exec_workflow(
2833 vec![("a", "echo 1"), ("b", "echo 2"), ("c", "echo 3")],
2834 vec![("a", "b"), ("b", "c")],
2835 );
2836 let mut runner = Runner::new(workflow).unwrap();
2837
2838 runner.run().await.unwrap();
2839
2840 let events = runner.event_log().events();
2841 let ids: Vec<u64> = events.iter().map(|e| e.id).collect();
2842
2843 for (i, &id) in ids.iter().enumerate() {
2845 assert_eq!(id, i as u64, "IDs should be sequential from 0");
2846 }
2847 }
2848
2849 #[tokio::test]
2850 async fn timestamps_are_relative_and_increasing() {
2851 let workflow = create_exec_workflow(
2852 vec![("fast", "echo quick"), ("slow", "echo done")],
2853 vec![("fast", "slow")],
2854 );
2855 let mut runner = Runner::new(workflow).unwrap();
2856
2857 runner.run().await.unwrap();
2858
2859 let events = runner.event_log().events();
2860
2861 assert!(
2864 events[0].timestamp_ms < 5000,
2865 "First event should be near start (got {}ms, expected < 5000ms)",
2866 events[0].timestamp_ms
2867 );
2868
2869 for window in events.windows(2) {
2871 assert!(
2872 window[1].timestamp_ms >= window[0].timestamp_ms,
2873 "Timestamps should not decrease"
2874 );
2875 }
2876 }
2877
2878 #[tokio::test]
2879 async fn failed_task_emits_task_failed_event() {
2880 let workflow = create_exec_workflow(vec![("fail", "exit 1")], vec![]);
2881 let mut runner = Runner::new(workflow).unwrap();
2882
2883 let result = runner.run().await;
2885 assert!(
2886 result.is_err(),
2887 "workflow should return Err when tasks fail"
2888 );
2889
2890 let events = runner.event_log().filter_task("fail");
2891 let failed = events
2892 .iter()
2893 .find(|e| matches!(&e.kind, EventKind::TaskFailed { .. }));
2894
2895 assert!(failed.is_some(), "TaskFailed event should be emitted");
2896 }
2897
2898 #[tokio::test]
2899 async fn template_resolved_event_captures_before_and_after() {
2900 let workflow = create_exec_workflow(vec![("echo_test", "echo hello world")], vec![]);
2902 let mut runner = Runner::new(workflow).unwrap();
2903
2904 runner.run().await.unwrap();
2905
2906 let events = runner.event_log().filter_task("echo_test");
2907 let template_event = events
2908 .iter()
2909 .find(|e| matches!(&e.kind, EventKind::TemplateResolved { .. }));
2910
2911 assert!(template_event.is_some(), "TemplateResolved event expected");
2912
2913 if let EventKind::TemplateResolved {
2914 template, result, ..
2915 } = &template_event.unwrap().kind
2916 {
2917 assert_eq!(template, "echo hello world");
2918 assert_eq!(result, "echo hello world");
2919 }
2920 }
2921
2922 #[tokio::test]
2923 async fn event_log_to_json_serializes_correctly() {
2924 let workflow = create_exec_workflow(vec![("simple", "echo test")], vec![]);
2925 let mut runner = Runner::new(workflow).unwrap();
2926
2927 runner.run().await.unwrap();
2928
2929 let json = runner.event_log().to_json();
2930 assert!(json.is_array());
2931
2932 let array = json.as_array().unwrap();
2933 assert!(!array.is_empty());
2934
2935 let first = &array[0];
2937 assert!(first.get("id").is_some());
2938 assert!(first.get("timestamp_ms").is_some());
2939 assert!(first.get("kind").is_some());
2940 assert_eq!(first["kind"]["type"], "workflow_started");
2941 }
2942
/// With two tasks and no dependency edges, `get_ready_tasks` must return both of them.
#[test]
fn get_ready_tasks_returns_tasks_with_no_deps() {
    let workflow = create_exec_workflow(vec![("a", "echo A"), ("b", "echo B")], vec![]);
    let runner = Runner::new(workflow).unwrap();

    let ready = runner.get_ready_tasks();
    assert_eq!(ready.len(), 2, "Both tasks should be ready");

    let is_ready = |wanted: &str| ready.iter().any(|task| task.name.as_str() == wanted);
    assert!(is_ready("a"), "Task 'a' should be ready");
    assert!(is_ready("b"), "Task 'b' should be ready");
}
2963
/// For the chain `a` -> `b` -> `c`, only `a` may be reported as ready before anything
/// has run.
#[test]
fn get_ready_tasks_respects_dependencies() {
    let tasks = vec![("a", "echo A"), ("b", "echo B"), ("c", "echo C")];
    let edges = vec![("a", "b"), ("b", "c")];
    let runner = Runner::new(create_exec_workflow(tasks, edges)).unwrap();

    let ready_now = runner.get_ready_tasks();
    assert_eq!(ready_now.len(), 1, "Only first task should be ready");
    assert_eq!(ready_now[0].name, "a", "Task 'a' should be ready");
}
2977
/// A task stops being reported as ready once a result for it is stored in the
/// runner's datastore.
#[test]
fn get_ready_tasks_excludes_completed_tasks() {
    let runner = Runner::new(create_exec_workflow(vec![("only", "echo x")], vec![])).unwrap();

    // Before any result is recorded the single task is ready.
    let before = runner.get_ready_tasks();
    assert_eq!(before.len(), 1);

    let finished = TaskResult::success_str("done", std::time::Duration::ZERO);
    runner.datastore.insert(intern("only"), finished);

    let after = runner.get_ready_tasks();
    assert_eq!(after.len(), 0, "Completed task should not be ready");
}
2997
/// A freshly constructed runner with two tasks and no stored results is not "all done".
#[test]
fn all_done_returns_false_when_tasks_pending() {
    let tasks = vec![("a", "echo A"), ("b", "echo B")];
    let runner = Runner::new(create_exec_workflow(tasks, vec![])).unwrap();

    assert!(!runner.all_done(), "Not all tasks are done initially");
}
3005
/// After a successful result has been stored for every task, `all_done` reports true.
#[test]
fn all_done_returns_true_when_all_completed() {
    let tasks = vec![("a", "echo A"), ("b", "echo B")];
    let runner = Runner::new(create_exec_workflow(tasks, vec![])).unwrap();

    // Record a success for each task directly, without running anything.
    for (task, output) in [("a", "A"), ("b", "B")] {
        runner.datastore.insert(
            intern(task),
            TaskResult::success_str(output, std::time::Duration::ZERO),
        );
    }

    assert!(runner.all_done(), "All tasks should be done");
}
3023
/// For the chain `a` -> `b` with successful results stored for both, `get_final_output`
/// must return the output stored for `b`.
#[test]
fn get_final_output_returns_output_from_final_task() {
    let tasks = vec![("a", "echo A"), ("b", "echo B")];
    let runner = Runner::new(create_exec_workflow(tasks, vec![("a", "b")])).unwrap();

    for (task, output) in [("a", "A"), ("b", "final output")] {
        runner.datastore.insert(
            intern(task),
            TaskResult::success_str(output, std::time::Duration::ZERO),
        );
    }

    let final_output = runner.get_final_output();
    assert!(final_output.is_some());
    assert_eq!(final_output.unwrap(), "final output");
}
3045
/// With no results stored, `get_final_output` returns `None`.
#[test]
fn get_final_output_returns_none_when_no_results() {
    let runner = Runner::new(create_exec_workflow(vec![("only", "echo x")], vec![])).unwrap();

    assert!(
        runner.get_final_output().is_none(),
        "No output when tasks not complete"
    );
}
3054
/// With one successful and one failed result stored for two independent tasks,
/// `get_final_output` must return the successful task's output.
#[test]
fn get_final_output_skips_failed_tasks() {
    let tasks = vec![("a", "echo A"), ("b", "exit 1")];
    let runner = Runner::new(create_exec_workflow(tasks, vec![])).unwrap();

    let zero = std::time::Duration::ZERO;
    runner
        .datastore
        .insert(intern("a"), TaskResult::success_str("success", zero));
    runner
        .datastore
        .insert(intern("b"), TaskResult::failed("error", zero));

    let final_output = runner.get_final_output();
    assert!(final_output.is_some());
    assert_eq!(
        final_output.unwrap(),
        "success",
        "Should return successful task output"
    );
}
3081
3082 #[tokio::test]
3087 async fn for_each_with_explicit_concurrency() {
3088 let workflow = create_for_each_workflow(
3089 "concurrent",
3090 r#"["a", "b", "c", "d"]"#,
3091 "item",
3092 "echo {{with.item}}",
3093 Some(2), true,
3095 false,
3096 );
3097
3098 let mut runner = Runner::new(workflow).unwrap();
3099 let result = runner.run().await;
3100 assert!(
3101 result.is_ok(),
3102 "Workflow should complete: {:?}",
3103 result.err()
3104 );
3105
3106 let parent_result = runner.datastore.get("concurrent");
3107 assert!(parent_result.is_some(), "Parent task result should exist");
3108
3109 let result = parent_result.unwrap();
3110 let output = result.output_str();
3111 assert!(output.contains("a") || output.contains("\"a\""));
3112 assert!(output.contains("d") || output.contains("\"d\""));
3113 }
3114
3115 #[tokio::test]
3116 async fn for_each_fail_fast_stops_on_first_error() {
3117 let workflow = create_for_each_workflow(
3118 "failfast",
3119 r#"["ok1", "FAIL", "ok2", "ok3"]"#,
3120 "item",
3121 "test '{{with.item}}' != 'FAIL' && echo {{with.item}}",
3122 Some(1), true, false,
3125 );
3126
3127 let mut runner = Runner::new(workflow).unwrap();
3128 let result = runner.run().await;
3129 assert!(
3131 result.is_err(),
3132 "Workflow should fail when fail_fast triggers"
3133 );
3134 }
3135
3136 #[tokio::test]
3137 async fn for_each_fail_fast_false_continues_on_error() {
3138 let workflow = create_for_each_workflow(
3139 "continue",
3140 r#"["ok1", "ok2"]"#,
3141 "item",
3142 "echo {{with.item}}",
3143 None,
3144 false, false,
3146 );
3147
3148 let mut runner = Runner::new(workflow).unwrap();
3149 let result = runner.run().await;
3150 assert!(result.is_ok(), "Workflow should complete");
3151
3152 let parent_result = runner.datastore.get("continue");
3153 assert!(parent_result.is_some());
3154 }
3155
3156 fn create_for_each_with_inputs(
3162 task_id: &str,
3163 items_expr: &str,
3164 as_var: &str,
3165 command: &str,
3166 inputs: IndexMap<String, serde_json::Value>,
3167 concurrency: Option<u32>,
3168 ) -> AnalyzedWorkflow {
3169 let mut workflow = create_for_each_workflow(
3170 task_id,
3171 items_expr,
3172 as_var,
3173 command,
3174 concurrency,
3175 true,
3176 false,
3177 );
3178 workflow.inputs = inputs;
3179 workflow
3180 }
3181
3182 #[tokio::test]
3183 async fn for_each_with_dollar_inputs_array() {
3184 let mut inputs = IndexMap::new();
3185 inputs.insert(
3186 "items".to_string(),
3187 json!({
3188 "type": "array",
3189 "default": ["alpha", "beta", "gamma"]
3190 }),
3191 );
3192 let workflow = create_for_each_with_inputs(
3193 "process_items",
3194 "$inputs.items",
3195 "item",
3196 "echo {{with.item}}",
3197 inputs,
3198 None,
3199 );
3200
3201 let mut runner = Runner::new(workflow).unwrap();
3202 let result = runner.run().await;
3203 assert!(
3204 result.is_ok(),
3205 "Workflow should complete: {:?}",
3206 result.err()
3207 );
3208
3209 let task_result = runner.datastore.get("process_items");
3210 assert!(task_result.is_some(), "Task result should exist");
3211 assert!(task_result.unwrap().is_success(), "Task should succeed");
3212 }
3213
3214 #[tokio::test]
3215 async fn for_each_with_template_inputs() {
3216 let mut inputs = IndexMap::new();
3217 inputs.insert(
3218 "locales".to_string(),
3219 json!({
3220 "type": "array",
3221 "default": ["fr-FR", "en-US"]
3222 }),
3223 );
3224 let workflow = create_for_each_with_inputs(
3225 "translate",
3226 "{{inputs.locales}}",
3227 "locale",
3228 "echo Translating to {{with.locale}}",
3229 inputs,
3230 Some(2),
3231 );
3232
3233 let mut runner = Runner::new(workflow).unwrap();
3234 let result = runner.run().await;
3235 assert!(
3236 result.is_ok(),
3237 "Workflow should complete: {:?}",
3238 result.err()
3239 );
3240
3241 let task_result = runner.datastore.get("translate");
3242 assert!(task_result.is_some(), "Task result should exist");
3243 assert!(task_result.unwrap().is_success(), "Task should succeed");
3244 }
3245
3246 #[tokio::test]
3247 async fn for_each_with_inputs_missing_fails_gracefully() {
3248 let mut inputs = IndexMap::new();
3249 inputs.insert(
3250 "other_param".to_string(),
3251 json!({
3252 "type": "string",
3253 "default": "test"
3254 }),
3255 );
3256 let workflow = create_for_each_with_inputs(
3257 "missing_input",
3258 "$inputs.nonexistent",
3259 "item",
3260 "echo {{with.item}}",
3261 inputs,
3262 None,
3263 );
3264
3265 let mut runner = Runner::new(workflow).unwrap();
3266 let result = runner.run().await;
3267 assert!(result.is_err(), "Workflow should fail when task fails");
3269
3270 let task_result = runner.datastore.get("missing_input");
3271 assert!(task_result.is_some(), "Task result should exist");
3272 let tr = task_result.unwrap();
3273 assert!(!tr.is_success(), "Task should fail due to missing input");
3274 let error_msg = tr.error().expect("Failed task should have error message");
3275 assert!(
3276 error_msg.contains("not found"),
3277 "Error should mention 'not found': {}",
3278 error_msg
3279 );
3280 }
3281
3282 #[tokio::test]
3283 async fn for_each_with_inputs_nested_path() {
3284 let mut inputs = IndexMap::new();
3285 inputs.insert(
3286 "data".to_string(),
3287 json!({
3288 "type": "object",
3289 "default": {
3290 "items": ["one", "two", "three"]
3291 }
3292 }),
3293 );
3294 let workflow = create_for_each_with_inputs(
3295 "nested",
3296 "$inputs.data.items",
3297 "n",
3298 "echo {{with.n}}",
3299 inputs,
3300 None,
3301 );
3302
3303 let mut runner = Runner::new(workflow).unwrap();
3304 let result = runner.run().await;
3305 assert!(
3306 result.is_ok(),
3307 "Workflow should complete: {:?}",
3308 result.err()
3309 );
3310
3311 let task_result = runner.datastore.get("nested");
3312 assert!(task_result.is_some(), "Task result should exist");
3313 assert!(task_result.unwrap().is_success(), "Task should succeed");
3314 }
3315
3316 fn create_two_step_for_each_workflow(
3322 step1_cmd: &str,
3323 step1_shell: bool,
3324 for_each_items: &str,
3325 step2_cmd: &str,
3326 ) -> AnalyzedWorkflow {
3327 let mut task_table = TaskTable::new();
3328 task_table.insert("step1");
3329 task_table.insert("step2");
3330 let tid1 = task_table.get_id("step1").unwrap();
3331 let tid2 = task_table.get_id("step2").unwrap();
3332
3333 let step1 = AnalyzedTask {
3334 id: tid1,
3335 name: "step1".to_string(),
3336 description: None,
3337 action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
3338 command: step1_cmd.to_string(),
3339 shell: step1_shell,
3340 cwd: None,
3341 env: IndexMap::new(),
3342 timeout_ms: None,
3343 span: Span::dummy(),
3344 }),
3345 provider: None,
3346 model: None,
3347 with_spec: Default::default(),
3348 depends_on: vec![],
3349 implicit_deps: vec![],
3350 output: None,
3351 for_each: None,
3352 retry: None,
3353 decompose: None,
3354 concurrency: None,
3355 fail_fast: None,
3356 artifact: None,
3357 log: None,
3358 structured: None,
3359 span: Span::dummy(),
3360 };
3361
3362 let mut with_spec = WithSpec::default();
3363 with_spec.insert(
3364 "step1".to_string(),
3365 WithEntry::simple(BindingPath {
3366 source: BindingSource::Task(intern("step1")),
3367 segments: vec![],
3368 }),
3369 );
3370
3371 let step2 = AnalyzedTask {
3372 id: tid2,
3373 name: "step2".to_string(),
3374 description: None,
3375 action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
3376 command: step2_cmd.to_string(),
3377 shell: false,
3378 cwd: None,
3379 env: IndexMap::new(),
3380 timeout_ms: None,
3381 span: Span::dummy(),
3382 }),
3383 provider: None,
3384 model: None,
3385 with_spec,
3386 depends_on: vec![tid1],
3387 implicit_deps: vec![],
3388 output: None,
3389 for_each: Some(AnalyzedForEach {
3390 items: for_each_items.to_string(),
3391 as_var: "item".to_string(),
3392 concurrency: None,
3393 fail_fast: true,
3394 span: Span::dummy(),
3395 }),
3396 retry: None,
3397 decompose: None,
3398 concurrency: None,
3399 fail_fast: None,
3400 artifact: None,
3401 log: None,
3402 structured: None,
3403 span: Span::dummy(),
3404 };
3405
3406 AnalyzedWorkflow {
3407 schema_version: SchemaVersion::V03,
3408 name: None,
3409 description: None,
3410 provider: Some("mock".to_string()),
3411 model: None,
3412 task_table,
3413 tasks: vec![step1, step2],
3414 mcp_servers: IndexMap::new(),
3415 context_files: vec![],
3416 imports: vec![],
3417 inputs: IndexMap::new(),
3418 artifacts: None,
3419 log: None,
3420 agents: None,
3421 skills_map: std::collections::HashMap::new(),
3422 span: Span::dummy(),
3423 }
3424 }
3425
3426 #[tokio::test]
3427 async fn for_each_dollar_binding_nested_path() {
3428 let workflow = create_two_step_for_each_workflow(
3429 r#"echo '{"items": ["alpha", "beta", "gamma"], "count": 3}'"#,
3430 true,
3431 "$step1.items",
3432 "echo {{with.item}}",
3433 );
3434
3435 let mut runner = Runner::new(workflow).unwrap();
3436 let result = runner.run().await;
3437 assert!(
3438 result.is_ok(),
3439 "Workflow should complete: {:?}",
3440 result.err()
3441 );
3442
3443 let task_result = runner.datastore.get("step2");
3444 assert!(task_result.is_some(), "step2 result should exist");
3445 assert!(
3446 task_result.unwrap().is_success(),
3447 "step2 should succeed with 3 items from nested path"
3448 );
3449 }
3450
3451 #[tokio::test]
3452 async fn for_each_dollar_binding_non_array_errors() {
3453 let workflow = create_two_step_for_each_workflow(
3454 "echo not_an_array",
3455 false,
3456 "$step1",
3457 "echo {{with.item}}",
3458 );
3459
3460 let mut runner = Runner::new(workflow).unwrap();
3461 let _ = runner.run().await;
3462
3463 let task_result = runner.datastore.get("step2");
3464 assert!(task_result.is_some(), "step2 result should exist");
3465
3466 let result = task_result.unwrap();
3467 assert!(
3468 !result.is_success(),
3469 "step2 should FAIL when for_each binding resolves to non-array"
3470 );
3471 let error_msg = result.error().expect("should have error message");
3472 assert!(
3473 error_msg.contains("non-array"),
3474 "Error should mention 'non-array', got: {}",
3475 error_msg
3476 );
3477 }
3478
3479 #[tokio::test]
3480 async fn for_each_dollar_binding_json_string_array() {
3481 let workflow = create_two_step_for_each_workflow(
3482 r#"echo '["x","y","z"]'"#,
3483 true,
3484 "$step1",
3485 "echo {{with.item}}",
3486 );
3487
3488 let mut runner = Runner::new(workflow).unwrap();
3489 let result = runner.run().await;
3490 assert!(
3491 result.is_ok(),
3492 "Workflow should complete: {:?}",
3493 result.err()
3494 );
3495
3496 let task_result = runner.datastore.get("step2");
3497 assert!(task_result.is_some(), "step2 result should exist");
3498 assert!(
3499 task_result.unwrap().is_success(),
3500 "step2 should succeed — JSON string array should be parsed"
3501 );
3502 }
3503
3504 #[tokio::test]
3505 async fn for_each_dollar_binding_nested_path_not_found() {
3506 let workflow = create_two_step_for_each_workflow(
3507 r#"echo '{"data": {"count": 5}}'"#,
3508 true,
3509 "$step1.data.nonexistent",
3510 "echo {{with.item}}",
3511 );
3512
3513 let mut runner = Runner::new(workflow).unwrap();
3514 let _ = runner.run().await;
3515
3516 let task_result = runner.datastore.get("step2");
3517 assert!(task_result.is_some(), "step2 result should exist");
3518
3519 let result = task_result.unwrap();
3520 assert!(
3521 !result.is_success(),
3522 "step2 should FAIL when nested path segment doesn't exist"
3523 );
3524 let error_msg = result.error().expect("should have error message");
3525 assert!(
3526 error_msg.contains("not found"),
3527 "Error should mention path segment not found, got: {}",
3528 error_msg
3529 );
3530 }
3531
3532 #[tokio::test]
3537 async fn for_each_empty_array_completes_with_empty_result() {
3538 let workflow = create_for_each_workflow(
3539 "empty_loop",
3540 "[]", "item",
3542 "echo {{with.item}}",
3543 None,
3544 true,
3545 false,
3546 );
3547
3548 let mut runner = Runner::new(workflow).unwrap();
3549 let result = runner.run().await;
3550 assert!(
3551 result.is_ok(),
3552 "Workflow with empty for_each should succeed, got: {:?}",
3553 result.err()
3554 );
3555
3556 let parent_result = runner.datastore.get("empty_loop");
3558 assert!(
3559 parent_result.is_some(),
3560 "for_each with empty array should store a result"
3561 );
3562
3563 let result = parent_result.unwrap();
3564 assert!(
3565 result.is_success(),
3566 "for_each with empty array should be success"
3567 );
3568
3569 let output = result.output_str();
3571 assert_eq!(
3572 output.trim(),
3573 "[]",
3574 "for_each with empty array should produce empty array, got: {}",
3575 output
3576 );
3577 }
3578
/// A runner built with `with_event_log` from a fresh `EventLog` starts with no events.
#[test]
fn with_event_log_uses_provided_event_log() {
    let workflow = create_exec_workflow(vec![("a", "echo A")], vec![]);
    let runner = Runner::with_event_log(workflow, EventLog::new()).unwrap();

    let recorded = runner.event_log().events();
    assert!(recorded.is_empty());
}
3592
/// Both constructors, `Runner::new` and `Runner::with_event_log`, return `Ok` for a
/// valid single-task workflow.
#[test]
fn new_and_with_event_log_return_result() {
    let via_new = Runner::new(create_exec_workflow(vec![("a", "echo A")], vec![]));
    assert!(
        via_new.is_ok(),
        "Runner::new should return Ok for a valid workflow"
    );

    let via_event_log = Runner::with_event_log(
        create_exec_workflow(vec![("a", "echo A")], vec![]),
        EventLog::new(),
    );
    assert!(
        via_event_log.is_ok(),
        "Runner::with_event_log should return Ok for a valid workflow"
    );
}
3612
3613 #[tokio::test]
3614 async fn workflow_completed_event_has_duration() {
3615 let workflow = create_exec_workflow(vec![("quick", "echo fast")], vec![]);
3616 let mut runner = Runner::new(workflow).unwrap();
3617
3618 runner.run().await.unwrap();
3619
3620 let events = runner.event_log().events();
3621 let completed = events
3622 .iter()
3623 .find(|e| matches!(&e.kind, EventKind::WorkflowCompleted { .. }));
3624
3625 assert!(completed.is_some());
3626 assert!(matches!(
3628 &completed.unwrap().kind,
3629 EventKind::WorkflowCompleted {
3630 total_duration_ms: _,
3631 ..
3632 }
3633 ));
3634 }
3635
3636 #[tokio::test]
3637 async fn workflow_started_event_has_generation_id() {
3638 let workflow = create_exec_workflow(vec![("a", "echo A")], vec![]);
3639 let mut runner = Runner::new(workflow).unwrap();
3640
3641 runner.run().await.unwrap();
3642
3643 let events = runner.event_log().events();
3644 let started = events
3645 .iter()
3646 .find(|e| matches!(&e.kind, EventKind::WorkflowStarted { .. }));
3647
3648 assert!(started.is_some());
3649 if let EventKind::WorkflowStarted { generation_id, .. } = &started.unwrap().kind {
3650 assert!(
3651 generation_id.starts_with("gen-"),
3652 "Generation ID should have prefix"
3653 );
3654 assert!(
3655 generation_id.len() > 10,
3656 "Generation ID should include UUID"
3657 );
3658 }
3659 }
3660
/// A newly constructed runner reports that it is not cancelled.
#[test]
fn test_cancel_token_default() {
    let runner = Runner::new(make_empty_workflow()).unwrap();

    let cancelled = runner.is_cancelled();
    assert!(!cancelled, "Runner should not be cancelled by default");
}
3676
/// Cancelling a token that shares state with the one installed via
/// `with_cancel_token` is observed by `is_cancelled`.
#[test]
fn test_cancel_token_can_be_set() {
    let external_token = CancellationToken::new();
    let runner = Runner::new(make_empty_workflow())
        .unwrap()
        .with_cancel_token(external_token.clone());

    external_token.cancel();

    assert!(runner.is_cancelled(), "Runner should detect cancellation");
}
3689
/// Two tokens obtained from `cancel_token()` share state with each other and with the
/// runner: cancelling one is visible through the other and through `is_cancelled`.
#[test]
fn test_cancel_token_cloning() {
    let runner = Runner::new(make_empty_workflow()).unwrap();

    let first_handle = runner.cancel_token();
    let second_handle = runner.cancel_token();

    first_handle.cancel();

    assert!(
        second_handle.is_cancelled(),
        "Cloned tokens should share state"
    );
    assert!(runner.is_cancelled(), "Runner should detect cancellation");
}
3703
3704 #[tokio::test]
3705 async fn test_cancellation_before_start_returns_aborted() {
3706 let workflow = create_exec_workflow(vec![("slow", "sleep 10")], vec![]);
3708 let token = CancellationToken::new();
3709
3710 let mut runner = Runner::new(workflow)
3711 .unwrap()
3712 .with_cancel_token(token.clone());
3713
3714 token.cancel();
3716
3717 let result = runner.run().await;
3718 assert!(result.is_err(), "Cancelled workflow should return error");
3719
3720 let err = result.unwrap_err();
3721 assert!(
3722 err.to_string().contains("cancelled") || err.to_string().contains("aborted"),
3723 "Error should mention cancellation: {}",
3724 err
3725 );
3726
3727 let events = runner.event_log().events();
3729 let aborted = events
3730 .iter()
3731 .find(|e| matches!(&e.kind, EventKind::WorkflowAborted { .. }));
3732 assert!(aborted.is_some(), "WorkflowAborted event should be emitted");
3733 }
3734
3735 #[tokio::test]
3736 async fn test_cancellation_during_execution_aborts_workflow() {
3737 use std::time::Duration;
3738
3739 let workflow = create_exec_workflow(vec![("slow", "sleep 5")], vec![]);
3741 let token = CancellationToken::new();
3742 let token_clone = token.clone();
3743
3744 let mut runner = Runner::new(workflow).unwrap().with_cancel_token(token);
3745
3746 let handle = tokio::spawn(async move { runner.run().await });
3748
3749 tokio::time::sleep(Duration::from_millis(100)).await;
3751 token_clone.cancel();
3752
3753 let result = tokio::time::timeout(Duration::from_secs(2), handle).await;
3755 assert!(
3756 result.is_ok(),
3757 "Cancellation should complete within 2 seconds"
3758 );
3759
3760 let workflow_result = result.unwrap().unwrap();
3761 assert!(
3762 workflow_result.is_err(),
3763 "Cancelled workflow should return error"
3764 );
3765 }
3766
3767 #[tokio::test]
3768 async fn test_workflow_aborted_event_has_running_tasks() {
3769 use std::time::Duration;
3770
3771 let workflow = create_exec_workflow(
3773 vec![("slow1", "sleep 5"), ("slow2", "sleep 5")],
3774 vec![], );
3776 let token = CancellationToken::new();
3777 let token_clone = token.clone();
3778
3779 let event_log = EventLog::new();
3780 let event_log_clone = event_log.clone();
3781 let mut runner = Runner::with_event_log(workflow, event_log)
3782 .unwrap()
3783 .with_cancel_token(token);
3784
3785 let run_handle = tokio::spawn(async move { runner.run().await });
3787
3788 tokio::time::sleep(Duration::from_millis(100)).await;
3790 token_clone.cancel();
3791
3792 let result = run_handle.await.unwrap();
3794 assert!(result.is_err(), "Cancelled workflow should return error");
3795
3796 let events = event_log_clone.events();
3798 let aborted = events
3799 .iter()
3800 .find(|e| matches!(&e.kind, EventKind::WorkflowAborted { .. }));
3801 assert!(aborted.is_some(), "WorkflowAborted event should be emitted");
3802
3803 if let EventKind::WorkflowAborted { running_tasks, .. } = &aborted.unwrap().kind {
3804 assert!(
3806 !running_tasks.is_empty() || running_tasks.len() <= 2,
3807 "Should have captured running tasks (0-2 expected)"
3808 );
3809 }
3810 }
3811
/// A newly constructed runner reports that it is not paused.
#[test]
fn test_pause_state_default() {
    let runner = Runner::new(make_empty_workflow()).unwrap();

    let paused = runner.is_paused();
    assert!(!paused, "Runner should not be paused by default");
}
3827
/// `pause()` makes `is_paused()` true and `resume()` makes it false again.
#[test]
fn test_pause_and_resume() {
    let runner = Runner::new(make_empty_workflow()).unwrap();
    assert!(!runner.is_paused());

    runner.pause();
    let paused_after_pause = runner.is_paused();
    assert!(paused_after_pause, "Runner should be paused after pause()");

    runner.resume();
    let paused_after_resume = runner.is_paused();
    assert!(
        !paused_after_resume,
        "Runner should not be paused after resume()"
    );
}
3847
/// Two sets of handles from `pause_handles()` both observe the runner's pause and
/// resume transitions through their atomic flag.
#[test]
fn test_pause_handles_cloning() {
    let runner = Runner::new(make_empty_workflow()).unwrap();

    let (first_flag, first_notify) = runner.pause_handles();
    let (second_flag, _second_notify) = runner.pause_handles();

    runner.pause();
    let first_sees_pause = first_flag.load(Ordering::SeqCst);
    let second_sees_pause = second_flag.load(Ordering::SeqCst);
    assert!(first_sees_pause, "First handle should see paused state");
    assert!(second_sees_pause, "Second handle should see paused state");

    runner.resume();
    let first_still_paused = first_flag.load(Ordering::SeqCst);
    let second_still_paused = second_flag.load(Ordering::SeqCst);
    assert!(!first_still_paused, "First handle should see resumed state");
    assert!(
        !second_still_paused,
        "Second handle should see resumed state"
    );

    // Exercise the notify handle; nothing is waiting on it in this test.
    first_notify.notify_one();
}
3881
/// Calling `pause()` then `resume()` records both a `WorkflowPaused` and a
/// `WorkflowResumed` event in the shared event log.
#[test]
fn test_pause_emits_events() {
    let shared_log = EventLog::new();
    let runner = Runner::with_event_log(make_empty_workflow(), shared_log.clone()).unwrap();

    runner.pause();
    runner.resume();

    let recorded = shared_log.events();
    let saw_pause = recorded
        .iter()
        .any(|event| matches!(&event.kind, EventKind::WorkflowPaused));
    let saw_resume = recorded
        .iter()
        .any(|event| matches!(&event.kind, EventKind::WorkflowResumed));

    assert!(saw_pause, "WorkflowPaused event should be emitted");
    assert!(saw_resume, "WorkflowResumed event should be emitted");
}
3904
3905 #[tokio::test]
3906 async fn test_pause_waits_for_resume() {
3907 use std::sync::atomic::AtomicUsize;
3908 use std::time::Duration;
3909
3910 let workflow = create_exec_workflow(vec![("task1", "echo done")], vec![]);
3912 let event_log = EventLog::new();
3913 let event_log_clone = event_log.clone();
3914 let mut runner = Runner::with_event_log(workflow, event_log).unwrap();
3915
3916 runner.pause();
3918
3919 let (paused, notify) = runner.pause_handles();
3920 let resume_count = Arc::new(AtomicUsize::new(0));
3921 let resume_count_clone = Arc::clone(&resume_count);
3922
3923 let handle = tokio::spawn(async move { runner.run().await });
3925
3926 tokio::time::sleep(Duration::from_millis(100)).await;
3928
3929 {
3931 let events = event_log_clone.events();
3932 let completed = events
3933 .iter()
3934 .find(|e| matches!(&e.kind, EventKind::WorkflowCompleted { .. }));
3935 assert!(
3936 completed.is_none(),
3937 "Workflow should be paused, not completed"
3938 );
3939 }
3940
3941 paused.store(false, Ordering::SeqCst);
3943 notify.notify_one();
3944 resume_count_clone.fetch_add(1, Ordering::SeqCst);
3945
3946 let result = tokio::time::timeout(Duration::from_secs(5), handle).await;
3948 assert!(result.is_ok(), "Workflow should complete after resume");
3949
3950 let inner_result = result.unwrap().unwrap();
3951 assert!(inner_result.is_ok(), "Workflow should succeed");
3952 }
3953
/// A JSON array value is returned as an array of the same length.
#[test]
fn test_value_to_array_direct_array() {
    use serde_json::json;

    let value = json!(["a", "b", "c"]);
    let item_count = value_to_array(&value).map(|items| items.len());
    assert_eq!(item_count, Some(3));
}
3967
/// A string holding a JSON array is parsed into its three elements, in order.
#[test]
fn test_value_to_array_json_string() {
    use serde_json::json;

    let value = json!(r#"["x","y","z"]"#);
    let arr = value_to_array(&value).expect("Should parse JSON array string");

    assert_eq!(arr.len(), 3);
    assert_eq!(arr[0], "x");
    assert_eq!(arr[1], "y");
    assert_eq!(arr[2], "z");
}
3982
/// A JSON array string with surrounding whitespace is still parsed.
#[test]
fn test_value_to_array_json_string_with_whitespace() {
    use serde_json::json;

    let value = json!("  [1, 2, 3]  ");
    let arr = value_to_array(&value).expect("Should handle whitespace");
    assert_eq!(arr.len(), 3);
}
3993
/// A plain text string yields `None`.
#[test]
fn test_value_to_array_not_array_string() {
    use serde_json::json;

    let value = json!("hello world");
    assert!(
        value_to_array(&value).is_none(),
        "Should return None for non-array string"
    );
}
4003
/// A JSON object yields `None`.
#[test]
fn test_value_to_array_object() {
    use serde_json::json;

    let value = json!({"key": "value"});
    assert!(
        value_to_array(&value).is_none(),
        "Should return None for object"
    );
}
4013
4014 #[test]
4015 fn test_value_to_array_number() {
4016 use serde_json::json;
4017
4018 let value = json!(42);
4020 let result = value_to_array(&value);
4021 assert!(result.is_none(), "Should return None for number");
4022 }
4023
4024 #[test]
4025 fn test_value_to_array_nested_json_string() {
4026 use serde_json::json;
4027
4028 let value = json!(r#"[{"id": 1}, {"id": 2}]"#);
4030 let result = value_to_array(&value);
4031 assert!(result.is_some(), "Should parse complex JSON array string");
4032 let arr = result.unwrap();
4033 assert_eq!(arr.len(), 2);
4034 assert_eq!(arr[0]["id"], 1);
4035 assert_eq!(arr[1]["id"], 2);
4036 }
4037
4038 #[test]
4039 fn test_value_to_array_invalid_json_string() {
4040 use serde_json::json;
4041
4042 let value = json!("[not valid json");
4044 let result = value_to_array(&value);
4045 assert!(result.is_none(), "Should return None for invalid JSON");
4046 }
4047
4048 #[test]
4049 fn test_value_to_array_markdown_fenced_json_array() {
4050 let value = Value::String("```json\n[\"a\", \"b\", \"c\"]\n```".to_string());
4051 let result = value_to_array(&value);
4052 assert!(
4053 result.is_some(),
4054 "Should parse JSON array from markdown fence"
4055 );
4056 let arr = result.unwrap();
4057 assert_eq!(arr.len(), 3);
4058 assert_eq!(arr[0], json!("a"));
4059 assert_eq!(arr[1], json!("b"));
4060 assert_eq!(arr[2], json!("c"));
4061 }
4062
4063 #[test]
4064 fn test_value_to_array_plain_fenced_json_array() {
4065 let value = Value::String("```\n[1, 2, 3]\n```".to_string());
4066 let result = value_to_array(&value);
4067 assert!(result.is_some(), "Should parse JSON array from plain fence");
4068 let arr = result.unwrap();
4069 assert_eq!(arr.len(), 3);
4070 assert_eq!(arr[0], json!(1));
4071 assert_eq!(arr[1], json!(2));
4072 assert_eq!(arr[2], json!(3));
4073 }
4074
4075 #[test]
4076 fn test_value_to_array_bare_json_string_still_works() {
4077 let value = Value::String("[\"x\", \"y\"]".to_string());
4078 let result = value_to_array(&value);
4079 assert!(result.is_some(), "Bare JSON array string should still work");
4080 let arr = result.unwrap();
4081 assert_eq!(arr.len(), 2);
4082 assert_eq!(arr[0], json!("x"));
4083 assert_eq!(arr[1], json!("y"));
4084 }
4085
4086 #[test]
4087 fn test_value_to_array_direct_array_value_still_works() {
4088 let value = json!(["a", "b"]);
4089 let result = value_to_array(&value);
4090 assert!(result.is_some(), "Direct array value should still work");
4091 let arr = result.unwrap();
4092 assert_eq!(arr.len(), 2);
4093 assert_eq!(arr[0], json!("a"));
4094 assert_eq!(arr[1], json!("b"));
4095 }
4096
4097 #[test]
4098 fn test_value_to_array_non_array_string_returns_none() {
4099 let value = Value::String("just a string".to_string());
4100 let result = value_to_array(&value);
4101 assert!(result.is_none(), "Non-array string should return None");
4102 }
4103
4104 #[test]
4105 fn test_value_to_array_empty_array() {
4106 use serde_json::json;
4107
4108 let value = json!([]);
4110 let result = value_to_array(&value);
4111 assert!(result.is_some());
4112 assert_eq!(result.unwrap().len(), 0);
4113
4114 let value = json!("[]");
4116 let result = value_to_array(&value);
4117 assert!(result.is_some());
4118 assert_eq!(result.unwrap().len(), 0);
4119 }
4120
4121 fn make_infer_task(
4131 name: &str,
4132 output: Option<AnalyzedOutput>,
4133 structured: Option<StructuredOutputSpec>,
4134 ) -> AnalyzedTask {
4135 AnalyzedTask {
4136 id: TaskId(0),
4137 name: name.to_string(),
4138 description: None,
4139 action: AnalyzedTaskAction::Infer(AnalyzedInferAction {
4140 prompt: "test prompt".to_string(),
4141 system: None,
4142 temperature: None,
4143 max_tokens: None,
4144 ..Default::default()
4145 }),
4146 provider: None,
4147 model: None,
4148 with_spec: Default::default(),
4149 depends_on: vec![],
4150 implicit_deps: vec![],
4151 output,
4152 for_each: None,
4153 retry: None,
4154 decompose: None,
4155 concurrency: None,
4156 fail_fast: None,
4157 artifact: None,
4158 log: None,
4159 structured,
4160 span: Span::dummy(),
4161 }
4162 }
4163
4164 #[test]
4165 fn test_get_retry_config_none_for_exec_task() {
4166 let task = AnalyzedTask {
4167 id: TaskId(0),
4168 name: "exec_task".to_string(),
4169 description: None,
4170 action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
4171 command: "echo hi".to_string(),
4172 shell: false,
4173 cwd: None,
4174 env: IndexMap::new(),
4175 timeout_ms: None,
4176 span: Span::dummy(),
4177 }),
4178 provider: None,
4179 model: None,
4180 with_spec: Default::default(),
4181 depends_on: vec![],
4182 implicit_deps: vec![],
4183 output: Some(AnalyzedOutput {
4184 format: AnalyzedOutputFormat::Json,
4185 schema: Some(json!({"type": "object"})),
4186 schema_ref: None,
4187 max_retries: None,
4188 span: Span::dummy(),
4189 }),
4190 for_each: None,
4191 retry: None,
4192 decompose: None,
4193 concurrency: None,
4194 fail_fast: None,
4195 artifact: None,
4196 log: None,
4197 structured: Some(StructuredOutputSpec::with_inline_schema(
4198 json!({"type": "object"}),
4199 )),
4200 span: Span::dummy(),
4201 };
4202 assert!(
4203 Runner::get_retry_config(&task).is_none(),
4204 "Exec tasks should never qualify for retry"
4205 );
4206 }
4207
4208 #[test]
4209 fn test_get_retry_config_none_for_no_output() {
4210 let task = make_infer_task("no_output", None, None);
4211 assert!(
4212 Runner::get_retry_config(&task).is_none(),
4213 "No output means no retry"
4214 );
4215 }
4216
4217 #[test]
4218 fn test_get_retry_config_none_for_text_format() {
4219 let task = make_infer_task(
4220 "text_format",
4221 Some(AnalyzedOutput {
4222 format: AnalyzedOutputFormat::Text,
4223 schema: Some(json!({"type": "object"})),
4224 schema_ref: None,
4225 max_retries: None,
4226 span: Span::dummy(),
4227 }),
4228 Some(StructuredOutputSpec::with_inline_schema(
4229 json!({"type": "object"}),
4230 )),
4231 );
4232 assert!(
4233 Runner::get_retry_config(&task).is_none(),
4234 "Text format should not qualify for retry"
4235 );
4236 }
4237
4238 #[test]
4239 fn test_get_retry_config_none_for_json_no_schema() {
4240 let task = make_infer_task(
4241 "json_no_schema",
4242 Some(AnalyzedOutput {
4243 format: AnalyzedOutputFormat::Json,
4244 schema: None,
4245 schema_ref: None,
4246 max_retries: None,
4247 span: Span::dummy(),
4248 }),
4249 Some(StructuredOutputSpec::with_inline_schema(
4250 json!({"type": "object"}),
4251 )),
4252 );
4253 assert!(
4254 Runner::get_retry_config(&task).is_none(),
4255 "JSON without schema should not qualify"
4256 );
4257 }
4258
4259 #[test]
4260 fn test_get_retry_config_none_for_no_structured() {
4261 let task = make_infer_task(
4262 "no_structured",
4263 Some(AnalyzedOutput {
4264 format: AnalyzedOutputFormat::Json,
4265 schema: Some(json!({"type": "object"})),
4266 schema_ref: None,
4267 max_retries: None,
4268 span: Span::dummy(),
4269 }),
4270 None, );
4272 assert!(
4273 Runner::get_retry_config(&task).is_none(),
4274 "No structured spec means no retry"
4275 );
4276 }
4277
4278 #[test]
4279 fn test_get_retry_config_none_for_zero_retries() {
4280 let mut structured = StructuredOutputSpec::with_inline_schema(json!({"type": "object"}));
4281 structured.max_retries = Some(0);
4282 let task = make_infer_task(
4283 "zero_retries",
4284 Some(AnalyzedOutput {
4285 format: AnalyzedOutputFormat::Json,
4286 schema: Some(json!({"type": "object"})),
4287 schema_ref: None,
4288 max_retries: None,
4289 span: Span::dummy(),
4290 }),
4291 Some(structured),
4292 );
4293 assert!(
4294 Runner::get_retry_config(&task).is_none(),
4295 "Zero retries means no retry"
4296 );
4297 }
4298
4299 #[test]
4300 fn test_get_retry_config_none_for_default_retries() {
4301 let mut structured = StructuredOutputSpec::with_inline_schema(json!({"type": "object"}));
4302 structured.max_retries = None; let task = make_infer_task(
4304 "default_retries",
4305 Some(AnalyzedOutput {
4306 format: AnalyzedOutputFormat::Json,
4307 schema: Some(json!({"type": "object"})),
4308 schema_ref: None,
4309 max_retries: None,
4310 span: Span::dummy(),
4311 }),
4312 Some(structured),
4313 );
4314 assert!(
4315 Runner::get_retry_config(&task).is_none(),
4316 "Default retries (None → 0) means no retry"
4317 );
4318 }
4319
4320 #[test]
4321 fn test_get_retry_config_some_for_valid_config() {
4322 let schema = json!({"type": "object", "properties": {"name": {"type": "string"}}});
4323 let mut structured = StructuredOutputSpec::with_inline_schema(schema.clone());
4324 structured.max_retries = Some(3);
4325 let task = make_infer_task(
4326 "valid_retry",
4327 Some(AnalyzedOutput {
4328 format: AnalyzedOutputFormat::Json,
4329 schema: Some(schema.clone()),
4330 schema_ref: None,
4331 max_retries: None,
4332 span: Span::dummy(),
4333 }),
4334 Some(structured),
4335 );
4336 let result = Runner::get_retry_config(&task);
4337 assert!(result.is_some(), "Valid config should return Some");
4338
4339 let (ret_schema, max_retries, infer) = result.unwrap();
4340 assert_eq!(ret_schema, schema);
4341 assert_eq!(max_retries, 3);
4342 assert_eq!(infer.prompt, "test prompt");
4343 }
4344
4345 #[test]
4350 fn test_find_root_failure_none_when_empty() {
4351 let runner = Runner::new(make_empty_workflow()).unwrap();
4352 assert!(
4353 runner.find_root_failure().is_none(),
4354 "Empty workflow has no failures"
4355 );
4356 }
4357
4358 #[test]
4359 fn test_find_root_failure_none_when_all_succeed() {
4360 let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4361 let runner = Runner::new(workflow).unwrap();
4362
4363 runner.datastore.insert(
4365 intern("a"),
4366 TaskResult::success(json!("ok"), Duration::from_millis(10)),
4367 );
4368 runner.datastore.insert(
4369 intern("b"),
4370 TaskResult::success(json!("ok"), Duration::from_millis(20)),
4371 );
4372
4373 assert!(
4374 runner.find_root_failure().is_none(),
4375 "All-success should return None"
4376 );
4377 }
4378
4379 #[test]
4380 fn test_find_root_failure_returns_first_failed() {
4381 let workflow = create_exec_workflow(
4382 vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4383 vec![],
4384 );
4385 let runner = Runner::new(workflow).unwrap();
4386
4387 runner.datastore.insert(
4388 intern("a"),
4389 TaskResult::success(json!("ok"), Duration::from_millis(10)),
4390 );
4391 runner.datastore.insert(
4392 intern("b"),
4393 TaskResult::failed("something broke".to_string(), Duration::from_millis(20)),
4394 );
4395 runner.datastore.insert(
4396 intern("c"),
4397 TaskResult::failed("also broken".to_string(), Duration::from_millis(30)),
4398 );
4399
4400 assert_eq!(
4401 runner.find_root_failure(),
4402 Some("b".to_string()),
4403 "Should return first failed task in workflow order"
4404 );
4405 }
4406
4407 #[test]
4408 fn test_find_root_failure_skips_dependency_failed() {
4409 let workflow = create_exec_workflow(
4410 vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4411 vec![("a", "b"), ("b", "c")],
4412 );
4413 let runner = Runner::new(workflow).unwrap();
4414
4415 runner.datastore.insert(
4416 intern("a"),
4417 TaskResult::failed("root cause".to_string(), Duration::from_millis(10)),
4418 );
4419 runner
4420 .datastore
4421 .insert(intern("b"), TaskResult::dependency_failed("a"));
4422 runner
4423 .datastore
4424 .insert(intern("c"), TaskResult::dependency_failed("b"));
4425
4426 assert_eq!(
4427 runner.find_root_failure(),
4428 Some("a".to_string()),
4429 "Should skip DependencyFailed and return the actual failure"
4430 );
4431 }
4432
4433 #[test]
4438 fn test_get_pending_tasks_all_pending() {
4439 let workflow = create_exec_workflow(
4440 vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4441 vec![],
4442 );
4443 let runner = Runner::new(workflow).unwrap();
4444
4445 let pending = runner.get_pending_tasks();
4446 assert_eq!(pending.len(), 3);
4447 assert!(pending.contains(&"a".to_string()));
4448 assert!(pending.contains(&"b".to_string()));
4449 assert!(pending.contains(&"c".to_string()));
4450 }
4451
4452 #[test]
4453 fn test_get_pending_tasks_excludes_completed() {
4454 let workflow = create_exec_workflow(
4455 vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4456 vec![],
4457 );
4458 let runner = Runner::new(workflow).unwrap();
4459
4460 runner.datastore.insert(
4461 intern("a"),
4462 TaskResult::success(json!("ok"), Duration::from_millis(10)),
4463 );
4464 runner.datastore.insert(
4465 intern("c"),
4466 TaskResult::success(json!("ok"), Duration::from_millis(20)),
4467 );
4468
4469 let pending = runner.get_pending_tasks();
4470 assert_eq!(pending, vec!["b".to_string()]);
4471 }
4472
4473 #[test]
4474 fn test_get_pending_tasks_empty_when_all_done() {
4475 let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4476 let runner = Runner::new(workflow).unwrap();
4477
4478 runner.datastore.insert(
4479 intern("a"),
4480 TaskResult::success(json!("ok"), Duration::from_millis(10)),
4481 );
4482 runner.datastore.insert(
4483 intern("b"),
4484 TaskResult::success(json!("ok"), Duration::from_millis(20)),
4485 );
4486
4487 let pending = runner.get_pending_tasks();
4488 assert!(pending.is_empty());
4489 }
4490
4491 #[test]
4492 fn test_get_pending_tasks_excludes_failed() {
4493 let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4494 let runner = Runner::new(workflow).unwrap();
4495
4496 runner.datastore.insert(
4497 intern("a"),
4498 TaskResult::failed("error".to_string(), Duration::from_millis(10)),
4499 );
4500
4501 let pending = runner.get_pending_tasks();
4502 assert_eq!(pending, vec!["b".to_string()]);
4503 }
4504
4505 #[test]
4510 fn test_get_ready_tasks_no_deps() {
4511 let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4512 let runner = Runner::new(workflow).unwrap();
4513
4514 let ready = runner.get_ready_tasks();
4515 assert_eq!(ready.len(), 2, "Tasks with no deps should all be ready");
4516 }
4517
4518 #[test]
4519 fn test_get_ready_tasks_blocked_by_incomplete_dep() {
4520 let workflow =
4521 create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![("a", "b")]);
4522 let runner = Runner::new(workflow).unwrap();
4523
4524 let ready = runner.get_ready_tasks();
4525 assert_eq!(ready.len(), 1);
4526 assert_eq!(ready[0].name, "a", "Only root task should be ready");
4527 }
4528
4529 #[test]
4530 fn test_get_ready_tasks_unblocked_after_dep_success() {
4531 let workflow =
4532 create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![("a", "b")]);
4533 let runner = Runner::new(workflow).unwrap();
4534
4535 runner.datastore.insert(
4536 intern("a"),
4537 TaskResult::success(json!("ok"), Duration::from_millis(10)),
4538 );
4539
4540 let ready = runner.get_ready_tasks();
4541 assert_eq!(ready.len(), 1);
4542 assert_eq!(ready[0].name, "b", "b should be ready after a succeeds");
4543 }
4544
4545 #[test]
4546 fn test_get_ready_tasks_skips_already_done() {
4547 let workflow = create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![]);
4548 let runner = Runner::new(workflow).unwrap();
4549
4550 runner.datastore.insert(
4551 intern("a"),
4552 TaskResult::success(json!("ok"), Duration::from_millis(10)),
4553 );
4554
4555 let ready = runner.get_ready_tasks();
4556 assert_eq!(ready.len(), 1);
4557 assert_eq!(ready[0].name, "b", "Completed task should not be returned");
4558 }
4559
4560 #[test]
4561 fn test_dependency_failure_propagation() {
4562 let workflow = create_exec_workflow(
4564 vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4565 vec![("a", "b"), ("b", "c")],
4566 );
4567 let runner = Runner::new(workflow).unwrap();
4568
4569 runner.datastore.insert(
4571 intern("a"),
4572 TaskResult::failed("boom".to_string(), Duration::from_millis(10)),
4573 );
4574
4575 let ready = runner.get_ready_tasks();
4577 assert!(ready.is_empty(), "No tasks should be ready when dep failed");
4578
4579 let b_result = runner.datastore.get("b").expect("b should be in store");
4581 assert!(
4582 b_result.is_dependency_failed(),
4583 "b should be DependencyFailed"
4584 );
4585 assert_eq!(
4586 b_result.failed_dependency(),
4587 Some("a"),
4588 "b should record a as the failed dependency"
4589 );
4590
4591 let ready = runner.get_ready_tasks();
4593 assert!(ready.is_empty());
4594
4595 let c_result = runner.datastore.get("c").expect("c should be in store");
4596 assert!(
4597 c_result.is_dependency_failed(),
4598 "c should be DependencyFailed"
4599 );
4600 assert_eq!(
4601 c_result.failed_dependency(),
4602 Some("b"),
4603 "c should record b as the failed dependency"
4604 );
4605 }
4606
4607 #[test]
4608 fn test_dependency_failure_does_not_affect_parallel_tasks() {
4609 let workflow = create_exec_workflow(
4612 vec![
4613 ("a", "echo a"),
4614 ("b", "echo b"),
4615 ("c", "echo c"),
4616 ("d", "echo d"),
4617 ],
4618 vec![("a", "b"), ("a", "c")],
4619 );
4620 let runner = Runner::new(workflow).unwrap();
4621
4622 runner.datastore.insert(
4624 intern("a"),
4625 TaskResult::failed("oops".to_string(), Duration::from_millis(10)),
4626 );
4627
4628 let ready = runner.get_ready_tasks();
4629 assert_eq!(ready.len(), 1, "Only d should be ready");
4630 assert_eq!(ready[0].name, "d");
4631
4632 assert!(runner.datastore.get("b").unwrap().is_dependency_failed());
4634 assert!(runner.datastore.get("c").unwrap().is_dependency_failed());
4635 }
4636
4637 #[test]
4638 fn test_dependency_failure_emits_events() {
4639 let workflow =
4640 create_exec_workflow(vec![("a", "echo a"), ("b", "echo b")], vec![("a", "b")]);
4641 let runner = Runner::new(workflow).unwrap();
4642
4643 runner.datastore.insert(
4644 intern("a"),
4645 TaskResult::failed("crash".to_string(), Duration::from_millis(10)),
4646 );
4647
4648 let _ = runner.get_ready_tasks();
4650
4651 let events = runner.event_log.events();
4653 let skip_events: Vec<_> = events
4654 .iter()
4655 .filter(|e| {
4656 matches!(
4657 &e.kind,
4658 EventKind::TaskSkipped { task_id, .. } if task_id.as_ref() == "b"
4659 )
4660 })
4661 .collect();
4662 assert_eq!(
4663 skip_events.len(),
4664 1,
4665 "Should emit exactly one TaskSkipped event for b"
4666 );
4667 }
4668
4669 #[test]
4674 fn test_all_done_empty_workflow() {
4675 let runner = Runner::new(make_empty_workflow()).unwrap();
4676 assert!(runner.all_done(), "Empty workflow is trivially done");
4677 }
4678
4679 #[test]
4680 fn test_all_done_false_when_pending() {
4681 let workflow = create_exec_workflow(vec![("a", "echo a")], vec![]);
4682 let runner = Runner::new(workflow).unwrap();
4683 assert!(!runner.all_done());
4684 }
4685
4686 #[test]
4687 fn test_all_done_true_with_mixed_outcomes() {
4688 let workflow = create_exec_workflow(
4689 vec![("a", "echo a"), ("b", "echo b"), ("c", "echo c")],
4690 vec![],
4691 );
4692 let runner = Runner::new(workflow).unwrap();
4693
4694 runner.datastore.insert(
4695 intern("a"),
4696 TaskResult::success(json!("ok"), Duration::from_millis(10)),
4697 );
4698 runner.datastore.insert(
4699 intern("b"),
4700 TaskResult::failed("err".to_string(), Duration::from_millis(20)),
4701 );
4702 runner
4703 .datastore
4704 .insert(intern("c"), TaskResult::dependency_failed("b"));
4705
4706 assert!(
4707 runner.all_done(),
4708 "All tasks have results (success, failed, or dep-failed)"
4709 );
4710 }
4711
4712 #[tokio::test]
4717 async fn audit_for_each_empty_array_produces_empty_result() {
4718 let workflow = create_for_each_workflow(
4719 "empty_loop",
4720 "[]",
4721 "item",
4722 "echo {{with.item}}",
4723 None,
4724 true,
4725 false,
4726 );
4727
4728 let mut runner = Runner::new(workflow).unwrap();
4729 let result = runner.run().await;
4730 assert!(
4731 result.is_ok(),
4732 "Empty for_each should complete successfully: {:?}",
4733 result.err()
4734 );
4735
4736 let task_result = runner.datastore.get("empty_loop");
4737 assert!(task_result.is_some(), "Task result should exist");
4738
4739 let tr = task_result.unwrap();
4740 assert!(tr.is_success(), "Empty for_each should succeed");
4741
4742 let output = tr.output_str();
4745 let parsed: Result<Vec<Value>, _> = serde_json::from_str(&output);
4746 assert!(
4747 parsed.is_ok(),
4748 "Output should be valid JSON array, got: {}",
4749 output
4750 );
4751 assert_eq!(
4752 parsed.unwrap().len(),
4753 0,
4754 "Empty for_each should produce empty array, got: {}",
4755 output
4756 );
4757 }
4758
4759 #[tokio::test]
4760 async fn audit_for_each_single_item_array_works() {
4761 let workflow = create_for_each_workflow(
4762 "single",
4763 r#"["only_one"]"#,
4764 "item",
4765 "echo {{with.item}}",
4766 None,
4767 true,
4768 false,
4769 );
4770
4771 let mut runner = Runner::new(workflow).unwrap();
4772 let result = runner.run().await;
4773 assert!(
4774 result.is_ok(),
4775 "Single-item for_each should complete: {:?}",
4776 result.err()
4777 );
4778
4779 let task_result = runner.datastore.get("single");
4780 assert!(task_result.is_some(), "Task result should exist");
4781
4782 let tr = task_result.unwrap();
4783 assert!(tr.is_success(), "Task should succeed");
4784
4785 let output = tr.output_str();
4786 let parsed: Vec<Value> = serde_json::from_str(&output)
4787 .unwrap_or_else(|_| panic!("Output should be JSON array, got: {}", output));
4788 assert_eq!(parsed.len(), 1, "Should have exactly one result");
4789 let first_str = parsed[0].as_str().unwrap_or("");
4790 assert!(
4791 first_str.contains("only_one"),
4792 "Single result should contain 'only_one', got: {}",
4793 first_str
4794 );
4795 }
4796
4797 #[tokio::test]
4798 async fn audit_for_each_nested_json_items_bound_correctly() {
4799 let workflow = create_for_each_workflow(
4800 "nested_items",
4801 r#"[{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]"#,
4802 "person",
4803 "echo {{with.person}}",
4804 None,
4805 true,
4806 true,
4807 );
4808
4809 let mut runner = Runner::new(workflow).unwrap();
4810 let result = runner.run().await;
4811 assert!(
4812 result.is_ok(),
4813 "Nested JSON for_each should complete: {:?}",
4814 result.err()
4815 );
4816
4817 let task_result = runner.datastore.get("nested_items");
4818 assert!(task_result.is_some(), "Task result should exist");
4819
4820 let tr = task_result.unwrap();
4821 assert!(tr.is_success(), "Task should succeed");
4822
4823 let output = tr.output_str();
4824 assert!(
4825 output.contains("Alice") && output.contains("Bob"),
4826 "Output should contain both names from nested JSON items, got: {}",
4827 output
4828 );
4829 }
4830
4831 #[tokio::test]
4832 async fn audit_for_each_fail_fast_false_continues_after_failure() {
4833 let workflow = create_for_each_workflow(
4836 "continue_on_fail",
4837 r#"["ok1", "FAIL", "ok2"]"#,
4838 "item",
4839 "test '{{with.item}}' != 'FAIL' && echo {{with.item}}",
4840 Some(1),
4841 false, true, );
4844
4845 let mut runner = Runner::new(workflow).unwrap();
4846 let _result = runner.run().await;
4847
4848 let task_result = runner.datastore.get("continue_on_fail");
4849 assert!(task_result.is_some(), "Parent task result should exist");
4850
4851 let ok1_result = runner.datastore.get("continue_on_fail[0]");
4852 let fail_result = runner.datastore.get("continue_on_fail[1]");
4853 let ok2_result = runner.datastore.get("continue_on_fail[2]");
4854
4855 assert!(ok1_result.is_some(), "First iteration result should exist");
4856 assert!(
4857 fail_result.is_some(),
4858 "Second iteration result should exist"
4859 );
4860 assert!(
4861 ok2_result.is_some(),
4862 "Third iteration result should exist (fail_fast=false)"
4863 );
4864
4865 assert!(
4866 ok1_result.unwrap().is_success(),
4867 "First iteration should succeed"
4868 );
4869 assert!(
4870 !fail_result.unwrap().is_success(),
4871 "Second iteration should fail"
4872 );
4873 assert!(
4874 ok2_result.unwrap().is_success(),
4875 "Third iteration should succeed (not aborted by fail_fast=false)"
4876 );
4877 }
4878
4879 #[tokio::test]
4880 async fn audit_for_each_with_depends_on_runs_after_dependency() {
4881 let workflow = create_two_step_for_each_workflow(
4882 r#"echo '["red", "green", "blue"]'"#,
4883 true,
4884 "$step1",
4885 "echo color={{with.item}}",
4886 );
4887
4888 let mut runner = Runner::new(workflow).unwrap();
4889 let result = runner.run().await;
4890 assert!(
4891 result.is_ok(),
4892 "for_each with depends_on should complete: {:?}",
4893 result.err()
4894 );
4895
4896 let step1_result = runner.datastore.get("step1");
4897 assert!(step1_result.is_some());
4898 assert!(step1_result.unwrap().is_success());
4899
4900 let step2_result = runner.datastore.get("step2");
4901 assert!(step2_result.is_some());
4902 let tr = step2_result.unwrap();
4903 assert!(
4904 tr.is_success(),
4905 "step2 should succeed, got error: {:?}",
4906 tr.error()
4907 );
4908
4909 let output = tr.output_str();
4910 assert!(
4911 output.contains("red") && output.contains("green") && output.contains("blue"),
4912 "for_each with depends_on should produce all 3 colors, got: {}",
4913 output
4914 );
4915 }
4916
4917 #[tokio::test]
4918 async fn audit_for_each_items_non_array_non_string_errors() {
4919 let workflow =
4920 create_two_step_for_each_workflow("echo 42", false, "$step1", "echo {{with.item}}");
4921
4922 let mut runner = Runner::new(workflow).unwrap();
4923 let _ = runner.run().await;
4924
4925 let task_result = runner.datastore.get("step2");
4926 assert!(task_result.is_some(), "step2 result should exist");
4927
4928 let tr = task_result.unwrap();
4929 assert!(
4930 !tr.is_success(),
4931 "for_each with non-array binding should fail"
4932 );
4933 let error_msg = tr.error().expect("should have error");
4934 assert!(
4935 error_msg.contains("non-array"),
4936 "Error should mention non-array, got: {}",
4937 error_msg
4938 );
4939 }
4940
4941 #[tokio::test]
4942 async fn audit_for_each_large_array_with_concurrency() {
4943 let items: Vec<String> = (0..20).map(|i| format!("\"item{}\"", i)).collect();
4944 let items_json = format!("[{}]", items.join(", "));
4945
4946 let workflow = create_for_each_workflow(
4947 "large_batch",
4948 &items_json,
4949 "x",
4950 "echo {{with.x}}",
4951 Some(4),
4952 true,
4953 false,
4954 );
4955
4956 let mut runner = Runner::new(workflow).unwrap();
4957 let result = runner.run().await;
4958 assert!(
4959 result.is_ok(),
4960 "Large for_each should complete: {:?}",
4961 result.err()
4962 );
4963
4964 let task_result = runner.datastore.get("large_batch");
4965 assert!(task_result.is_some());
4966 let tr = task_result.unwrap();
4967 assert!(tr.is_success(), "Large batch should succeed");
4968
4969 let output = tr.output_str();
4970 let parsed: Vec<Value> = serde_json::from_str(&output)
4971 .unwrap_or_else(|_| panic!("Should be JSON array, got: {}", output));
4972 assert_eq!(
4973 parsed.len(),
4974 20,
4975 "Should have 20 results from large batch, got: {}",
4976 parsed.len()
4977 );
4978 }
4979
4980 #[tokio::test]
4985 async fn audit_structured_output_layer3_with_mock_callback_succeeds() {
4986 use crate::runtime::structured_output::{InferCallback, StructuredOutputEngine};
4987
4988 let log = Arc::new(EventLog::new());
4989 let schema = json!({
4990 "type": "object",
4991 "properties": {
4992 "name": { "type": "string" },
4993 "age": { "type": "integer" }
4994 },
4995 "required": ["name", "age"]
4996 });
4997 let mut spec = StructuredOutputSpec::with_inline_schema(schema);
4998 spec.max_retries = Some(2);
4999 spec.enable_retry = Some(true);
5000
5001 let callback: InferCallback = Arc::new(move |_prompt: String| {
5002 Box::pin(async move { Ok(r#"{"name": "Fixed", "age": 42}"#.to_string()) })
5003 });
5004
5005 let mut engine = StructuredOutputEngine::new(spec, log.clone())
5006 .with_infer_callback(callback)
5007 .with_original_prompt("Generate a user".to_string());
5008
5009 let result = engine
5010 .validate("retry-test", r#"{"name": "Incomplete"}"#)
5011 .await;
5012
5013 assert!(
5014 result.is_ok(),
5015 "Layer 3 should succeed with mock callback: {:?}",
5016 result.err()
5017 );
5018 let r = result.unwrap();
5019 assert_eq!(r.layer, 3, "Should have succeeded at Layer 3");
5020 assert_eq!(r.value["name"], "Fixed");
5021 assert_eq!(r.value["age"], 42);
5022 }
5023
5024 #[tokio::test]
5025 async fn audit_structured_output_layer3_exhausts_retries() {
5026 use crate::runtime::structured_output::{InferCallback, StructuredOutputEngine};
5027
5028 let log = Arc::new(EventLog::new());
5029 let schema = json!({
5030 "type": "object",
5031 "properties": {
5032 "name": { "type": "string" },
5033 "score": { "type": "number" }
5034 },
5035 "required": ["name", "score"]
5036 });
5037 let mut spec = StructuredOutputSpec::with_inline_schema(schema);
5038 spec.max_retries = Some(2);
5039 spec.enable_retry = Some(true);
5040 spec.enable_repair = Some(false);
5041
5042 let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
5043 let call_count_clone = Arc::clone(&call_count);
5044 let callback: InferCallback = Arc::new(move |_prompt: String| {
5045 call_count_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
5046 Box::pin(async move { Ok(r#"{"name": "Still Wrong"}"#.to_string()) })
5047 });
5048
5049 let mut engine = StructuredOutputEngine::new(spec, log.clone())
5050 .with_infer_callback(callback)
5051 .with_original_prompt("test".to_string());
5052
5053 let result = engine
5054 .validate("exhaust-test", r#"{"name": "Invalid"}"#)
5055 .await;
5056
5057 assert!(result.is_err(), "Should fail after exhausting all retries");
5058
5059 let calls = call_count.load(std::sync::atomic::Ordering::SeqCst);
5060 assert_eq!(
5061 calls, 2,
5062 "Layer 3 should call LLM exactly max_retries times, got: {}",
5063 calls
5064 );
5065 }
5066
5067 #[tokio::test]
5068 async fn audit_structured_output_layer4_repair_succeeds() {
5069 use crate::runtime::structured_output::{InferCallback, StructuredOutputEngine};
5070
5071 let log = Arc::new(EventLog::new());
5072 let schema = json!({
5073 "type": "object",
5074 "properties": {
5075 "valid": { "type": "boolean" }
5076 },
5077 "required": ["valid"]
5078 });
5079 let mut spec = StructuredOutputSpec::with_inline_schema(schema);
5080 spec.enable_retry = Some(false);
5081 spec.enable_repair = Some(true);
5082
5083 let callback: InferCallback = Arc::new(move |_prompt: String| {
5084 Box::pin(async move { Ok(r#"{"valid": true}"#.to_string()) })
5085 });
5086
5087 let mut engine =
5088 StructuredOutputEngine::new(spec, log.clone()).with_infer_callback(callback);
5089
5090 let result = engine
5091 .validate("repair-test", r#"{"invalid_field": 123}"#)
5092 .await;
5093
5094 assert!(
5095 result.is_ok(),
5096 "Layer 4 repair should succeed: {:?}",
5097 result.err()
5098 );
5099 let r = result.unwrap();
5100 assert_eq!(r.layer, 4, "Should have succeeded at Layer 4");
5101 assert_eq!(r.value["valid"], true);
5102 }
5103
5104 #[tokio::test]
5105 async fn audit_structured_output_validates_array_schema() {
5106 use crate::runtime::structured_output::StructuredOutputEngine;
5107
5108 let log = Arc::new(EventLog::new());
5109 let schema = json!({
5110 "type": "array",
5111 "items": { "type": "string" },
5112 "minItems": 1
5113 });
5114 let spec = StructuredOutputSpec::with_inline_schema(schema);
5115 let mut engine = StructuredOutputEngine::new(spec, log);
5116
5117 let result = engine.validate("arr-ok", r#"["hello", "world"]"#).await;
5118 assert!(result.is_ok(), "String array should validate");
5119
5120 let mut engine2 = StructuredOutputEngine::new(
5121 StructuredOutputSpec::with_inline_schema(json!({
5122 "type": "array",
5123 "items": { "type": "string" },
5124 "minItems": 1
5125 })),
5126 Arc::new(EventLog::new()),
5127 );
5128 let result = engine2.validate("arr-empty", "[]").await;
5129 assert!(result.is_err(), "Empty array should fail minItems check");
5130 }
5131
5132 #[tokio::test]
5133 async fn audit_structured_output_validates_additional_properties_false() {
5134 use crate::runtime::structured_output::StructuredOutputEngine;
5135
5136 let log = Arc::new(EventLog::new());
5137 let spec = StructuredOutputSpec::with_inline_schema(json!({
5138 "type": "object",
5139 "properties": {
5140 "name": { "type": "string" }
5141 },
5142 "required": ["name"],
5143 "additionalProperties": false
5144 }));
5145 let mut engine = StructuredOutputEngine::new(spec, log);
5146
5147 let result = engine.validate("addl-ok", r#"{"name": "test"}"#).await;
5148 assert!(result.is_ok(), "Known properties only should validate");
5149
5150 let mut engine2 = StructuredOutputEngine::new(
5151 StructuredOutputSpec::with_inline_schema(json!({
5152 "type": "object",
5153 "properties": {
5154 "name": { "type": "string" }
5155 },
5156 "required": ["name"],
5157 "additionalProperties": false
5158 })),
5159 Arc::new(EventLog::new()),
5160 );
5161 let result = engine2
5162 .validate("addl-bad", r#"{"name": "test", "extra": true}"#)
5163 .await;
5164 assert!(
5165 result.is_err(),
5166 "Extra properties should fail when additionalProperties=false"
5167 );
5168 }
5169
5170 #[tokio::test]
5171 async fn audit_structured_output_validates_deeply_nested_schema() {
5172 use crate::runtime::structured_output::StructuredOutputEngine;
5173
5174 let log = Arc::new(EventLog::new());
5175 let schema = json!({
5176 "type": "object",
5177 "properties": {
5178 "level1": {
5179 "type": "object",
5180 "properties": {
5181 "level2": {
5182 "type": "object",
5183 "properties": {
5184 "value": { "type": "integer" }
5185 },
5186 "required": ["value"]
5187 }
5188 },
5189 "required": ["level2"]
5190 }
5191 },
5192 "required": ["level1"]
5193 });
5194 let spec = StructuredOutputSpec::with_inline_schema(schema);
5195 let mut engine = StructuredOutputEngine::new(spec, log);
5196
5197 let result = engine
5198 .validate("deep-ok", r#"{"level1": {"level2": {"value": 42}}}"#)
5199 .await;
5200 assert!(result.is_ok(), "Deeply nested valid should pass");
5201
5202 let mut engine2 = StructuredOutputEngine::new(
5203 StructuredOutputSpec::with_inline_schema(json!({
5204 "type": "object",
5205 "properties": {
5206 "level1": {
5207 "type": "object",
5208 "properties": {
5209 "level2": {
5210 "type": "object",
5211 "properties": {
5212 "value": { "type": "integer" }
5213 },
5214 "required": ["value"]
5215 }
5216 },
5217 "required": ["level2"]
5218 }
5219 },
5220 "required": ["level1"]
5221 })),
5222 Arc::new(EventLog::new()),
5223 );
5224 let result = engine2
5225 .validate(
5226 "deep-bad",
5227 r#"{"level1": {"level2": {"value": "not_a_number"}}}"#,
5228 )
5229 .await;
5230 assert!(
5231 result.is_err(),
5232 "Wrong type at deep level should fail validation"
5233 );
5234 }
5235
5236 #[tokio::test]
5237 async fn audit_structured_output_validates_primitive_types() {
5238 use crate::runtime::structured_output::StructuredOutputEngine;
5239
5240 let spec = StructuredOutputSpec::with_inline_schema(json!({"type": "string"}));
5242 let mut engine = StructuredOutputEngine::new(spec, Arc::new(EventLog::new()));
5243 let result = engine.validate("str-ok", r#""hello""#).await;
5244 assert!(
5245 result.is_ok(),
5246 "Quoted string should validate as string type"
5247 );
5248
5249 let spec = StructuredOutputSpec::with_inline_schema(json!({"type": "number"}));
5251 let mut engine = StructuredOutputEngine::new(spec, Arc::new(EventLog::new()));
5252 let result = engine.validate("num-ok", "42.5").await;
5253 assert!(result.is_ok(), "Number should validate as number type");
5254
5255 let spec = StructuredOutputSpec::with_inline_schema(json!({"type": "boolean"}));
5257 let mut engine = StructuredOutputEngine::new(spec, Arc::new(EventLog::new()));
5258 let result = engine.validate("bool-ok", "true").await;
5259 assert!(result.is_ok(), "Boolean should validate as boolean type");
5260
5261 let spec = StructuredOutputSpec::with_inline_schema(json!({"type": "null"}));
5263 let mut engine = StructuredOutputEngine::new(spec, Arc::new(EventLog::new()));
5264 let result = engine.validate("null-ok", "null").await;
5265 assert!(result.is_ok(), "null should validate as null type");
5266 }
5267
/// Checks that the text returned by `Runner::build_retry_prompt` contains
/// the original prompt, the previous raw output and the validation error
/// message it was given.
#[test]
fn audit_build_retry_prompt_includes_all_components() {
    let original_prompt = "Generate a user";
    let previous_output = r#"{"broken": true}"#;
    let validation_errors = "missing required field: name";
    let schema = json!({"type": "object", "required": ["name"]});

    let prompt = Runner::build_retry_prompt(
        original_prompt,
        &schema,
        previous_output,
        validation_errors,
    );

    // Each input must appear verbatim somewhere in the generated prompt.
    let expectations = [
        (original_prompt, "Should contain original prompt"),
        (previous_output, "Should contain the actual previous output"),
        (validation_errors, "Should contain validation errors"),
    ];
    for (needle, failure_message) in expectations {
        assert!(prompt.contains(needle), "{}", failure_message);
    }
}
5295
/// Checks that `StructuredOutputSpec::to_output_policy` yields a JSON-format
/// policy that has a schema, keeps `max_retries`, and embeds the source spec
/// with `max_retries` and `enable_repair` unchanged.
#[test]
fn audit_to_output_policy_preserves_structured_spec() {
    // The schema value is moved straight into the spec; nothing else in this
    // test reads it, so no clone is needed.
    let mut spec = StructuredOutputSpec::with_inline_schema(json!({"type": "object"}));
    spec.max_retries = Some(5);
    spec.enable_repair = Some(false);

    let policy = spec.to_output_policy();

    assert_eq!(policy.format, crate::ast::output::OutputFormat::Json);

    assert!(policy.schema.is_some());

    assert_eq!(policy.max_retries, Some(5));

    // The original spec must ride along so its settings can be recovered.
    assert!(policy.source_structured_spec.is_some());
    let roundtripped = policy.source_structured_spec.unwrap();
    assert_eq!(roundtripped.max_retries, Some(5));
    assert_eq!(roundtripped.enable_repair, Some(false));
}
5320
/// Checks the basic `LockfileGuard` lifecycle: the file exists and starts
/// with `pid:` while the guard is alive, and is gone once the guard is
/// dropped.
#[test]
fn lockfile_guard_creates_and_removes_on_drop() {
    let scratch = tempfile::tempdir().unwrap();
    // The "store" parent directory does not exist yet when the guard is created.
    let lock_path = scratch.path().join("store").join(".nika-run.lock");

    let guard = LockfileGuard::create(lock_path.clone());
    assert!(
        lock_path.exists(),
        "Lockfile should exist while guard is alive"
    );

    let content = std::fs::read_to_string(&lock_path).unwrap();
    assert!(
        content.starts_with("pid:"),
        "Lockfile should contain pid, got: {content}"
    );

    // End the guard's lifetime explicitly, then confirm the cleanup happened.
    drop(guard);
    assert!(
        !lock_path.exists(),
        "Lockfile should be removed after guard is dropped"
    );
}
5349
/// Checks that the lockfile is removed when the scope owning the guard
/// unwinds because of a panic.
#[test]
fn lockfile_guard_removes_on_panic_unwind() {
    let scratch = tempfile::tempdir().unwrap();
    let lock_path = scratch.path().join("store").join(".nika-run.lock");

    // Panic while the guard is live; `catch_unwind` lets the test inspect the
    // filesystem after the unwind has finished.
    let unwind_outcome = std::panic::catch_unwind(|| {
        let _held = LockfileGuard::create(lock_path.clone());
        assert!(lock_path.exists(), "Lockfile should exist before panic");
        panic!("simulated runner panic");
    });

    assert!(unwind_outcome.is_err(), "Should have caught the panic");
    assert!(
        !lock_path.exists(),
        "Lockfile should be removed even after panic unwind"
    );
}
5367
/// Checks that the lockfile is removed when the function owning the guard
/// leaves early through the `?` operator.
#[test]
fn lockfile_guard_removes_on_early_return() {
    /// Takes the lock, then bails out through `?` on an `Err` value.
    fn simulate_early_return(path: &std::path::Path) -> Result<(), &'static str> {
        let _held = LockfileGuard::create(path.to_path_buf());
        assert!(path.exists(), "Lockfile should exist before early return");
        let bail_out: Result<(), &'static str> = Err("simulated ? operator bail-out");
        bail_out?;
        unreachable!()
    }

    let scratch = tempfile::tempdir().unwrap();
    let lock_path = scratch.path().join("store").join(".nika-run.lock");

    let outcome = simulate_early_return(&lock_path);
    assert!(outcome.is_err());
    assert!(
        !lock_path.exists(),
        "Lockfile should be removed after early return via ?"
    );
}
5387
/// Checks that dropping a guard whose lockfile has already been deleted
/// does not panic.
#[test]
fn lockfile_guard_tolerates_missing_file() {
    let scratch = tempfile::tempdir().unwrap();
    let lock_path = scratch.path().join("store").join(".nika-run.lock");

    let held = LockfileGuard::create(lock_path.clone());
    assert!(lock_path.exists());

    // Delete the file behind the guard's back.
    std::fs::remove_file(&lock_path).unwrap();
    assert!(!lock_path.exists());

    // No assertion follows: the test passes if this drop completes quietly.
    drop(held);
}
5405
5406 fn create_with_template_for_each_workflow(
5413 step1_cmd: &str,
5414 step1_shell: bool,
5415 for_each_template: &str,
5416 step2_cmd: &str,
5417 ) -> AnalyzedWorkflow {
5418 let mut task_table = TaskTable::new();
5419 task_table.insert("step1");
5420 task_table.insert("step2");
5421 let tid1 = task_table.get_id("step1").unwrap();
5422 let tid2 = task_table.get_id("step2").unwrap();
5423
5424 let step1 = AnalyzedTask {
5425 id: tid1,
5426 name: "step1".to_string(),
5427 description: None,
5428 action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
5429 command: step1_cmd.to_string(),
5430 shell: step1_shell,
5431 cwd: None,
5432 env: IndexMap::new(),
5433 timeout_ms: None,
5434 span: Span::dummy(),
5435 }),
5436 provider: None,
5437 model: None,
5438 with_spec: Default::default(),
5439 depends_on: vec![],
5440 implicit_deps: vec![],
5441 output: None,
5442 for_each: None,
5443 retry: None,
5444 decompose: None,
5445 concurrency: None,
5446 fail_fast: None,
5447 artifact: None,
5448 log: None,
5449 structured: None,
5450 span: Span::dummy(),
5451 };
5452
5453 let mut with_spec = WithSpec::default();
5454 with_spec.insert(
5455 "step1".to_string(),
5456 WithEntry::simple(BindingPath {
5457 source: BindingSource::Task(intern("step1")),
5458 segments: vec![],
5459 }),
5460 );
5461
5462 let step2 = AnalyzedTask {
5463 id: tid2,
5464 name: "step2".to_string(),
5465 description: None,
5466 action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
5467 command: step2_cmd.to_string(),
5468 shell: false,
5469 cwd: None,
5470 env: IndexMap::new(),
5471 timeout_ms: None,
5472 span: Span::dummy(),
5473 }),
5474 provider: None,
5475 model: None,
5476 with_spec,
5477 depends_on: vec![tid1],
5478 implicit_deps: vec![],
5479 output: None,
5480 for_each: Some(AnalyzedForEach {
5481 items: for_each_template.to_string(),
5482 as_var: "item".to_string(),
5483 concurrency: None,
5484 fail_fast: true,
5485 span: Span::dummy(),
5486 }),
5487 retry: None,
5488 decompose: None,
5489 concurrency: None,
5490 fail_fast: None,
5491 artifact: None,
5492 log: None,
5493 structured: None,
5494 span: Span::dummy(),
5495 };
5496
5497 AnalyzedWorkflow {
5498 schema_version: SchemaVersion::V03,
5499 name: None,
5500 description: None,
5501 provider: Some("mock".to_string()),
5502 model: None,
5503 task_table,
5504 tasks: vec![step1, step2],
5505 mcp_servers: IndexMap::new(),
5506 context_files: vec![],
5507 imports: vec![],
5508 inputs: IndexMap::new(),
5509 artifacts: None,
5510 log: None,
5511 agents: None,
5512 skills_map: std::collections::HashMap::new(),
5513 span: Span::dummy(),
5514 }
5515 }
5516
5517 #[tokio::test]
5518 async fn bug24_for_each_with_template_traversal_failure_records_error() {
5519 let workflow = create_with_template_for_each_workflow(
5521 r#"echo '{"items": ["a","b"]}'"#,
5522 true,
5523 "{{with.step1.nonexistent}}",
5524 "echo {{with.item}}",
5525 );
5526
5527 let mut runner = Runner::new(workflow).unwrap().quiet();
5528 let _ = runner.run().await;
5529
5530 let task_result = runner.datastore.get("step2");
5531 assert!(task_result.is_some(), "step2 result should exist");
5532
5533 let result = task_result.unwrap();
5534 assert!(
5535 !result.is_success(),
5536 "step2 should FAIL when path traversal fails, not run as regular task"
5537 );
5538 let error_msg = result.error().expect("should have error message");
5539 assert!(
5540 error_msg.contains("traversal failed"),
5541 "Error should mention path traversal failure, got: {}",
5542 error_msg
5543 );
5544 }
5545
5546 #[tokio::test]
5552 async fn bug25_for_each_with_template_non_array_records_error() {
5553 let workflow = create_with_template_for_each_workflow(
5555 "echo not_an_array",
5556 false,
5557 "{{with.step1}}",
5558 "echo {{with.item}}",
5559 );
5560
5561 let mut runner = Runner::new(workflow).unwrap().quiet();
5562 let _ = runner.run().await;
5563
5564 let task_result = runner.datastore.get("step2");
5565 assert!(task_result.is_some(), "step2 result should exist");
5566
5567 let result = task_result.unwrap();
5568 assert!(
5569 !result.is_success(),
5570 "step2 should FAIL when for_each binding resolves to non-array"
5571 );
5572 let error_msg = result.error().expect("should have error message");
5573 assert!(
5574 error_msg.contains("non-array"),
5575 "Error should mention 'non-array', got: {}",
5576 error_msg
5577 );
5578 }
5579
5580 #[tokio::test]
5585 async fn for_each_concurrent_fail_fast_cancels_remaining_iterations() {
5586 let workflow = create_for_each_workflow(
5607 "cancel_test",
5608 r#"["FAIL", "ok1", "wait2", "wait3", "wait4", "wait5"]"#,
5609 "item",
5610 "if [ '{{with.item}}' = 'FAIL' ]; then exit 1; else echo {{with.item}}; fi",
5611 Some(2), true, true, );
5615
5616 let mut runner = Runner::new(workflow).unwrap().quiet();
5617 let result = runner.run().await;
5618
5619 assert!(
5621 result.is_err(),
5622 "Workflow should fail when fail_fast triggers on concurrent for_each"
5623 );
5624
5625 let fail_iter = runner.datastore.get("cancel_test[0]");
5627 assert!(
5628 fail_iter.is_some(),
5629 "Failing iteration [0] result should exist"
5630 );
5631 assert!(
5632 !fail_iter.unwrap().is_success(),
5633 "Iteration [0] should have failed"
5634 );
5635
5636 let mut skipped_count = 0;
5640 let mut total_stored = 0;
5641 for idx in 2..6 {
5642 let key = format!("cancel_test[{}]", idx);
5643 if let Some(iter_result) = runner.datastore.get(&key) {
5644 total_stored += 1;
5645 if iter_result.is_skipped() {
5646 skipped_count += 1;
5647 }
5648 }
5649 }
5652
5653 let succeeded_after_cancel: Vec<usize> = (2..6)
5658 .filter(|idx| {
5659 let key = format!("cancel_test[{}]", idx);
5660 runner
5661 .datastore
5662 .get(&key)
5663 .map(|r| r.is_success())
5664 .unwrap_or(false)
5665 })
5666 .collect();
5667
5668 assert!(
5669 succeeded_after_cancel.is_empty(),
5670 "Iterations behind the semaphore should not succeed after fail_fast cancellation, \
5671 but these succeeded: {:?}",
5672 succeeded_after_cancel
5673 );
5674
5675 let not_spawned = 4 - total_stored;
5678 assert!(
5679 skipped_count + not_spawned >= 1,
5680 "At least one iteration should be cancelled (skipped={}, not_spawned={}). \
5681 This suggests cancellation tokens are not working for concurrent for_each.",
5682 skipped_count,
5683 not_spawned
5684 );
5685 }
5686
5687 #[tokio::test]
5693 async fn bug26_fail_fast_does_not_abort_unrelated_sibling_tasks() {
5694 let mut task_table = TaskTable::new();
5703 task_table.insert("failing_parent");
5704 task_table.insert("passing_parent");
5705 let tid_fail = task_table.get_id("failing_parent").unwrap();
5706 let tid_pass = task_table.get_id("passing_parent").unwrap();
5707
5708 let failing_parent = AnalyzedTask {
5709 id: tid_fail,
5710 name: "failing_parent".to_string(),
5711 description: None,
5712 action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
5713 command: "test '{{with.item}}' != 'FAIL' && echo {{with.item}}".to_string(),
5714 shell: true,
5715 cwd: None,
5716 env: IndexMap::new(),
5717 timeout_ms: None,
5718 span: Span::dummy(),
5719 }),
5720 provider: None,
5721 model: None,
5722 with_spec: Default::default(),
5723 depends_on: vec![],
5724 implicit_deps: vec![],
5725 output: None,
5726 for_each: Some(AnalyzedForEach {
5727 items: r#"["ok", "FAIL", "ok2"]"#.to_string(),
5728 as_var: "item".to_string(),
5729 concurrency: Some(3),
5730 fail_fast: true,
5731 span: Span::dummy(),
5732 }),
5733 retry: None,
5734 decompose: None,
5735 concurrency: None,
5736 fail_fast: None,
5737 artifact: None,
5738 log: None,
5739 structured: None,
5740 span: Span::dummy(),
5741 };
5742
5743 let passing_parent = AnalyzedTask {
5744 id: tid_pass,
5745 name: "passing_parent".to_string(),
5746 description: None,
5747 action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
5748 command: "echo {{with.item}}".to_string(),
5749 shell: true,
5750 cwd: None,
5751 env: IndexMap::new(),
5752 timeout_ms: None,
5753 span: Span::dummy(),
5754 }),
5755 provider: None,
5756 model: None,
5757 with_spec: Default::default(),
5758 depends_on: vec![],
5759 implicit_deps: vec![],
5760 output: None,
5761 for_each: Some(AnalyzedForEach {
5762 items: r#"["a", "b", "c"]"#.to_string(),
5763 as_var: "item".to_string(),
5764 concurrency: Some(3),
5765 fail_fast: true,
5766 span: Span::dummy(),
5767 }),
5768 retry: None,
5769 decompose: None,
5770 concurrency: None,
5771 fail_fast: None,
5772 artifact: None,
5773 log: None,
5774 structured: None,
5775 span: Span::dummy(),
5776 };
5777
5778 let workflow = AnalyzedWorkflow {
5779 schema_version: SchemaVersion::V03,
5780 name: None,
5781 description: None,
5782 provider: Some("mock".to_string()),
5783 model: None,
5784 task_table,
5785 tasks: vec![failing_parent, passing_parent],
5786 mcp_servers: IndexMap::new(),
5787 context_files: vec![],
5788 imports: vec![],
5789 inputs: IndexMap::new(),
5790 artifacts: None,
5791 log: None,
5792 agents: None,
5793 skills_map: std::collections::HashMap::new(),
5794 span: Span::dummy(),
5795 };
5796
5797 let mut runner = Runner::new(workflow).unwrap().quiet();
5798 let _ = runner.run().await;
5799
5800 let fail_result = runner.datastore.get("failing_parent");
5802 assert!(fail_result.is_some(), "failing_parent result should exist");
5803
5804 let pass_result = runner.datastore.get("passing_parent");
5806 assert!(
5807 pass_result.is_some(),
5808 "passing_parent result should exist (fail_fast of sibling should not abort it)"
5809 );
5810
5811 let pass_tr = pass_result.unwrap();
5812 assert!(
5813 pass_tr.is_success(),
5814 "passing_parent should succeed — its iterations were all OK. \
5815 Error: {:?}",
5816 pass_tr.error()
5817 );
5818 }
5819}