1use rustc_hash::FxHashMap;
11use std::sync::Arc;
12
13use serde::Deserialize;
14
15use crate::binding::WithSpec;
16use crate::error::NikaError;
17
18use super::action::TaskAction;
19use super::decompose::DecomposeSpec;
20use super::output::OutputPolicy;
21
/// Alias for the inline MCP server configuration type defined in the
/// `nika_mcp` crate, so this module can refer to it by a local name.
pub type McpConfigInline = nika_mcp::McpConfigInline;
39
/// In-memory representation of a workflow definition.
///
/// This type derives only `Debug` and `Clone` (not `Deserialize`), so it is
/// not produced directly by serde.
// NOTE(review): presumably built by the parsing layer; confirm against the
// code that constructs it.
#[derive(Debug, Clone)]
pub struct Workflow {
    /// Schema identifier string. Contributes to [`Workflow::compute_hash`].
    pub schema: String,
    /// Optional workflow name. Does not contribute to the hash.
    pub name: Option<String>,
    /// Provider name. Contributes to [`Workflow::compute_hash`].
    pub provider: String,
    /// Optional model name. Contributes to the hash only when present.
    pub model: Option<String>,
    /// Optional inline MCP configurations, keyed by string.
    // NOTE(review): the tests in this file look entries up by server name
    // (e.g. "novanet"); confirm that the key is always the server name.
    pub mcp: Option<FxHashMap<String, McpConfigInline>>,
    /// Optional context configuration.
    pub context: Option<super::context::ContextConfig>,
    /// Optional list of include specifications.
    pub include: Option<Vec<super::include::IncludeSpec>>,
    /// Optional agent definitions, keyed by string (presumably the agent
    /// name; confirm).
    pub agents: Option<FxHashMap<String, super::agent_def::AgentDef>>,
    /// Optional skill definitions, keyed by string (presumably the skill
    /// name; confirm).
    pub skills: Option<FxHashMap<String, super::skill_def::SkillDef>>,
    /// Optional workflow-level artifacts configuration.
    pub artifacts: Option<super::artifact::ArtifactsConfig>,
    /// Optional workflow-level logging configuration.
    pub log: Option<super::logging::LogConfig>,
    /// Optional input values as arbitrary JSON, keyed by string.
    pub inputs: Option<FxHashMap<String, serde_json::Value>>,
    /// Tasks in declaration order, each held behind an `Arc`. The task count
    /// and the task ids (in this order) contribute to the hash.
    pub tasks: Vec<Arc<Task>>,
}
88
89impl Workflow {
90 pub fn compute_hash(&self) -> String {
99 use xxhash_rust::xxh3::xxh3_64;
100
101 let mut hasher_input = String::new();
102 hasher_input.push_str(&self.schema);
103 hasher_input.push_str(&self.provider);
104 if let Some(ref model) = self.model {
105 hasher_input.push_str(model);
106 }
107 hasher_input.push_str(&self.tasks.len().to_string());
108 for task in &self.tasks {
109 hasher_input.push_str(&task.id);
110 }
111
112 let hash = xxh3_64(hasher_input.as_bytes());
113 format!("{:016x}", hash)
114 }
115
116 pub fn flow_count(&self) -> usize {
118 self.tasks
119 .iter()
120 .map(|t| t.depends_on.as_ref().map_or(0, |deps| deps.len()))
121 .sum()
122 }
123
124 pub fn edges(&self) -> Vec<(&str, &str)> {
128 let mut edges = Vec::new();
129 for task in &self.tasks {
130 if let Some(ref deps) = task.depends_on {
131 for dep in deps {
132 edges.push((dep.as_str(), task.id.as_str()));
133 }
134 }
135 }
136 edges
137 }
138}
139
/// A single task of a workflow, deserialized with serde.
///
/// Every field except `id` and the flattened `action` is optional and
/// defaults to `None` when its key is absent.
#[derive(Debug, Clone, Deserialize)]
pub struct Task {
    /// Task identifier. Used in `Workflow::compute_hash` and as the target
    /// side of the pairs returned by `Workflow::edges`.
    pub id: String,
    /// Optional binding specification, read from the `with` key.
    #[serde(default, rename = "with")]
    pub with_spec: Option<WithSpec>,
    /// Optional output policy.
    #[serde(default)]
    pub output: Option<OutputPolicy>,
    /// Optional decompose specification; see [`Task::decompose_spec`].
    #[serde(default)]
    pub decompose: Option<DecomposeSpec>,
    /// Optional raw `for_each` value, kept as untyped JSON.
    /// [`Task::validate_for_each`] accepts a non-empty array, or a string
    /// that contains `{{` or starts with `$`.
    #[serde(default)]
    pub for_each: Option<serde_json::Value>,
    /// Optional loop variable name, read from the `as` key.
    /// [`Task::for_each_var`] falls back to `"item"` when this is `None`.
    #[serde(default, rename = "as")]
    pub for_each_as: Option<String>,
    /// Optional concurrency limit. [`Task::for_each_concurrency`] treats a
    /// missing value as 1 and raises 0 to 1.
    #[serde(default)]
    pub concurrency: Option<usize>,
    /// Optional fail-fast flag. [`Task::for_each_fail_fast`] treats a
    /// missing value as `true`.
    #[serde(default)]
    pub fail_fast: Option<bool>,
    /// The action this task performs. Flattened, so the action's own keys
    /// sit directly next to `id` in the serialized form.
    #[serde(flatten)]
    pub action: TaskAction,
    /// Optional task-level artifact specification.
    #[serde(default)]
    pub artifact: Option<super::artifact::ArtifactSpec>,
    /// Optional task-level logging configuration.
    #[serde(default)]
    pub log: Option<super::logging::LogConfig>,
    /// Optional list of ids this task depends on; see
    /// [`Task::depends_on_ids`].
    #[serde(default)]
    pub depends_on: Option<Vec<String>>,
    /// Optional structured-output specification.
    #[serde(default)]
    pub structured: Option<super::structured::StructuredOutputSpec>,
}
283
284impl Task {
285 pub fn validate_for_each(&self) -> Result<(), NikaError> {
294 if let Some(for_each) = &self.for_each {
295 if for_each.is_array() {
297 if let Some(arr) = for_each.as_array() {
298 if arr.is_empty() {
299 return Err(NikaError::ValidationError {
300 reason: "for_each array cannot be empty".to_string(),
301 });
302 }
303 }
304 return Ok(());
305 }
306 if let Some(s) = for_each.as_str() {
308 if s.contains("{{") || s.starts_with('$') {
309 return Ok(());
310 }
311 }
312 return Err(NikaError::ValidationError {
314 reason: format!(
315 "for_each must be an array or binding expression, got {}",
316 for_each
317 ),
318 });
319 }
320 Ok(())
321 }
322
323 pub fn has_for_each(&self) -> bool {
325 self.for_each.is_some()
326 }
327
328 pub fn for_each_var(&self) -> &str {
330 self.for_each_as.as_deref().unwrap_or("item")
331 }
332
333 pub fn for_each_concurrency(&self) -> usize {
335 self.concurrency.unwrap_or(1).max(1) }
337
338 pub fn for_each_fail_fast(&self) -> bool {
340 self.fail_fast.unwrap_or(true)
341 }
342
343 pub fn has_decompose(&self) -> bool {
345 self.decompose.is_some()
346 }
347
348 pub fn decompose_spec(&self) -> Option<&DecomposeSpec> {
350 self.decompose.as_ref()
351 }
352
353 pub fn action_icon(&self) -> &'static str {
364 match &self.action {
365 TaskAction::Infer { .. } => "⚡", TaskAction::Exec { .. } => "📟", TaskAction::Fetch { .. } => "🛰️", TaskAction::Invoke { .. } => "🔌", TaskAction::Agent { .. } => "🐔", }
371 }
372
373 pub fn subagent_icon() -> &'static str {
375 "🐤" }
377
378 pub fn depends_on_ids(&self) -> Vec<&str> {
382 self.depends_on
383 .as_ref()
384 .map(|deps| deps.iter().map(|s| s.as_str()).collect())
385 .unwrap_or_default()
386 }
387}
388
// Unit tests for `Workflow` and `Task`: workflow parsing, `for_each` helpers
// and validation, action icons, hashing, dependencies, and MCP configuration.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::parse_workflow;
    use crate::serde_yaml;

    // ---- Workflow parsing ----

    // A workflow with only schema, model, and one task parses; the provider
    // comes back as "claude" although the YAML does not set it.
    #[test]
    fn test_workflow_parse_minimal() {
        let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
  - id: hello
    infer: "Say hello"
"#;
        let workflow = parse_workflow(yaml).expect("Failed to parse workflow");

        assert_eq!(workflow.schema, "nika/workflow@0.12");
        assert_eq!(workflow.provider, "claude");
        assert_eq!(workflow.tasks.len(), 1);
        assert_eq!(workflow.tasks[0].id, "hello");
        assert_eq!(workflow.model.as_deref(), Some("test-model"));
        assert!(workflow.mcp.is_none());
        assert_eq!(workflow.flow_count(), 0);
    }

    // Explicit provider and model values are carried through unchanged.
    #[test]
    fn test_workflow_parse_with_provider_and_model() {
        let yaml = r#"
schema: "nika/workflow@0.12"
provider: openai
model: gpt-4-turbo
tasks:
  - id: task1
    exec: "echo test"
"#;
        let workflow = parse_workflow(yaml).expect("Failed to parse workflow");

        assert_eq!(workflow.provider, "openai");
        assert_eq!(workflow.model, Some("gpt-4-turbo".to_string()));
    }

    // Tasks keep their declaration order.
    #[test]
    fn test_workflow_parse_multiple_tasks() {
        let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
  - id: task1
    infer: "First task"
  - id: task2
    exec: "echo done"
  - id: task3
    fetch:
      url: "https://example.com"
"#;
        let workflow = parse_workflow(yaml).expect("Failed to parse workflow");

        assert_eq!(workflow.tasks.len(), 3);
        assert_eq!(workflow.tasks[0].id, "task1");
        assert_eq!(workflow.tasks[1].id, "task2");
        assert_eq!(workflow.tasks[2].id, "task3");
    }

    // An `mcp.servers` entry ends up in `workflow.mcp`, keyed by server name.
    #[test]
    fn test_workflow_parse_with_mcp_config() {
        let yaml = r#"
schema: "nika/workflow@0.12"
mcp:
  servers:
    novanet:
      command: cargo
      args: [run, -p, novanet-mcp]
      env:
        NEO4J_URI: bolt://localhost:7687
tasks:
  - id: invoke_task
    invoke:
      mcp: novanet
      tool: novanet_context
      params:
        entity: qr-code
"#;
        let workflow = parse_workflow(yaml).expect("Failed to parse workflow");

        assert!(workflow.mcp.is_some());
        let mcp = workflow.mcp.unwrap();
        assert!(mcp.contains_key("novanet"));

        let novanet_config = &mcp["novanet"];
        assert_eq!(novanet_config.command, "cargo");
        assert_eq!(novanet_config.args.len(), 3);
    }

    // ---- for_each helpers ----

    // Explicit `as`, `concurrency`, and `fail_fast` values are returned by
    // the corresponding helpers.
    #[test]
    fn test_task_for_each_helpers_with_for_each() {
        let yaml = r#"
id: test_task
for_each: ["en-US", "fr-FR", "de-DE"]
as: locale
concurrency: 3
fail_fast: false
infer: "Generate for {{with.locale}}"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");

        assert!(task.has_for_each());
        assert_eq!(task.for_each_var(), "locale");
        assert_eq!(task.for_each_concurrency(), 3);
        assert!(!task.for_each_fail_fast());
    }

    // Without explicit settings the helpers fall back to "item", 1, and true.
    #[test]
    fn test_task_for_each_helpers_defaults() {
        let yaml = r#"
id: test_task
for_each: ["a", "b"]
infer: "Test {{with.item}}"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");

        assert!(task.has_for_each());
        assert_eq!(task.for_each_var(), "item");
        assert_eq!(task.for_each_concurrency(), 1);
        assert!(task.for_each_fail_fast());
    }

    // The helpers still return their defaults when `for_each` is absent.
    #[test]
    fn test_task_without_for_each() {
        let yaml = r#"
id: simple_task
infer: "Simple test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");

        assert!(!task.has_for_each());
        assert_eq!(task.for_each_var(), "item");
        assert_eq!(task.for_each_concurrency(), 1);
    }

    // ---- decompose helpers ----

    // A task with a `decompose` section reports it through both helpers.
    #[test]
    fn test_task_decompose_helpers() {
        let yaml = r#"
id: decompose_task
decompose:
  strategy: semantic
  traverse: HAS_CHILD
  source: "$entity"
infer: "Generate for {{with.item}}"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");

        assert!(task.has_decompose());
        assert!(task.decompose_spec().is_some());
    }

    // A task without a `decompose` section reports none.
    #[test]
    fn test_task_without_decompose() {
        let yaml = r#"
id: normal_task
infer: "No decompose"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");

        assert!(!task.has_decompose());
        assert!(task.decompose_spec().is_none());
    }

    // ---- validate_for_each ----

    // A non-empty array is valid.
    #[test]
    fn test_validate_for_each_with_array() {
        let yaml = r#"
id: test
for_each: ["a", "b", "c"]
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert!(task.validate_for_each().is_ok());
    }

    // A string containing `{{` is accepted as a binding expression.
    #[test]
    fn test_validate_for_each_with_binding_expression_template() {
        let yaml = r#"
id: test
for_each: "{{with.items}}"
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert!(task.validate_for_each().is_ok());
    }

    // A string starting with `$` is accepted as a binding expression.
    #[test]
    fn test_validate_for_each_with_binding_expression_dollar() {
        let yaml = r#"
id: test
for_each: "$items"
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert!(task.validate_for_each().is_ok());
    }

    // An empty array is rejected with the dedicated message.
    #[test]
    fn test_validate_for_each_empty_array_fails() {
        let yaml = r#"
id: test
for_each: []
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        let result = task.validate_for_each();

        assert!(result.is_err());
        if let Err(e) = result {
            let error_str = format!("{:?}", e);
            assert!(error_str.contains("for_each array cannot be empty"));
        }
    }

    // A value that is neither an array nor a string (here a number) is
    // rejected with the generic message.
    #[test]
    fn test_validate_for_each_invalid_type_fails() {
        let yaml = r#"
id: test
for_each: 42
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        let result = task.validate_for_each();

        assert!(result.is_err());
        if let Err(e) = result {
            let error_str = format!("{:?}", e);
            assert!(error_str.contains("for_each must be an array or binding expression"));
        }
    }

    // A plain string without `{{` or a leading `$` is rejected.
    #[test]
    fn test_validate_for_each_invalid_string_fails() {
        let yaml = r#"
id: test
for_each: "plain_string"
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        let result = task.validate_for_each();

        assert!(result.is_err());
    }

    // A task without `for_each` validates successfully.
    #[test]
    fn test_validate_for_each_none() {
        let yaml = r#"
id: test
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert!(task.validate_for_each().is_ok());
    }

    // ---- action icons ----

    // `infer` tasks use the "⚡" icon.
    #[test]
    fn test_task_action_icon_infer() {
        let yaml = r#"
id: test
infer: "Generate something"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert_eq!(task.action_icon(), "⚡");
    }

    // `exec` tasks use the "📟" icon.
    #[test]
    fn test_task_action_icon_exec() {
        let yaml = r#"
id: test
exec: "echo hello"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert_eq!(task.action_icon(), "📟");
    }

    // `fetch` tasks use the "🛰️" icon.
    #[test]
    fn test_task_action_icon_fetch() {
        let yaml = r#"
id: test
fetch:
  url: "https://example.com"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert_eq!(task.action_icon(), "🛰️");
    }

    // `invoke` tasks use the "🔌" icon.
    #[test]
    fn test_task_action_icon_invoke() {
        let yaml = r#"
id: test
invoke:
  mcp: novanet
  tool: novanet_context
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert_eq!(task.action_icon(), "🔌");
    }

    // `agent` tasks use the "🐔" icon.
    #[test]
    fn test_task_action_icon_agent() {
        let yaml = r#"
id: test
agent:
  prompt: "Generate something"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert_eq!(task.action_icon(), "🐔");
    }

    // The sub-agent icon is a fixed value that needs no task instance.
    #[test]
    fn test_task_subagent_icon() {
        assert_eq!(Task::subagent_icon(), "🐤");
    }

    // ---- compute_hash ----

    // The hash is 16 hexadecimal characters.
    #[test]
    fn test_workflow_compute_hash() {
        let yaml = r#"
schema: "nika/workflow@0.12"
provider: claude
model: claude-sonnet-4-6
tasks:
  - id: task1
    infer: "Test"
  - id: task2
    exec: "echo done"
"#;
        let workflow = parse_workflow(yaml).expect("Failed to parse");
        let hash = workflow.compute_hash();

        assert_eq!(hash.len(), 16);
        assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
    }

    // Hashing the same workflow twice yields the same value.
    #[test]
    fn test_workflow_compute_hash_consistency() {
        let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
  - id: task1
    infer: "Test"
"#;
        let workflow = parse_workflow(yaml).expect("Failed to parse");
        let hash1 = workflow.compute_hash();
        let hash2 = workflow.compute_hash();

        assert_eq!(hash1, hash2);
    }

    // Workflows that differ only in schema hash differently.
    #[test]
    fn test_workflow_compute_hash_differs_with_schema() {
        let yaml_v10 = r#"
schema: "nika/workflow@0.10"
model: test-model
tasks:
  - id: task1
    infer: "Test"
"#;
        let yaml_v12 = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
  - id: task1
    infer: "Test"
"#;
        let workflow_v10 = parse_workflow(yaml_v10).expect("Failed to parse");
        let workflow_v12 = parse_workflow(yaml_v12).expect("Failed to parse");

        let hash_v10 = workflow_v10.compute_hash();
        let hash_v12 = workflow_v12.compute_hash();

        assert_ne!(hash_v10, hash_v12);
    }

    // Adding a task changes the hash.
    #[test]
    fn test_workflow_compute_hash_differs_with_tasks() {
        let yaml_1task = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
  - id: task1
    infer: "Test"
"#;
        let yaml_2tasks = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
  - id: task1
    infer: "Test"
  - id: task2
    exec: "echo done"
"#;
        let workflow_1 = parse_workflow(yaml_1task).expect("Failed to parse");
        let workflow_2 = parse_workflow(yaml_2tasks).expect("Failed to parse");

        assert_ne!(workflow_1.compute_hash(), workflow_2.compute_hash());
    }

    // Workflows that differ only in model hash differently.
    #[test]
    fn test_workflow_compute_hash_differs_with_model() {
        let yaml_claude = r#"
schema: "nika/workflow@0.12"
model: claude-sonnet-4-6
tasks:
  - id: task1
    infer: "Test"
"#;
        let yaml_openai = r#"
schema: "nika/workflow@0.12"
model: gpt-4-turbo
tasks:
  - id: task1
    infer: "Test"
"#;
        let workflow_claude = parse_workflow(yaml_claude).expect("Failed to parse");
        let workflow_openai = parse_workflow(yaml_openai).expect("Failed to parse");

        assert_ne!(
            workflow_claude.compute_hash(),
            workflow_openai.compute_hash()
        );
    }

    // ---- dependencies and optional task fields ----

    // `depends_on_ids` is empty when the task declares no dependencies.
    #[test]
    fn test_task_depends_on_ids_returns_empty_when_no_deps() {
        let yaml = r#"
id: task1
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        let deps = task.depends_on_ids();
        assert!(deps.is_empty());
    }

    // NOTE(review): despite its name, this test uses the plain `depends_on`
    // key and is identical to `test_task_depends_on_field_works`; no alias
    // key is exercised here.
    #[test]
    fn test_task_depends_on_alias_works() {
        let yaml = r#"
id: task1
depends_on: [step_a, step_b]
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        let deps = task.depends_on_ids();
        assert_eq!(deps, vec!["step_a", "step_b"]);
    }

    // `depends_on` entries are returned in declaration order.
    #[test]
    fn test_task_depends_on_field_works() {
        let yaml = r#"
id: task1
depends_on: [step_a, step_b]
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        let deps = task.depends_on_ids();
        assert_eq!(deps, vec!["step_a", "step_b"]);
    }

    // The `with` key populates `with_spec`.
    #[test]
    fn test_task_with_with_spec() {
        let yaml = r#"
id: task1
with:
  input: $previous_task.result
infer: "Process {{with.input}}"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert!(task.with_spec.is_some());
    }

    // The `output` key populates the output policy.
    #[test]
    fn test_task_with_output_policy() {
        let yaml = r#"
id: task1
output:
  format: json
infer: "Generate JSON"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert!(task.output.is_some());
    }

    // ---- MCP configuration ----

    // A server with only `command` gets empty args/env and no cwd.
    #[test]
    fn test_mcp_config_inline_minimal() {
        let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
mcp:
  servers:
    test_server:
      command: echo
tasks:
  - id: task1
    infer: "Test"
"#;
        let workflow = parse_workflow(yaml).expect("Failed to parse");
        let mcp = workflow.mcp.unwrap();
        let server = &mcp["test_server"];

        assert_eq!(server.command, "echo");
        assert!(server.args.is_empty());
        assert!(server.env.is_empty());
        assert!(server.cwd.is_none());
    }

    // A fully specified server keeps its command, args, env, and cwd.
    #[test]
    fn test_mcp_config_inline_full() {
        let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
mcp:
  servers:
    novanet:
      command: cargo
      args: [run, -p, novanet-mcp]
      env:
        NEO4J_URI: bolt://localhost:7687
        NEO4J_USER: neo4j
      cwd: /path/to/workspace
tasks:
  - id: task1
    infer: "Test"
"#;
        let workflow = parse_workflow(yaml).expect("Failed to parse");
        let mcp = workflow.mcp.unwrap();
        let server = &mcp["novanet"];

        assert_eq!(server.command, "cargo");
        assert_eq!(server.args.len(), 3);
        assert_eq!(server.env.len(), 2);
        assert_eq!(server.cwd, Some("/path/to/workspace".to_string()));
    }

    // ---- edge cases ----

    // A configured concurrency of 0 is raised to 1.
    #[test]
    fn test_task_concurrency_zero_becomes_one() {
        let yaml = r#"
id: test
for_each: ["a", "b"]
concurrency: 0
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert_eq!(task.for_each_concurrency(), 1);
    }

    // Large concurrency values are returned unchanged (no upper bound).
    #[test]
    fn test_task_concurrency_large_value() {
        let yaml = r#"
id: test
for_each: ["a", "b"]
concurrency: 1000
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert_eq!(task.for_each_concurrency(), 1000);
    }

    // The parsed provider is "claude" when the YAML omits it.
    #[test]
    fn test_workflow_default_provider_is_claude() {
        let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
  - id: task1
    infer: "Test"
"#;
        let workflow = parse_workflow(yaml).expect("Failed to parse");
        assert_eq!(workflow.provider, "claude");
    }

    // An explicitly empty `as` is returned as-is; it does not fall back to
    // "item".
    #[test]
    fn test_task_as_field_empty_string() {
        let yaml = r#"
id: test
for_each: ["a", "b"]
as: ""
infer: "Test"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert_eq!(task.for_each_var(), "");
    }

    // A custom `as` name is returned by `for_each_var`.
    #[test]
    fn test_task_as_field_custom_name() {
        let yaml = r#"
id: test
for_each: ["en-US", "fr-FR"]
as: locale
infer: "Generate {{with.locale}}"
"#;
        let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
        assert_eq!(task.for_each_var(), "locale");
    }
}