use rustc_hash::FxHashMap;
use std::sync::Arc;
use serde::Deserialize;
use crate::binding::WithSpec;
use crate::error::NikaError;
use super::action::TaskAction;
use super::decompose::DecomposeSpec;
use super::output::OutputPolicy;
/// Alias for [`nika_mcp::McpConfigInline`], making the type nameable from this
/// module. It is the value type of the [`Workflow::mcp`] map.
pub type McpConfigInline = nika_mcp::McpConfigInline;
/// A workflow definition: workflow-wide settings plus the ordered task list.
///
/// This type derives only `Debug` and `Clone` (it is not `Deserialize`); the
/// tests in this file obtain instances through `crate::ast::parse_workflow`.
#[derive(Debug, Clone)]
pub struct Workflow {
/// Schema identifier string, e.g. `"nika/workflow@0.12"`.
pub schema: String,
/// Optional workflow name.
pub name: Option<String>,
/// Provider name, e.g. `"openai"`. The tests in this file expect `"claude"`
/// when the workflow YAML omits `provider`.
pub provider: String,
/// Optional model name, e.g. `"gpt-4-turbo"`.
pub model: Option<String>,
/// Optional MCP server configurations, keyed by server name.
pub mcp: Option<FxHashMap<String, McpConfigInline>>,
/// Optional context configuration.
pub context: Option<super::context::ContextConfig>,
/// Optional list of include specifications.
pub include: Option<Vec<super::include::IncludeSpec>>,
/// Optional agent definitions, keyed by string (presumably the agent name —
/// TODO confirm).
pub agents: Option<FxHashMap<String, super::agent_def::AgentDef>>,
/// Optional skill definitions, keyed by string (presumably the skill name —
/// TODO confirm).
pub skills: Option<FxHashMap<String, super::skill_def::SkillDef>>,
/// Optional workflow-level artifacts configuration.
pub artifacts: Option<super::artifact::ArtifactsConfig>,
/// Optional workflow-level logging configuration.
pub log: Option<super::logging::LogConfig>,
/// Optional named inputs, each stored as an arbitrary JSON value.
pub inputs: Option<FxHashMap<String, serde_json::Value>>,
/// The workflow's tasks, each behind an `Arc`. `compute_hash`, `flow_count`
/// and `edges` all walk this vector in order.
pub tasks: Vec<Arc<Task>>,
}
impl Workflow {
pub fn compute_hash(&self) -> String {
use xxhash_rust::xxh3::xxh3_64;
let mut hasher_input = String::new();
hasher_input.push_str(&self.schema);
hasher_input.push_str(&self.provider);
if let Some(ref model) = self.model {
hasher_input.push_str(model);
}
hasher_input.push_str(&self.tasks.len().to_string());
for task in &self.tasks {
hasher_input.push_str(&task.id);
}
let hash = xxh3_64(hasher_input.as_bytes());
format!("{:016x}", hash)
}
pub fn flow_count(&self) -> usize {
self.tasks
.iter()
.map(|t| t.depends_on.as_ref().map_or(0, |deps| deps.len()))
.sum()
}
pub fn edges(&self) -> Vec<(&str, &str)> {
let mut edges = Vec::new();
for task in &self.tasks {
if let Some(ref deps) = task.depends_on {
for dep in deps {
edges.push((dep.as_str(), task.id.as_str()));
}
}
}
edges
}
}
/// A single workflow task as deserialized from its YAML/JSON mapping.
///
/// Every field except `id` and the flattened `action` is optional and falls
/// back to `None` when the key is absent.
#[derive(Debug, Clone, Deserialize)]
pub struct Task {
/// Task identifier. `Workflow::edges` uses it as the second element of each
/// dependency pair.
pub id: String,
/// Optional `with` block (read from the key `with`).
#[serde(default, rename = "with")]
pub with_spec: Option<WithSpec>,
/// Optional output policy for this task.
#[serde(default)]
pub output: Option<OutputPolicy>,
/// Optional decompose specification; see `has_decompose` / `decompose_spec`.
#[serde(default)]
pub decompose: Option<DecomposeSpec>,
/// Optional iteration source, kept as a raw JSON value. `validate_for_each`
/// accepts a non-empty array, or a string that contains `{{` or starts
/// with `$`.
#[serde(default)]
pub for_each: Option<serde_json::Value>,
/// Optional loop variable name (read from the key `as`). `for_each_var`
/// falls back to `"item"` when this is `None`.
#[serde(default, rename = "as")]
pub for_each_as: Option<String>,
/// Optional concurrency setting. `for_each_concurrency` treats a missing
/// value as `1` and raises `0` to `1`.
#[serde(default)]
pub concurrency: Option<usize>,
/// Optional fail-fast flag. `for_each_fail_fast` treats a missing value as
/// `true`.
#[serde(default)]
pub fail_fast: Option<bool>,
/// The task's action. Flattened, so the action's own keys sit directly in
/// the task mapping (the tests use `infer`, `exec`, `fetch`, `invoke` and
/// `agent`).
#[serde(flatten)]
pub action: TaskAction,
/// Optional per-task artifact specification.
#[serde(default)]
pub artifact: Option<super::artifact::ArtifactSpec>,
/// Optional per-task logging configuration.
#[serde(default)]
pub log: Option<super::logging::LogConfig>,
/// Optional list of dependency identifiers; `Workflow::edges` pairs each
/// entry with this task's `id`.
#[serde(default)]
pub depends_on: Option<Vec<String>>,
/// Optional structured-output specification.
#[serde(default)]
pub structured: Option<super::structured::StructuredOutputSpec>,
}
impl Task {
    /// Checks that `for_each`, when present, has an accepted shape.
    ///
    /// Accepted values are a non-empty array, or a string that contains `{{`
    /// or starts with `$`. A task without `for_each` is always valid.
    ///
    /// # Errors
    ///
    /// Returns `NikaError::ValidationError` when `for_each` is an empty array,
    /// or is neither an array nor a string matching the rule above.
    pub fn validate_for_each(&self) -> Result<(), NikaError> {
        let source = match self.for_each.as_ref() {
            Some(value) => value,
            None => return Ok(()),
        };

        // Arrays are accepted as long as they hold at least one element.
        if let Some(items) = source.as_array() {
            return if items.is_empty() {
                Err(NikaError::ValidationError {
                    reason: "for_each array cannot be empty".to_string(),
                })
            } else {
                Ok(())
            };
        }

        // Strings are accepted only when they look like a binding expression.
        let is_binding = matches!(
            source.as_str(),
            Some(text) if text.starts_with('$') || text.contains("{{")
        );
        if is_binding {
            return Ok(());
        }

        Err(NikaError::ValidationError {
            reason: format!(
                "for_each must be an array or binding expression, got {}",
                source
            ),
        })
    }

    /// Returns `true` when the task declares `for_each`.
    pub fn has_for_each(&self) -> bool {
        self.for_each.is_some()
    }

    /// Returns the loop variable name from `as`, or `"item"` when `as` is
    /// absent. An explicitly empty `as` is returned unchanged.
    pub fn for_each_var(&self) -> &str {
        match &self.for_each_as {
            Some(name) => name.as_str(),
            None => "item",
        }
    }

    /// Returns the configured concurrency, never less than `1`; a missing
    /// value also yields `1`.
    pub fn for_each_concurrency(&self) -> usize {
        self.concurrency.map_or(1, |limit| limit.max(1))
    }

    /// Returns the fail-fast flag; only an explicit `false` turns it off.
    pub fn for_each_fail_fast(&self) -> bool {
        self.fail_fast != Some(false)
    }

    /// Returns `true` when the task declares `decompose`.
    pub fn has_decompose(&self) -> bool {
        self.decompose.is_some()
    }

    /// Borrows the decompose specification, if any.
    pub fn decompose_spec(&self) -> Option<&DecomposeSpec> {
        self.decompose.as_ref()
    }

    /// Returns the icon associated with this task's action variant.
    pub fn action_icon(&self) -> &'static str {
        // Deliberately exhaustive (no `_` arm) so that adding a `TaskAction`
        // variant forces an icon to be chosen here.
        match &self.action {
            TaskAction::Infer { .. } => "⚡",
            TaskAction::Exec { .. } => "📟",
            TaskAction::Fetch { .. } => "🛰️",
            TaskAction::Invoke { .. } => "🔌",
            TaskAction::Agent { .. } => "🐔",
        }
    }

    /// Returns the icon used for sub-agents.
    pub fn subagent_icon() -> &'static str {
        "🐤"
    }

    /// Returns the `depends_on` entries as string slices, or an empty vector
    /// when `depends_on` is absent.
    pub fn depends_on_ids(&self) -> Vec<&str> {
        self.depends_on
            .iter()
            .flatten()
            .map(String::as_str)
            .collect()
    }
}
// Unit tests for `Workflow` and `Task`.
//
// Whole workflows are built with `crate::ast::parse_workflow`; single tasks are
// deserialized directly with `serde_yaml::from_str`. The YAML fixtures are raw
// string literals, so their exact text (including whitespace) is test input.
#[cfg(test)]
mod tests {
use super::*;
use crate::ast::parse_workflow;
use crate::serde_yaml;
// ---- Workflow parsing ----
// A workflow with only schema, model and one task: provider is expected to be
// "claude", `mcp` to be absent and the dependency count to be zero.
#[test]
fn test_workflow_parse_minimal() {
let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
- id: hello
infer: "Say hello"
"#;
let workflow = parse_workflow(yaml).expect("Failed to parse workflow");
assert_eq!(workflow.schema, "nika/workflow@0.12");
assert_eq!(workflow.provider, "claude"); assert_eq!(workflow.tasks.len(), 1);
assert_eq!(workflow.tasks[0].id, "hello");
assert_eq!(workflow.model.as_deref(), Some("test-model"));
assert!(workflow.mcp.is_none());
assert_eq!(workflow.flow_count(), 0);
}
// Explicit `provider` and `model` values are carried through unchanged.
#[test]
fn test_workflow_parse_with_provider_and_model() {
let yaml = r#"
schema: "nika/workflow@0.12"
provider: openai
model: gpt-4-turbo
tasks:
- id: task1
exec: "echo test"
"#;
let workflow = parse_workflow(yaml).expect("Failed to parse workflow");
assert_eq!(workflow.provider, "openai");
assert_eq!(workflow.model, Some("gpt-4-turbo".to_string()));
}
// Tasks keep their declaration order.
#[test]
fn test_workflow_parse_multiple_tasks() {
let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
- id: task1
infer: "First task"
- id: task2
exec: "echo done"
- id: task3
fetch:
url: "https://example.com"
"#;
let workflow = parse_workflow(yaml).expect("Failed to parse workflow");
assert_eq!(workflow.tasks.len(), 3);
assert_eq!(workflow.tasks[0].id, "task1");
assert_eq!(workflow.tasks[1].id, "task2");
assert_eq!(workflow.tasks[2].id, "task3");
}
// An `mcp` section yields a map entry keyed by the server name.
#[test]
fn test_workflow_parse_with_mcp_config() {
let yaml = r#"
schema: "nika/workflow@0.12"
mcp:
servers:
novanet:
command: cargo
args: [run, -p, novanet-mcp]
env:
NEO4J_URI: bolt://localhost:7687
tasks:
- id: invoke_task
invoke:
mcp: novanet
tool: novanet_context
params:
entity: qr-code
"#;
let workflow = parse_workflow(yaml).expect("Failed to parse workflow");
assert!(workflow.mcp.is_some());
let mcp = workflow.mcp.unwrap();
assert!(mcp.contains_key("novanet"));
let novanet_config = &mcp["novanet"];
assert_eq!(novanet_config.command, "cargo");
assert_eq!(novanet_config.args.len(), 3);
}
// ---- for_each helpers ----
// Explicit `as`, `concurrency` and `fail_fast` values are returned by the helpers.
#[test]
fn test_task_for_each_helpers_with_for_each() {
let yaml = r#"
id: test_task
for_each: ["en-US", "fr-FR", "de-DE"]
as: locale
concurrency: 3
fail_fast: false
infer: "Generate for {{with.locale}}"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");
assert!(task.has_for_each());
assert_eq!(task.for_each_var(), "locale");
assert_eq!(task.for_each_concurrency(), 3);
assert!(!task.for_each_fail_fast());
}
// Defaults when only `for_each` is given: variable "item", concurrency 1,
// fail-fast enabled.
#[test]
fn test_task_for_each_helpers_defaults() {
let yaml = r#"
id: test_task
for_each: ["a", "b"]
infer: "Test {{with.item}}"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");
assert!(task.has_for_each());
assert_eq!(task.for_each_var(), "item"); assert_eq!(task.for_each_concurrency(), 1); assert!(task.for_each_fail_fast()); }
// The helpers still return their defaults when `for_each` is absent.
#[test]
fn test_task_without_for_each() {
let yaml = r#"
id: simple_task
infer: "Simple test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");
assert!(!task.has_for_each());
assert_eq!(task.for_each_var(), "item");
assert_eq!(task.for_each_concurrency(), 1);
}
// ---- decompose helpers ----
#[test]
fn test_task_decompose_helpers() {
let yaml = r#"
id: decompose_task
decompose:
strategy: semantic
traverse: HAS_CHILD
source: "$entity"
infer: "Generate for {{with.item}}"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");
assert!(task.has_decompose());
assert!(task.decompose_spec().is_some());
}
#[test]
fn test_task_without_decompose() {
let yaml = r#"
id: normal_task
infer: "No decompose"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse task");
assert!(!task.has_decompose());
assert!(task.decompose_spec().is_none());
}
// ---- validate_for_each ----
// A non-empty array is valid.
#[test]
fn test_validate_for_each_with_array() {
let yaml = r#"
id: test
for_each: ["a", "b", "c"]
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert!(task.validate_for_each().is_ok());
}
// A string containing `{{` is accepted as a binding expression.
#[test]
fn test_validate_for_each_with_binding_expression_template() {
let yaml = r#"
id: test
for_each: "{{with.items}}"
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert!(task.validate_for_each().is_ok());
}
// A string starting with `$` is accepted as a binding expression.
#[test]
fn test_validate_for_each_with_binding_expression_dollar() {
let yaml = r#"
id: test
for_each: "$items"
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert!(task.validate_for_each().is_ok());
}
// An empty array is rejected with the dedicated "cannot be empty" message.
#[test]
fn test_validate_for_each_empty_array_fails() {
let yaml = r#"
id: test
for_each: []
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
let result = task.validate_for_each();
assert!(result.is_err());
if let Err(e) = result {
let error_str = format!("{:?}", e);
assert!(error_str.contains("for_each array cannot be empty"));
}
}
// A value that is neither an array nor a string (here a number) is rejected.
#[test]
fn test_validate_for_each_invalid_type_fails() {
let yaml = r#"
id: test
for_each: 42
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
let result = task.validate_for_each();
assert!(result.is_err());
if let Err(e) = result {
let error_str = format!("{:?}", e);
assert!(error_str.contains("for_each must be an array or binding expression"));
}
}
// A string without `{{` and without a leading `$` is rejected.
#[test]
fn test_validate_for_each_invalid_string_fails() {
let yaml = r#"
id: test
for_each: "plain_string"
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
let result = task.validate_for_each();
assert!(result.is_err());
}
// A task without `for_each` validates successfully.
#[test]
fn test_validate_for_each_none() {
let yaml = r#"
id: test
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert!(task.validate_for_each().is_ok());
}
// ---- action icons: one test per `TaskAction` variant ----
#[test]
fn test_task_action_icon_infer() {
let yaml = r#"
id: test
infer: "Generate something"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert_eq!(task.action_icon(), "⚡");
}
#[test]
fn test_task_action_icon_exec() {
let yaml = r#"
id: test
exec: "echo hello"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert_eq!(task.action_icon(), "📟");
}
#[test]
fn test_task_action_icon_fetch() {
let yaml = r#"
id: test
fetch:
url: "https://example.com"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert_eq!(task.action_icon(), "🛰️");
}
#[test]
fn test_task_action_icon_invoke() {
let yaml = r#"
id: test
invoke:
mcp: novanet
tool: novanet_context
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert_eq!(task.action_icon(), "🔌");
}
#[test]
fn test_task_action_icon_agent() {
let yaml = r#"
id: test
agent:
prompt: "Generate something"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert_eq!(task.action_icon(), "🐔");
}
#[test]
fn test_task_subagent_icon() {
assert_eq!(Task::subagent_icon(), "🐤");
}
// ---- compute_hash ----
// The hash is rendered as exactly 16 hexadecimal characters.
#[test]
fn test_workflow_compute_hash() {
let yaml = r#"
schema: "nika/workflow@0.12"
provider: claude
model: claude-sonnet-4-6
tasks:
- id: task1
infer: "Test"
- id: task2
exec: "echo done"
"#;
let workflow = parse_workflow(yaml).expect("Failed to parse");
let hash = workflow.compute_hash();
assert_eq!(hash.len(), 16);
assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
}
// Hashing the same workflow twice gives the same value.
#[test]
fn test_workflow_compute_hash_consistency() {
let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
- id: task1
infer: "Test"
"#;
let workflow = parse_workflow(yaml).expect("Failed to parse");
let hash1 = workflow.compute_hash();
let hash2 = workflow.compute_hash();
assert_eq!(hash1, hash2);
}
// Changing only the schema string changes the hash.
#[test]
fn test_workflow_compute_hash_differs_with_schema() {
let yaml_v10 = r#"
schema: "nika/workflow@0.10"
model: test-model
tasks:
- id: task1
infer: "Test"
"#;
let yaml_v12 = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
- id: task1
infer: "Test"
"#;
let workflow_v10 = parse_workflow(yaml_v10).expect("Failed to parse");
let workflow_v12 = parse_workflow(yaml_v12).expect("Failed to parse");
let hash_v10 = workflow_v10.compute_hash();
let hash_v12 = workflow_v12.compute_hash();
assert_ne!(hash_v10, hash_v12);
}
// Adding a task changes the hash.
#[test]
fn test_workflow_compute_hash_differs_with_tasks() {
let yaml_1task = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
- id: task1
infer: "Test"
"#;
let yaml_2tasks = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
- id: task1
infer: "Test"
- id: task2
exec: "echo done"
"#;
let workflow_1 = parse_workflow(yaml_1task).expect("Failed to parse");
let workflow_2 = parse_workflow(yaml_2tasks).expect("Failed to parse");
assert_ne!(workflow_1.compute_hash(), workflow_2.compute_hash());
}
// Changing only the model changes the hash.
#[test]
fn test_workflow_compute_hash_differs_with_model() {
let yaml_claude = r#"
schema: "nika/workflow@0.12"
model: claude-sonnet-4-6
tasks:
- id: task1
infer: "Test"
"#;
let yaml_openai = r#"
schema: "nika/workflow@0.12"
model: gpt-4-turbo
tasks:
- id: task1
infer: "Test"
"#;
let workflow_claude = parse_workflow(yaml_claude).expect("Failed to parse");
let workflow_openai = parse_workflow(yaml_openai).expect("Failed to parse");
assert_ne!(
workflow_claude.compute_hash(),
workflow_openai.compute_hash()
);
}
// ---- depends_on ----
#[test]
fn test_task_depends_on_ids_returns_empty_when_no_deps() {
let yaml = r#"
id: task1
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
let deps = task.depends_on_ids();
assert!(deps.is_empty());
}
// NOTE(review): this body is identical to `test_task_depends_on_field_works`
// below. The name suggests it once exercised an alternative key for
// `depends_on` — confirm whether such an alias still exists, or drop the
// duplicate.
#[test]
fn test_task_depends_on_alias_works() {
let yaml = r#"
id: task1
depends_on: [step_a, step_b]
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
let deps = task.depends_on_ids();
assert_eq!(deps, vec!["step_a", "step_b"]);
}
// `depends_on` entries are returned in declaration order.
#[test]
fn test_task_depends_on_field_works() {
let yaml = r#"
id: task1
depends_on: [step_a, step_b]
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
let deps = task.depends_on_ids();
assert_eq!(deps, vec!["step_a", "step_b"]);
}
// ---- optional task sections ----
// The `with` key populates `with_spec`.
#[test]
fn test_task_with_with_spec() {
let yaml = r#"
id: task1
with:
input: $previous_task.result
infer: "Process {{with.input}}"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert!(task.with_spec.is_some());
}
// The `output` key populates `output`.
#[test]
fn test_task_with_output_policy() {
let yaml = r#"
id: task1
output:
format: json
infer: "Generate JSON"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert!(task.output.is_some());
}
// ---- inline MCP configuration ----
// Only `command` given: `args` and `env` are empty and `cwd` is `None`.
#[test]
fn test_mcp_config_inline_minimal() {
let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
mcp:
servers:
test_server:
command: echo
tasks:
- id: task1
infer: "Test"
"#;
let workflow = parse_workflow(yaml).expect("Failed to parse");
let mcp = workflow.mcp.unwrap();
let server = &mcp["test_server"];
assert_eq!(server.command, "echo");
assert!(server.args.is_empty());
assert!(server.env.is_empty());
assert!(server.cwd.is_none());
}
// All of `command`, `args`, `env` and `cwd` given.
#[test]
fn test_mcp_config_inline_full() {
let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
mcp:
servers:
novanet:
command: cargo
args: [run, -p, novanet-mcp]
env:
NEO4J_URI: bolt://localhost:7687
NEO4J_USER: neo4j
cwd: /path/to/workspace
tasks:
- id: task1
infer: "Test"
"#;
let workflow = parse_workflow(yaml).expect("Failed to parse");
let mcp = workflow.mcp.unwrap();
let server = &mcp["novanet"];
assert_eq!(server.command, "cargo");
assert_eq!(server.args.len(), 3);
assert_eq!(server.env.len(), 2);
assert_eq!(server.cwd, Some("/path/to/workspace".to_string()));
}
// ---- edge cases ----
// `concurrency: 0` is raised to 1 by `for_each_concurrency`.
#[test]
fn test_task_concurrency_zero_becomes_one() {
let yaml = r#"
id: test
for_each: ["a", "b"]
concurrency: 0
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert_eq!(task.for_each_concurrency(), 1);
}
// Large concurrency values are returned unchanged (no upper clamp).
#[test]
fn test_task_concurrency_large_value() {
let yaml = r#"
id: test
for_each: ["a", "b"]
concurrency: 1000
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert_eq!(task.for_each_concurrency(), 1000);
}
// Omitting `provider` results in "claude".
#[test]
fn test_workflow_default_provider_is_claude() {
let yaml = r#"
schema: "nika/workflow@0.12"
model: test-model
tasks:
- id: task1
infer: "Test"
"#;
let workflow = parse_workflow(yaml).expect("Failed to parse");
assert_eq!(workflow.provider, "claude");
}
// An explicitly empty `as` is returned as-is; it does not fall back to "item".
#[test]
fn test_task_as_field_empty_string() {
let yaml = r#"
id: test
for_each: ["a", "b"]
as: ""
infer: "Test"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert_eq!(task.for_each_var(), "");
}
// A custom `as` name is returned by `for_each_var`.
#[test]
fn test_task_as_field_custom_name() {
let yaml = r#"
id: test
for_each: ["en-US", "fr-FR"]
as: locale
infer: "Generate {{with.locale}}"
"#;
let task: Task = serde_yaml::from_str(yaml).expect("Failed to parse");
assert_eq!(task.for_each_var(), "locale");
}
}