use std::sync::Arc;
use indexmap::IndexMap;
use rustc_hash::FxHashMap;
use super::action::{ExecParams, FetchParams, InferParams, RetryConfig, TaskAction};
use super::agent::AgentParams;
use super::analyzed::{
AnalyzedAgentAction, AnalyzedContextFile, AnalyzedExecAction, AnalyzedFetchAction,
AnalyzedForEach, AnalyzedImportSpec, AnalyzedInferAction, AnalyzedInvokeAction,
AnalyzedMcpServer, AnalyzedOutput, AnalyzedRetry, AnalyzedTask, AnalyzedTaskAction,
AnalyzedWorkflow, HttpMethod, McpTransport, OutputFormat as AnalyzedOutputFormat, TaskId,
TaskTable,
};
use super::include::IncludeSpec;
use super::invoke::InvokeParams;
use super::output::{OutputFormat, OutputPolicy, SchemaRef};
use super::schema::SchemaVersion;
use super::workflow::{McpConfigInline, Task, Workflow};
use crate::error::NikaError;
use crate::source::Span;
pub fn lower(analyzed: AnalyzedWorkflow) -> Result<Workflow, NikaError> {
let AnalyzedWorkflow {
schema_version,
name,
description: _,
provider,
model,
task_table,
tasks,
mcp_servers,
context_files,
imports,
inputs,
artifacts,
log,
agents,
span: _,
} = analyzed;
let tasks: Vec<Arc<Task>> = tasks
.into_iter()
.map(|t| lower_task(t, &task_table).map(Arc::new))
.collect::<Result<_, _>>()?;
let mcp = lower_mcp_servers(mcp_servers);
let inputs = lower_inputs(inputs);
let context = if context_files.is_empty() {
None
} else {
let mut files = rustc_hash::FxHashMap::default();
for cf in &context_files {
if let Some(alias) = &cf.alias {
files.insert(alias.clone(), cf.path.clone());
}
}
Some(crate::ast::context::ContextConfig {
files,
session: None,
})
};
Ok(Workflow {
schema: schema_version.as_str().to_string(),
name,
provider: provider.unwrap_or_else(|| "claude".to_string()),
model,
mcp,
context,
include: lower_imports(imports),
agents: agents.map(|m| m.into_iter().collect()),
skills: None,
artifacts,
log,
inputs,
tasks,
})
}
/// Converts one analyzed task into its runtime [`Task`] form.
///
/// Dependency ids (explicit and implicit) are resolved to names via `table`;
/// `for_each`-level concurrency/fail-fast settings take precedence over the
/// task-level ones.
///
/// # Errors
/// Returns a validation error if a dependency `TaskId` is missing from `table`.
fn lower_task(task: AnalyzedTask, table: &TaskTable) -> Result<Task, NikaError> {
    let depends_on = task_dep_names(&task.depends_on, &task.implicit_deps, table)?;
    let (for_each, for_each_as, fe_concurrency, fe_fail_fast) = lower_for_each(task.for_each);
    let action = lower_action(task.action, task.provider, task.model, task.retry);
    let output = task.output.map(lower_output);
    // Normalize an empty `with` spec to `None`.
    let with_spec = if task.with_spec.is_empty() {
        None
    } else {
        Some(task.with_spec)
    };
    let concurrency = fe_concurrency.or_else(|| task.concurrency.map(|c| c as usize));
    let fail_fast = fe_fail_fast.or(task.fail_fast);
    Ok(Task {
        id: task.name,
        with_spec,
        output,
        decompose: task.decompose,
        for_each,
        for_each_as,
        concurrency,
        fail_fast,
        action,
        // `task` is consumed by value, so these fields can be moved instead of
        // cloned (the original cloned them redundantly).
        artifact: task.artifact,
        log: task.log,
        depends_on,
        structured: task.structured,
    })
}
pub(crate) fn lower_action(
action: AnalyzedTaskAction,
provider: Option<String>,
model: Option<String>,
retry: Option<AnalyzedRetry>,
) -> TaskAction {
match action {
AnalyzedTaskAction::Infer(a) => TaskAction::Infer {
infer: lower_infer(a, provider, model),
},
AnalyzedTaskAction::Exec(a) => TaskAction::Exec {
exec: lower_exec(a),
},
AnalyzedTaskAction::Fetch(a) => TaskAction::Fetch {
fetch: lower_fetch(a, retry),
},
AnalyzedTaskAction::Invoke(a) => TaskAction::Invoke {
invoke: lower_invoke(a),
},
AnalyzedTaskAction::Agent(a) => TaskAction::Agent {
agent: lower_agent(a, provider, model),
},
}
}
fn lower_infer(
infer: AnalyzedInferAction,
provider: Option<String>,
model: Option<String>,
) -> InferParams {
use crate::ast::action::ResponseFormat;
let response_format =
infer
.response_format
.as_deref()
.and_then(|s| match s.to_lowercase().as_str() {
"text" => Some(ResponseFormat::Text),
"json" => Some(ResponseFormat::Json),
"markdown" => Some(ResponseFormat::Markdown),
_ => None,
});
InferParams {
prompt: infer.prompt,
provider,
model,
temperature: infer.temperature,
max_tokens: infer.max_tokens,
system: infer.system,
response_format,
extended_thinking: infer.thinking,
thinking_budget: infer.thinking_budget.map(u64::from),
content: infer
.content
.map(|parts| parts.into_iter().map(Into::into).collect()),
guardrails: infer.guardrails,
}
}
/// Lowers an exec action; timeouts are converted from milliseconds to whole
/// seconds, rounding up so sub-second timeouts never become zero.
fn lower_exec(e: AnalyzedExecAction) -> ExecParams {
    // An empty env map is normalized to `None`.
    let env = if e.env.is_empty() {
        None
    } else {
        Some(e.env.into_iter().collect())
    };
    ExecParams {
        command: e.command,
        shell: Some(e.shell),
        timeout: e.timeout_ms.map(|ms| ms.div_ceil(1000)),
        cwd: e.working_dir,
        env,
    }
}
/// Lowers a fetch action; the optional retry config comes from the task level.
fn lower_fetch(fetch: AnalyzedFetchAction, retry: Option<AnalyzedRetry>) -> FetchParams {
    // Milliseconds -> whole seconds, rounding up so small values stay nonzero.
    let timeout = fetch.timeout_ms.map(|ms| ms.div_ceil(1000));
    let headers = fetch.headers.into_iter().collect();
    FetchParams {
        url: fetch.url,
        method: fetch.method.as_str().to_string(),
        headers,
        body: fetch.body,
        json: fetch.json,
        timeout,
        retry: retry.map(lower_retry),
        follow_redirects: Some(fetch.follow_redirects),
        response: fetch.response,
        extract: fetch.extract,
        selector: fetch.selector,
    }
}
/// Lowers an MCP invoke action. Resources are not expressible in the analyzed
/// form, so `resource` is always `None` here.
fn lower_invoke(invoke: AnalyzedInvokeAction) -> InvokeParams {
    let timeout = invoke.timeout_ms.map(|ms| ms.div_ceil(1000));
    InvokeParams {
        mcp: invoke.server,
        tool: Some(invoke.tool),
        params: invoke.params,
        resource: None,
        timeout,
    }
}
/// Lowers an agent action into runtime [`AgentParams`].
///
/// `tool_choice` is matched case-insensitively; unknown values are logged and
/// treated as unset.
fn lower_agent(
    agent: AnalyzedAgentAction,
    provider: Option<String>,
    model: Option<String>,
) -> AgentParams {
    use crate::ast::agent::ToolChoice;
    let tool_choice = agent
        .tool_choice
        .as_deref()
        .and_then(|raw| match raw.to_lowercase().as_str() {
            "auto" => Some(ToolChoice::Auto),
            "required" => Some(ToolChoice::Required),
            "none" => Some(ToolChoice::None),
            other => {
                tracing::warn!(
                    tool_choice = other,
                    "invalid tool_choice value (expected \"auto\", \"required\", or \"none\"), ignoring"
                );
                None
            }
        });
    // An empty skill list is normalized to `None`.
    let skills = if agent.skills.is_empty() {
        None
    } else {
        Some(agent.skills)
    };
    AgentParams {
        prompt: agent.prompt,
        system: agent.system,
        provider,
        model,
        mcp: agent.mcp,
        tools: agent.tools,
        max_turns: agent.max_iterations,
        token_budget: agent.token_budget,
        stop_sequences: agent.stop_sequences,
        scope: agent.scope,
        extended_thinking: agent.extended_thinking,
        thinking_budget: agent.thinking_budget.map(u64::from),
        depth_limit: agent.depth_limit,
        tool_choice,
        // Narrowing f64 -> f32; the precision loss is acceptable for a
        // sampling temperature.
        temperature: agent.temperature.map(|t| t as f32),
        max_tokens: agent.max_tokens,
        skills,
        completion: None,
        guardrails: Vec::new(),
        limits: None,
    }
}
pub(crate) fn lower_output(output: AnalyzedOutput) -> OutputPolicy {
let schema = if let Some(ref schema_ref_str) = output.schema_ref {
Some(SchemaRef::File(schema_ref_str.clone()))
} else {
output.schema.map(|v| {
if let serde_json::Value::String(ref s) = v {
if s.starts_with("./") || s.starts_with('/') || s.ends_with(".json") {
return SchemaRef::File(s.clone());
}
}
SchemaRef::Inline(v)
})
};
OutputPolicy {
format: lower_output_format(output.format),
schema,
max_retries: output.max_retries.map(|v| v as u8),
source_structured_spec: None,
}
}
/// Maps analyzed output formats onto the runtime enum (a 1:1 mapping; the
/// runtime enum's extra variants have no analyzed counterpart).
fn lower_output_format(fmt: AnalyzedOutputFormat) -> OutputFormat {
    match fmt {
        AnalyzedOutputFormat::Text => OutputFormat::Text,
        AnalyzedOutputFormat::Json => OutputFormat::Json,
        AnalyzedOutputFormat::Yaml => OutputFormat::Yaml,
    }
}
/// Splits an analyzed `for_each` into the four runtime task fields:
/// `(for_each items, as-variable, concurrency, fail_fast)`.
pub(crate) fn lower_for_each(
    fe: Option<AnalyzedForEach>,
) -> (
    Option<serde_json::Value>,
    Option<String>,
    Option<usize>,
    Option<bool>,
) {
    let Some(fe) = fe else {
        return (None, None, None, None);
    };
    // `items` is stored as raw text; if it parses as JSON use the parsed
    // value, otherwise keep it verbatim as a string (e.g. a "$binding").
    let items =
        serde_json::from_str(&fe.items).unwrap_or_else(|_| serde_json::Value::String(fe.items));
    (
        Some(items),
        Some(fe.as_var),
        fe.parallel.map(|p| p as usize),
        Some(fe.fail_fast),
    )
}
fn lower_retry(retry: AnalyzedRetry) -> RetryConfig {
RetryConfig {
max_attempts: retry.max_attempts,
backoff_ms: retry.delay_ms,
multiplier: retry.backoff.unwrap_or(1.0),
}
}
/// Tolerance for deciding that a retry multiplier is "effectively 1.0".
const BACKOFF_UNITY_TOLERANCE: f64 = 0.0001;
/// Reconstructs an [`AnalyzedRetry`] from a runtime action, if it carries one.
///
/// Only fetch actions hold retry configuration. A multiplier of ~1.0 (within
/// [`BACKOFF_UNITY_TOLERANCE`]) or NaN round-trips as `backoff: None`.
fn unlower_retry(action: &TaskAction) -> Option<AnalyzedRetry> {
    let TaskAction::Fetch { fetch } = action else {
        return None;
    };
    fetch.retry.as_ref().map(|r| {
        let unity =
            r.multiplier.is_nan() || (r.multiplier - 1.0).abs() <= BACKOFF_UNITY_TOLERANCE;
        AnalyzedRetry {
            max_attempts: r.max_attempts,
            delay_ms: r.backoff_ms,
            backoff: (!unity).then_some(r.multiplier),
            span: Span::dummy(),
        }
    })
}
pub(crate) fn lower_mcp_servers(
servers: IndexMap<String, AnalyzedMcpServer>,
) -> Option<FxHashMap<String, McpConfigInline>> {
if servers.is_empty() {
return None;
}
let map: FxHashMap<String, McpConfigInline> = servers
.into_iter()
.filter_map(|(name, server)| match server.transport {
McpTransport::Stdio => Some((
name,
McpConfigInline {
command: server.command.unwrap_or_default(),
args: server.args,
env: server.env.into_iter().collect(),
cwd: server.cwd,
},
)),
McpTransport::Sse => {
tracing::warn!(server = %name, "SSE MCP server has no runtime equivalent and will be dropped during lowering");
None
}
})
.collect();
if map.is_empty() {
None
} else {
Some(map)
}
}
/// Converts workflow inputs to the runtime map; an empty map becomes `None`.
fn lower_inputs(
    inputs: IndexMap<String, serde_json::Value>,
) -> Option<FxHashMap<String, serde_json::Value>> {
    (!inputs.is_empty()).then(|| inputs.into_iter().collect())
}
/// Converts analyzed import specs into runtime include specs (path-based,
/// never package-based); an empty list becomes `None`.
fn lower_imports(imports: Vec<AnalyzedImportSpec>) -> Option<Vec<IncludeSpec>> {
    if imports.is_empty() {
        return None;
    }
    let specs = imports
        .into_iter()
        .map(|imp| IncludeSpec {
            path: Some(imp.path),
            pkg: None,
            prefix: imp.prefix,
        })
        .collect();
    Some(specs)
}
/// Resolves explicit and implicit dependency ids to task names, in order
/// (explicit first, then implicit). Returns `None` when there are no
/// dependencies at all.
///
/// # Errors
/// Returns a validation error if any `TaskId` is absent from `table` — this
/// would mean the analyzer produced a dangling id (invariant violation).
fn task_dep_names(
    depends: &[TaskId],
    implicit: &[TaskId],
    table: &TaskTable,
) -> Result<Option<Vec<String>>, NikaError> {
    // Preallocate: the result holds exactly one name per incoming id.
    let mut deps = Vec::with_capacity(depends.len() + implicit.len());
    for id in depends.iter().chain(implicit.iter()) {
        let name = table
            .get_name(*id)
            .ok_or_else(|| NikaError::ValidationError {
                reason: format!(
                    "Lowering: TaskId({}) not found in TaskTable (invariant violation)",
                    id.0
                ),
            })?;
        deps.push(name.to_string());
    }
    Ok(if deps.is_empty() { None } else { Some(deps) })
}
/// Inverse of [`lower`]: reconstructs an [`AnalyzedWorkflow`] from a runtime
/// [`Workflow`].
///
/// Source spans cannot be recovered, so every reconstructed node carries
/// `Span::dummy()`; descriptions and imports are likewise lost.
///
/// # Errors
/// Returns a validation error if a task's `depends_on` names a task that does
/// not exist in the workflow.
pub fn unlower(workflow: Workflow) -> Result<AnalyzedWorkflow, NikaError> {
// NOTE(review): an unparsable schema string silently falls back to V12 —
// confirm this default is intended.
let schema_version = SchemaVersion::parse(&workflow.schema).unwrap_or(SchemaVersion::V12);
let mut task_table = TaskTable::new();
let mut analyzed_tasks = Vec::with_capacity(workflow.tasks.len());
// First pass: register all task names so dependency names resolve regardless
// of task order.
for task in &workflow.tasks {
task_table.insert(&task.id);
}
for task in workflow.tasks {
// Take ownership without cloning when this Arc is the sole owner.
let task = Arc::try_unwrap(task).unwrap_or_else(|arc| (*arc).clone());
let id = task_table.get_id(&task.id).expect("task just inserted");
let depends_on: Vec<TaskId> = match task.depends_on.as_ref() {
Some(deps) => {
let mut ids = Vec::with_capacity(deps.len());
for name in deps {
let dep_id =
task_table
.get_id(name)
.ok_or_else(|| NikaError::ValidationError {
reason: format!(
"Unlowering: dependency '{}' not found in TaskTable \
(invariant violation)",
name
),
})?;
ids.push(dep_id);
}
ids
}
None => vec![],
};
let action = unlower_action(&task.action);
let output = task.output.as_ref().map(unlower_output);
let for_each = unlower_for_each(
task.for_each.as_ref(),
task.for_each_as.as_ref(),
task.concurrency,
task.fail_fast,
);
let with_spec = task.with_spec.clone().unwrap_or_default();
// Provider/model live inside the action params at runtime; hoist them back
// to the task level.
let (task_provider, task_model) = extract_provider_model(&task.action);
analyzed_tasks.push(AnalyzedTask {
id,
name: task.id.clone(),
description: None,
action,
provider: task_provider,
model: task_model,
with_spec,
depends_on,
// Implicit deps were merged into `depends_on` during lowering and cannot
// be separated again.
implicit_deps: vec![],
output,
for_each,
retry: unlower_retry(&task.action),
decompose: task.decompose.clone(),
concurrency: task
.concurrency
.map(|c| u32::try_from(c).unwrap_or(u32::MAX)),
fail_fast: task.fail_fast,
artifact: task.artifact.clone(),
log: task.log.clone(),
structured: task.structured.clone(),
span: Span::dummy(),
});
}
let mcp_servers = unlower_mcp_servers(workflow.mcp);
let inputs: IndexMap<String, serde_json::Value> = workflow
.inputs
.map(|m| m.into_iter().collect())
.unwrap_or_default();
// Context files are rebuilt from the alias-keyed runtime map; unaliased
// files did not survive lowering.
let context_files = workflow
.context
.as_ref()
.map(|ctx| {
ctx.files
.iter()
.map(|(alias, path)| AnalyzedContextFile {
path: path.clone(),
alias: Some(alias.clone()),
max_bytes: None,
span: Span::dummy(),
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
Ok(AnalyzedWorkflow {
schema_version,
name: workflow.name,
description: None,
provider: Some(workflow.provider),
model: workflow.model,
task_table,
tasks: analyzed_tasks,
mcp_servers,
context_files,
imports: vec![],
inputs,
artifacts: workflow.artifacts,
log: workflow.log,
agents: workflow
.agents
.map(|m| m.into_iter().collect::<IndexMap<_, _>>()),
span: Span::dummy(),
})
}
/// Pulls provider/model overrides back out of the LLM-backed actions.
fn extract_provider_model(action: &TaskAction) -> (Option<String>, Option<String>) {
    match action {
        TaskAction::Infer { infer } => (infer.provider.clone(), infer.model.clone()),
        TaskAction::Agent { agent } => (agent.provider.clone(), agent.model.clone()),
        // Exec/fetch/invoke actions carry no provider or model.
        TaskAction::Exec { .. } | TaskAction::Fetch { .. } | TaskAction::Invoke { .. } => {
            (None, None)
        }
    }
}
/// Inverse of [`lower_action`]: rebuilds the analyzed action from a runtime
/// action by cloning every field back.
///
/// Lossy spots: unknown HTTP methods fall back to GET (with a warning), and
/// timeouts are converted seconds -> milliseconds, so sub-second precision
/// lost during lowering is not recovered.
fn unlower_action(action: &TaskAction) -> AnalyzedTaskAction {
match action {
TaskAction::Infer { infer } => AnalyzedTaskAction::Infer(AnalyzedInferAction {
prompt: infer.prompt.clone(),
system: infer.system.clone(),
temperature: infer.temperature,
max_tokens: infer.max_tokens,
thinking: infer.extended_thinking,
// Saturating narrow u64 -> u32 (lowering widened with u64::from).
thinking_budget: infer
.thinking_budget
.map(|b| u32::try_from(b).unwrap_or(u32::MAX)),
// Runtime content parts map 1:1 back onto analyzed content parts.
content: infer.content.as_ref().map(|parts| {
parts
.iter()
.map(|p| match p {
crate::ast::content::ContentPart::Text { text } => {
crate::ast::content::AnalyzedContentPart::Text {
text: text.clone(),
}
}
crate::ast::content::ContentPart::Image { source, detail } => {
crate::ast::content::AnalyzedContentPart::Image {
source: source.clone(),
detail: *detail,
}
}
crate::ast::content::ContentPart::ImageUrl { url, detail } => {
crate::ast::content::AnalyzedContentPart::ImageUrl {
url: url.clone(),
detail: *detail,
}
}
})
.collect()
}),
// Back to the canonical lowercase strings `lower_infer` accepts.
response_format: infer.response_format.as_ref().map(|rf| {
use crate::ast::action::ResponseFormat;
match rf {
ResponseFormat::Text => "text".to_string(),
ResponseFormat::Json => "json".to_string(),
ResponseFormat::Markdown => "markdown".to_string(),
}
}),
guardrails: infer.guardrails.clone(),
span: Span::dummy(),
}),
TaskAction::Exec { exec } => AnalyzedTaskAction::Exec(AnalyzedExecAction {
command: exec.command.clone(),
shell: exec.shell.unwrap_or(false),
working_dir: exec.cwd.clone(),
env: exec
.env
.as_ref()
.map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
.unwrap_or_default(),
timeout_ms: exec.timeout.map(|s| s * 1000),
span: Span::dummy(),
}),
TaskAction::Fetch { fetch } => AnalyzedTaskAction::Fetch(AnalyzedFetchAction {
url: fetch.url.clone(),
method: HttpMethod::parse(&fetch.method).unwrap_or_else(|| {
tracing::warn!(method = %fetch.method, "Unknown HTTP method in lower, defaulting to GET");
HttpMethod::Get
}),
headers: fetch
.headers
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
body: fetch.body.clone(),
json: fetch.json.clone(),
timeout_ms: fetch.timeout.map(|s| s * 1000),
follow_redirects: fetch.follow_redirects.unwrap_or(true),
response: fetch.response.clone(),
extract: fetch.extract.clone(),
selector: fetch.selector.clone(),
span: Span::dummy(),
}),
TaskAction::Invoke { invoke } => AnalyzedTaskAction::Invoke(AnalyzedInvokeAction {
server: invoke.mcp.clone(),
tool: invoke.tool.clone().unwrap_or_default(),
params: invoke.params.clone(),
timeout_ms: invoke.timeout.map(|s| s * 1000),
span: Span::dummy(),
}),
TaskAction::Agent { agent } => AnalyzedTaskAction::Agent(AnalyzedAgentAction {
prompt: agent.prompt.clone(),
tools: agent.tools.clone(),
max_iterations: agent.max_turns,
max_tokens: agent.max_tokens,
from: None,
skills: agent
.skills
.as_ref()
.map(|s| s.to_vec())
.unwrap_or_default(),
mcp: agent.mcp.clone(),
system: agent.system.clone(),
temperature: agent.temperature.map(f64::from),
token_budget: agent.token_budget,
extended_thinking: agent.extended_thinking,
// NOTE(review): `as u32` truncates if the budget exceeds u32::MAX,
// unlike the saturating conversion used for infer above — confirm.
thinking_budget: agent.thinking_budget.map(|v| v as u32),
depth_limit: agent.depth_limit,
tool_choice: agent.tool_choice.as_ref().map(|tc| tc.as_str().to_string()),
stop_sequences: agent.stop_sequences.clone(),
scope: agent.scope.clone(),
span: Span::dummy(),
}),
}
}
/// Reconstructs an [`AnalyzedOutput`] from a runtime output policy.
///
/// NOTE: Markdown and Binary have no analyzed counterpart and fold into Text,
/// so this direction is lossy for those formats.
fn unlower_output(output: &OutputPolicy) -> AnalyzedOutput {
    let format = match output.format {
        OutputFormat::Json => AnalyzedOutputFormat::Json,
        OutputFormat::Yaml => AnalyzedOutputFormat::Yaml,
        OutputFormat::Text | OutputFormat::Markdown | OutputFormat::Binary => {
            AnalyzedOutputFormat::Text
        }
    };
    let (schema, schema_ref) = match output.schema.as_ref() {
        Some(SchemaRef::Inline(v)) => (Some(v.clone()), None),
        Some(SchemaRef::File(path)) => (None, Some(path.clone())),
        None => (None, None),
    };
    AnalyzedOutput {
        format,
        schema,
        schema_ref,
        max_retries: output.max_retries.map(u32::from),
        span: Span::dummy(),
    }
}
/// Rebuilds an [`AnalyzedForEach`] from the four runtime task fields.
/// Returns `None` when the task has no `for_each` items.
fn unlower_for_each(
    items: Option<&serde_json::Value>,
    as_var: Option<&String>,
    concurrency: Option<usize>,
    fail_fast: Option<bool>,
) -> Option<AnalyzedForEach> {
    let items = items?;
    // Strings are stored verbatim (e.g. a "$binding"); other JSON values are
    // re-serialized to text. Using `as_str()` avoids the is_string()+unwrap()
    // panic path of the previous version.
    let items_str = match items.as_str() {
        Some(s) => s.to_string(),
        None => serde_json::to_string(items).unwrap_or_default(),
    };
    Some(AnalyzedForEach {
        items: items_str,
        // "item" is the default loop variable when none was recorded.
        as_var: as_var.cloned().unwrap_or_else(|| "item".to_string()),
        parallel: concurrency.map(|c| u32::try_from(c).unwrap_or(u32::MAX)),
        fail_fast: fail_fast.unwrap_or(true),
        span: Span::dummy(),
    })
}
/// Rebuilds analyzed MCP server entries from runtime inline configs.
///
/// Runtime configs are always stdio-based, so every reconstructed server uses
/// the stdio transport with no URL.
fn unlower_mcp_servers(
    mcp: Option<FxHashMap<String, McpConfigInline>>,
) -> IndexMap<String, AnalyzedMcpServer> {
    let mut servers = IndexMap::new();
    let Some(mcp) = mcp else {
        return servers;
    };
    for (name, config) in mcp {
        let server = AnalyzedMcpServer {
            name: name.clone(),
            command: Some(config.command),
            args: config.args,
            env: config.env.into_iter().collect(),
            cwd: config.cwd,
            url: None,
            transport: McpTransport::Stdio,
            span: Span::dummy(),
        };
        servers.insert(name, server);
    }
    servers
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ast::analyzed::*;
use crate::ast::schema::SchemaVersion;
use crate::binding::WithSpec;
use crate::source::Span;
// Minimal workflow fixture: schema v12 with every other field defaulted.
fn dummy_workflow() -> AnalyzedWorkflow {
AnalyzedWorkflow {
schema_version: SchemaVersion::V12,
..Default::default()
}
}
// Minimal task fixture with the given id/name and all optional fields unset.
fn dummy_task(id: TaskId, name: &str) -> AnalyzedTask {
AnalyzedTask {
id,
name: name.to_string(),
description: None,
action: AnalyzedTaskAction::default(),
provider: None,
model: None,
with_spec: WithSpec::default(),
depends_on: vec![],
implicit_deps: vec![],
output: None,
for_each: None,
retry: None,
decompose: None,
concurrency: None,
fail_fast: None,
artifact: None,
log: None,
structured: None,
span: Span::dummy(),
}
}
// An empty workflow lowers to defaults: claude provider, no tasks/mcp/inputs.
#[test]
fn lower_empty_workflow() {
let wf = dummy_workflow();
let lowered = lower(wf).unwrap();
assert_eq!(lowered.schema, "nika/workflow@0.12");
assert_eq!(lowered.provider, "claude");
assert!(lowered.tasks.is_empty());
assert!(lowered.mcp.is_none());
assert!(lowered.inputs.is_none());
}
// Explicit provider/model override the "claude" default.
#[test]
fn lower_provider_passthrough() {
let mut wf = dummy_workflow();
wf.provider = Some("openai".to_string());
wf.model = Some("gpt-4o".to_string());
let lowered = lower(wf).unwrap();
assert_eq!(lowered.provider, "openai");
assert_eq!(lowered.model.as_deref(), Some("gpt-4o"));
}
// Infer fields (incl. task-level provider/model and thinking budget) survive
// lowering into InferParams.
#[test]
fn lower_infer_task() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("step1");
wf.tasks.push(AnalyzedTask {
id,
name: "step1".to_string(),
action: AnalyzedTaskAction::Infer(AnalyzedInferAction {
prompt: "Hello".to_string(),
system: Some("You are helpful".to_string()),
temperature: Some(0.7),
max_tokens: Some(100),
thinking: Some(true),
thinking_budget: Some(8192),
content: None,
response_format: None,
guardrails: Vec::new(),
span: Span::dummy(),
}),
provider: Some("mistral".to_string()),
model: Some("mistral-large".to_string()),
with_spec: WithSpec::default(),
depends_on: vec![],
implicit_deps: vec![],
output: None,
for_each: None,
retry: None,
decompose: None,
concurrency: None,
fail_fast: None,
artifact: None,
log: None,
structured: None,
description: None,
span: Span::dummy(),
});
let lowered = lower(wf).unwrap();
assert_eq!(lowered.tasks.len(), 1);
let task = &lowered.tasks[0];
assert_eq!(task.id, "step1");
match &task.action {
TaskAction::Infer { infer } => {
assert_eq!(infer.prompt, "Hello");
assert_eq!(infer.system.as_deref(), Some("You are helpful"));
assert_eq!(infer.temperature, Some(0.7));
assert_eq!(infer.max_tokens, Some(100));
assert_eq!(infer.provider.as_deref(), Some("mistral"));
assert_eq!(infer.model.as_deref(), Some("mistral-large"));
assert_eq!(infer.extended_thinking, Some(true));
assert_eq!(infer.thinking_budget, Some(8192));
}
_ => panic!("expected Infer action"),
}
}
// Exec lowering: ms -> s timeout conversion and env map passthrough.
#[test]
fn lower_exec_task() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("build");
let mut env = IndexMap::new();
env.insert("NODE_ENV".to_string(), "production".to_string());
wf.tasks.push(AnalyzedTask {
action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
command: "npm run build".to_string(),
shell: true,
working_dir: Some("/app".to_string()),
env,
timeout_ms: Some(30000),
span: Span::dummy(),
}),
..dummy_task(id, "build")
});
let lowered = lower(wf).unwrap();
match &lowered.tasks[0].action {
TaskAction::Exec { exec: e } => {
assert_eq!(e.command, "npm run build");
assert_eq!(e.shell, Some(true));
assert_eq!(e.cwd.as_deref(), Some("/app"));
// 30000 ms becomes 30 s.
assert_eq!(e.timeout, Some(30)); let env = e.env.as_ref().unwrap();
assert_eq!(env.get("NODE_ENV").map(String::as_str), Some("production"));
}
_ => panic!("expected Exec action"),
}
}
// Task-level retry is attached to fetch actions during lowering.
#[test]
fn lower_fetch_with_retry() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("fetch_data");
wf.tasks.push(AnalyzedTask {
action: AnalyzedTaskAction::Fetch(AnalyzedFetchAction {
url: "https://api.example.com".to_string(),
method: HttpMethod::Post,
headers: IndexMap::new(),
body: None,
json: Some(serde_json::json!({"key": "value"})),
timeout_ms: Some(5000),
follow_redirects: false,
response: None,
extract: None,
selector: None,
span: Span::dummy(),
}),
retry: Some(AnalyzedRetry {
max_attempts: 3,
delay_ms: 1000,
backoff: Some(2.0),
span: Span::dummy(),
}),
..dummy_task(id, "fetch_data")
});
let lowered = lower(wf).unwrap();
match &lowered.tasks[0].action {
TaskAction::Fetch { fetch } => {
assert_eq!(fetch.url, "https://api.example.com");
assert_eq!(fetch.method, "POST");
assert_eq!(fetch.follow_redirects, Some(false));
assert!(fetch.json.is_some());
let retry = fetch.retry.as_ref().expect("retry should be set");
assert_eq!(retry.max_attempts, 3);
assert_eq!(retry.backoff_ms, 1000);
assert_eq!(retry.multiplier, 2.0);
}
_ => panic!("expected Fetch action"),
}
}
// Invoke lowering: server maps to `mcp`, tool is wrapped in Some, and
// `resource` is never produced.
#[test]
fn lower_invoke_task() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("call_tool");
wf.tasks.push(AnalyzedTask {
action: AnalyzedTaskAction::Invoke(AnalyzedInvokeAction {
server: Some("novanet".to_string()),
tool: "novanet_context".to_string(),
params: Some(serde_json::json!({"entity": "qr-code"})),
timeout_ms: None,
span: Span::dummy(),
}),
..dummy_task(id, "call_tool")
});
let lowered = lower(wf).unwrap();
match &lowered.tasks[0].action {
TaskAction::Invoke { invoke } => {
assert_eq!(invoke.mcp.as_deref(), Some("novanet"));
assert_eq!(invoke.tool.as_deref(), Some("novanet_context"));
assert!(invoke.params.is_some());
assert!(invoke.resource.is_none());
}
_ => panic!("expected Invoke action"),
}
}
// Agent lowering: max_iterations -> max_turns, skills kept as Some when
// non-empty, provider threaded in from the task level.
#[test]
fn lower_agent_task() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("researcher");
wf.tasks.push(AnalyzedTask {
action: AnalyzedTaskAction::Agent(AnalyzedAgentAction {
prompt: "Research AI papers".to_string(),
tools: vec!["nika:read".to_string(), "nika:write".to_string()],
max_iterations: Some(10),
max_tokens: Some(4096),
from: None,
skills: vec!["writing".to_string()],
mcp: vec![],
system: None,
temperature: None,
token_budget: None,
extended_thinking: None,
thinking_budget: None,
depth_limit: None,
tool_choice: None,
stop_sequences: vec![],
scope: None,
span: Span::dummy(),
}),
provider: Some("claude".to_string()),
..dummy_task(id, "researcher")
});
let lowered = lower(wf).unwrap();
match &lowered.tasks[0].action {
TaskAction::Agent { agent } => {
assert_eq!(agent.prompt, "Research AI papers");
assert_eq!(agent.tools, vec!["nika:read", "nika:write"]);
assert_eq!(agent.max_turns, Some(10));
assert_eq!(agent.max_tokens, Some(4096));
assert_eq!(agent.provider.as_deref(), Some("claude"));
assert_eq!(agent.skills.as_deref(), Some(&["writing".to_string()][..]));
}
_ => panic!("expected Agent action"),
}
}
// Explicit and implicit deps are both resolved to names; no deps -> None.
#[test]
fn lower_flows_from_deps() {
let mut wf = dummy_workflow();
let id_a = wf.task_table.insert("a");
let id_b = wf.task_table.insert("b");
let id_c = wf.task_table.insert("c");
wf.tasks.push(dummy_task(id_a, "a"));
wf.tasks.push(AnalyzedTask {
depends_on: vec![id_a],
..dummy_task(id_b, "b")
});
wf.tasks.push(AnalyzedTask {
depends_on: vec![id_a],
implicit_deps: vec![id_b],
..dummy_task(id_c, "c")
});
let lowered = lower(wf).unwrap();
assert!(lowered.tasks[0].depends_on.is_none()); assert_eq!(
lowered.tasks[1].depends_on.as_deref(),
Some(&["a".to_string()][..])
);
let c_deps = lowered.tasks[2].depends_on.as_ref().unwrap();
assert_eq!(c_deps.len(), 2);
assert!(c_deps.contains(&"a".to_string()));
assert!(c_deps.contains(&"b".to_string()));
}
// A JSON array in `items` is parsed into a real array value.
#[test]
fn lower_for_each_array() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("par");
wf.tasks.push(AnalyzedTask {
for_each: Some(AnalyzedForEach {
items: r#"["a","b","c"]"#.to_string(),
as_var: "item".to_string(),
parallel: Some(3),
fail_fast: true,
span: Span::dummy(),
}),
..dummy_task(id, "par")
});
let lowered = lower(wf).unwrap();
let task = &lowered.tasks[0];
let items = task.for_each.as_ref().unwrap();
assert!(items.is_array());
assert_eq!(items.as_array().unwrap().len(), 3);
assert_eq!(task.for_each_as.as_deref(), Some("item"));
assert_eq!(task.concurrency, Some(3));
assert_eq!(task.fail_fast, Some(true));
}
// Non-JSON `items` (a "$binding") is kept verbatim as a string value.
#[test]
fn lower_for_each_binding() {
let (items, as_var, conc, _) = lower_for_each(Some(AnalyzedForEach {
items: "$my_list".to_string(),
as_var: "x".to_string(),
parallel: None,
fail_fast: false,
span: Span::dummy(),
}));
assert_eq!(
items.unwrap(),
serde_json::Value::String("$my_list".to_string())
);
assert_eq!(as_var.unwrap(), "x");
assert!(conc.is_none());
}
// Only stdio servers survive lowering; SSE servers are dropped.
#[test]
fn lower_mcp_stdio_only() {
let mut servers = IndexMap::new();
servers.insert(
"novanet".to_string(),
AnalyzedMcpServer {
name: "novanet".to_string(),
command: Some("npx".to_string()),
args: vec!["-y".to_string(), "@novanet/mcp".to_string()],
env: IndexMap::new(),
cwd: None,
url: None,
transport: McpTransport::Stdio,
span: Span::dummy(),
},
);
servers.insert(
"sse_only".to_string(),
AnalyzedMcpServer {
name: "sse_only".to_string(),
command: None,
args: vec![],
env: IndexMap::new(),
cwd: None,
url: Some("https://mcp.example.com".to_string()),
transport: McpTransport::Sse,
span: Span::dummy(),
},
);
let result = lower_mcp_servers(servers);
let map = result.expect("should have stdio server");
assert_eq!(map.len(), 1);
assert!(map.contains_key("novanet"));
assert!(!map.contains_key("sse_only"));
assert_eq!(map["novanet"].command, "npx");
}
// A non-path-looking JSON object schema lowers to an inline SchemaRef.
#[test]
fn lower_output_json_with_schema() {
let output = AnalyzedOutput {
format: AnalyzedOutputFormat::Json,
schema: Some(serde_json::json!({"type": "object"})),
schema_ref: None,
max_retries: None,
span: Span::dummy(),
};
let lowered = lower_output(output);
assert!(matches!(
lowered.format,
crate::ast::output::OutputFormat::Json
));
match lowered.schema {
Some(SchemaRef::Inline(v)) => {
assert_eq!(v, serde_json::json!({"type": "object"}))
}
_ => panic!("expected Inline schema"),
}
}
// Non-empty inputs are converted to the runtime map with values intact.
#[test]
fn lower_inputs_map() {
let mut inputs = IndexMap::new();
inputs.insert("name".to_string(), serde_json::json!("test"));
inputs.insert("count".to_string(), serde_json::json!(42));
let result = lower_inputs(inputs);
let map = result.expect("should have inputs");
assert_eq!(map.len(), 2);
assert_eq!(map["name"], serde_json::json!("test"));
}
// An empty `with` spec is normalized to None on the runtime task.
#[test]
fn lower_with_spec_empty_becomes_none() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("t");
wf.tasks.push(dummy_task(id, "t"));
let lowered = lower(wf).unwrap();
assert!(lowered.tasks[0].with_spec.is_none());
}
// Workflow-level artifacts config passes through lowering unchanged.
#[test]
fn lower_preserves_workflow_artifacts() {
let mut wf = dummy_workflow();
wf.artifacts = Some(crate::ast::artifact::ArtifactsConfig {
dir: Some("./output".to_string()),
..Default::default()
});
let lowered = lower(wf).unwrap();
assert!(
lowered.artifacts.is_some(),
"lower() must preserve workflow-level artifacts"
);
assert_eq!(
lowered.artifacts.as_ref().unwrap().dir.as_deref(),
Some("./output")
);
}
// Workflow-level log config passes through lowering unchanged.
#[test]
fn lower_preserves_workflow_log() {
let mut wf = dummy_workflow();
wf.log = Some(crate::ast::logging::LogConfig::default());
let lowered = lower(wf).unwrap();
assert!(
lowered.log.is_some(),
"lower() must preserve workflow-level log config"
);
}
// Workflow-level agent definitions pass through lowering keyed by name.
#[test]
fn lower_preserves_workflow_agents() {
let mut wf = dummy_workflow();
let mut agents = IndexMap::new();
agents.insert(
"researcher".to_string(),
crate::ast::agent_def::AgentDef::From {
from: "./agents/researcher.yaml".to_string(),
},
);
wf.agents = Some(agents);
let lowered = lower(wf).unwrap();
assert!(
lowered.agents.is_some(),
"lower() must preserve workflow-level agents"
);
assert!(lowered.agents.as_ref().unwrap().contains_key("researcher"));
}
// Task-level artifact spec passes through lowering unchanged.
#[test]
fn lower_task_preserves_artifact() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("t");
let mut task = dummy_task(id, "t");
task.artifact = Some(crate::ast::artifact::ArtifactSpec::Enabled(true));
wf.tasks.push(task);
let lowered = lower(wf).unwrap();
assert!(
lowered.tasks[0].artifact.is_some(),
"lower_task() must preserve artifact config"
);
}
// Task-level log config passes through lowering unchanged.
#[test]
fn lower_task_preserves_log() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("t");
let mut task = dummy_task(id, "t");
task.log = Some(crate::ast::logging::LogConfig::default());
wf.tasks.push(task);
let lowered = lower(wf).unwrap();
assert!(
lowered.tasks[0].log.is_some(),
"lower_task() must preserve log config"
);
}
// Structured output specs are passed through by lower_task untouched.
#[test]
fn lower_task_structured_passthrough() {
use crate::ast::structured::StructuredOutputSpec;
let mut wf = dummy_workflow();
let id = wf.task_table.insert("t");
let mut task = dummy_task(id, "t");
task.structured = Some(StructuredOutputSpec::with_file_schema("./schema.json"));
wf.tasks.push(task);
let lowered = lower(wf).unwrap();
let structured = lowered.tasks[0]
.structured
.as_ref()
.expect("structured should pass through lower_task");
assert!(matches!(structured.schema, SchemaRef::File(ref p) if p == "./schema.json"));
}
#[test]
fn roundtrip_preserves_task_artifact_and_log() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("t");
let mut task = dummy_task(id, "t");
task.artifact = Some(crate::ast::artifact::ArtifactSpec::Enabled(true));
task.log = Some(crate::ast::logging::LogConfig::default());
wf.tasks.push(task);
let lowered = lower(wf).unwrap();
assert!(
lowered.tasks[0].artifact.is_some(),
"lower() must preserve task artifact"
);
assert!(
lowered.tasks[0].log.is_some(),
"lower() must preserve task log"
);
let unl = unlower(lowered).unwrap();
assert!(
unl.tasks[0].artifact.is_some(),
"unlower() must restore task artifact"
);
assert!(
unl.tasks[0].log.is_some(),
"unlower() must restore task log"
);
}
#[test]
fn lower_retry_only_applies_to_fetch() {
let mut wf = dummy_workflow();
let id = wf.task_table.insert("infer_retry");
let mut task = dummy_task(id, "infer_retry");
task.retry = Some(AnalyzedRetry {
max_attempts: 3,
delay_ms: 1000,
backoff: Some(2.0),
span: Span::dummy(),
});
wf.tasks.push(task);
let lowered = lower(wf).unwrap();
match &lowered.tasks[0].action {
TaskAction::Infer { infer } => {
assert!(infer.prompt.is_empty(), "default infer has empty prompt");
}
_ => panic!("expected Infer action"),
}
}
#[test]
fn task_dep_names_rejects_dangling_task_id() {
let mut table = TaskTable::new();
table.insert("step1");
let dangling = TaskId(99);
let result = task_dep_names(&[dangling], &[], &table);
assert!(
result.is_err(),
"Dangling TaskId should be rejected in lowering"
);
}
#[test]
fn task_dep_names_valid_ids_resolve() {
let mut table = TaskTable::new();
let id_a = table.insert("a");
let id_b = table.insert("b");
let result = task_dep_names(&[id_a], &[id_b], &table);
let deps = result
.expect("valid TaskIds should resolve")
.expect("should be Some");
assert_eq!(deps, vec!["a".to_string(), "b".to_string()]);
}
#[test]
fn task_dep_names_empty_returns_none() {
let table = TaskTable::new();
let result = task_dep_names(&[], &[], &table);
assert_eq!(result.unwrap(), None);
}
// A task whose depends_on references an id missing from the table must make
// the whole lower() call fail, not just the dep lookup.
#[test]
fn lower_rejects_dangling_task_id_in_deps() {
    let mut wf = dummy_workflow();
    let id_a = wf.task_table.insert("a");
    let id_b = wf.task_table.insert("b");
    wf.tasks.push(dummy_task(id_a, "a"));
    let mut broken = dummy_task(id_b, "b");
    broken.depends_on = vec![TaskId(99)];
    wf.tasks.push(broken);
    assert!(
        lower(wf).is_err(),
        "lower() should reject dangling TaskId in depends_on"
    );
}
// Inject a dep name with no matching task into an already-lowered workflow;
// unlower must refuse to resolve it back to a TaskId.
#[test]
fn unlower_rejects_dangling_dep_name() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("producer");
    wf.tasks.push(dummy_task(id, "producer"));
    let mut lowered = lower(wf).unwrap();
    Arc::make_mut(&mut lowered.tasks[0]).depends_on =
        Some(vec!["nonexistent_task".to_string()]);
    assert!(
        unlower(lowered).is_err(),
        "unlower should reject dangling dep name"
    );
}
// An absent backoff presumably lowers to the neutral multiplier 1.0;
// unlowering must collapse that back to None instead of Some(1.0).
#[test]
fn unlower_retry_backoff_near_one() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("fetcher");
    wf.tasks.push(AnalyzedTask {
        // Retry roundtrips only through Fetch actions
        // (see lower_retry_only_applies_to_fetch above).
        action: AnalyzedTaskAction::Fetch(AnalyzedFetchAction {
            url: "https://example.com".to_string(),
            method: HttpMethod::Get,
            headers: IndexMap::new(),
            body: None,
            json: None,
            timeout_ms: None,
            follow_redirects: true,
            response: None,
            extract: None,
            selector: None,
            span: Span::dummy(),
        }),
        retry: Some(AnalyzedRetry {
            max_attempts: 3,
            delay_ms: 1000,
            backoff: None,
            span: Span::dummy(),
        }),
        ..dummy_task(id, "fetcher")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    assert!(
        unlowered.tasks[0].retry.as_ref().unwrap().backoff.is_none(),
        "backoff of 1.0 should roundtrip as None"
    );
}
// A multiplier within the neutrality tolerance of 1.0 (the boundary tests
// below pin it at 1e-4) is treated as "no backoff" and collapses to None.
#[test]
fn unlower_retry_backoff_near_one_within_tolerance() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("fetcher");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Fetch(AnalyzedFetchAction {
            url: "https://example.com".to_string(),
            method: HttpMethod::Get,
            headers: IndexMap::new(),
            body: None,
            json: None,
            timeout_ms: None,
            follow_redirects: true,
            response: None,
            extract: None,
            selector: None,
            span: Span::dummy(),
        }),
        retry: Some(AnalyzedRetry {
            max_attempts: 3,
            delay_ms: 1000,
            // 0.00005 away from 1.0 — inside the tolerance.
            backoff: Some(1.00005),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "fetcher")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    assert!(
        unlowered.tasks[0].retry.as_ref().unwrap().backoff.is_none(),
        "backoff within tolerance of 1.0 should roundtrip as None"
    );
}
// 1.00011 sits just outside the 1e-4 neutrality tolerance, so the multiplier
// must survive the roundtrip with its value intact.
#[test]
fn unlower_retry_backoff_just_over_tolerance() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("fetcher");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Fetch(AnalyzedFetchAction {
            url: "https://example.com".to_string(),
            method: HttpMethod::Get,
            headers: IndexMap::new(),
            body: None,
            json: None,
            timeout_ms: None,
            follow_redirects: true,
            response: None,
            extract: None,
            selector: None,
            span: Span::dummy(),
        }),
        retry: Some(AnalyzedRetry {
            max_attempts: 3,
            delay_ms: 1000,
            backoff: Some(1.00011),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "fetcher")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    let backoff = unlowered.tasks[0].retry.as_ref().unwrap().backoff;
    assert!(
        backoff.is_some(),
        "backoff of 1.00011 (just over tolerance) must NOT be collapsed to None"
    );
    // Loose epsilon: the value goes through lowering, not exact bit equality.
    assert!(
        (backoff.unwrap() - 1.00011).abs() < f64::EPSILON * 100.0,
        "backoff value should be preserved: got {}",
        backoff.unwrap()
    );
}
// Same as the just-over case but on the low side of 1.0: 0.99989 exceeds the
// 1e-4 tolerance and must be preserved, not collapsed to None.
#[test]
fn unlower_retry_backoff_just_under_one_over_tolerance() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("fetcher");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Fetch(AnalyzedFetchAction {
            url: "https://example.com".to_string(),
            method: HttpMethod::Get,
            headers: IndexMap::new(),
            body: None,
            json: None,
            timeout_ms: None,
            follow_redirects: true,
            response: None,
            extract: None,
            selector: None,
            span: Span::dummy(),
        }),
        retry: Some(AnalyzedRetry {
            max_attempts: 3,
            delay_ms: 1000,
            backoff: Some(0.99989),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "fetcher")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    let backoff = unlowered.tasks[0].retry.as_ref().unwrap().backoff;
    assert!(
        backoff.is_some(),
        "backoff of 0.99989 (just over tolerance on low side) must NOT be collapsed to None"
    );
    assert!(
        (backoff.unwrap() - 0.99989).abs() < f64::EPSILON * 100.0,
        "backoff value should be preserved: got {}",
        backoff.unwrap()
    );
}
// Pins the boundary semantics: the tolerance comparison uses strict `>`, so a
// deviation of exactly 1e-4 (backoff 1.0001) still collapses to None.
#[test]
fn unlower_retry_backoff_exactly_at_tolerance_boundary() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("fetcher");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Fetch(AnalyzedFetchAction {
            url: "https://example.com".to_string(),
            method: HttpMethod::Get,
            headers: IndexMap::new(),
            body: None,
            json: None,
            timeout_ms: None,
            follow_redirects: true,
            response: None,
            extract: None,
            selector: None,
            span: Span::dummy(),
        }),
        retry: Some(AnalyzedRetry {
            max_attempts: 3,
            delay_ms: 1000,
            backoff: Some(1.0001),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "fetcher")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    let backoff = unlowered.tasks[0].retry.as_ref().unwrap().backoff;
    assert!(
        backoff.is_none(),
        "backoff of exactly 1.0001 (at boundary, uses >) should be collapsed to None"
    );
}
// thinking_budget is u32 in the analyzed form and u64 in the lowered form:
// u32::MAX must widen losslessly on lower() and narrow back intact on
// unlower().
#[test]
fn unlower_thinking_budget_clamps_to_u32_max() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("thinker");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Infer(AnalyzedInferAction {
            prompt: "think hard".to_string(),
            thinking: Some(true),
            thinking_budget: Some(u32::MAX),
            ..Default::default()
        }),
        ..dummy_task(id, "thinker")
    });
    let lowered = lower(wf).unwrap();
    match &lowered.tasks[0].action {
        TaskAction::Infer { infer } => {
            // Lowered representation stores the budget as u64.
            assert_eq!(infer.thinking_budget, Some(u32::MAX as u64));
        }
        _ => panic!("expected Infer"),
    }
    let unlowered = unlower(lowered).unwrap();
    match &unlowered.tasks[0].action {
        AnalyzedTaskAction::Infer(infer) => {
            // Narrowing back to u32 must not lose the max value.
            assert_eq!(infer.thinking_budget, Some(u32::MAX));
        }
        _ => panic!("expected Infer"),
    }
}
// step_b depends on step_a via TaskId; after lowering to names, the reverse
// mapping back to ids should succeed without error.
#[test]
fn unlower_valid_deps_resolve() {
    let mut wf = dummy_workflow();
    let id_a = wf.task_table.insert("step_a");
    let id_b = wf.task_table.insert("step_b");
    wf.tasks.push(dummy_task(id_a, "step_a"));
    let mut dependent = dummy_task(id_b, "step_b");
    dependent.depends_on = vec![id_a];
    wf.tasks.push(dependent);
    let outcome = unlower(lower(wf).unwrap());
    assert!(
        outcome.is_ok(),
        "unlower should succeed with valid deps: {:?}",
        outcome.err()
    );
}
// The for_each configuration (items/as_var/parallel/fail_fast) must survive a
// full lower -> unlower cycle.
#[test]
fn lower_unlower_roundtrip_for_each() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("iter");
    let mut looped = dummy_task(id, "iter");
    looped.for_each = Some(AnalyzedForEach {
        items: r#"["a","b","c"]"#.to_string(),
        as_var: "item".to_string(),
        parallel: Some(2),
        fail_fast: true,
        span: Span::dummy(),
    });
    wf.tasks.push(looped);
    let roundtripped = unlower(lower(wf).unwrap()).unwrap();
    let task = &roundtripped.tasks[0];
    assert!(task.for_each.is_some(), "for_each should survive roundtrip");
    let fe = task.for_each.as_ref().unwrap();
    assert_eq!(fe.as_var, "item");
    assert_eq!(fe.parallel, Some(2));
}
// A stdio-transport MCP server must survive lowering and come back with its
// name and command intact.
#[test]
fn lower_unlower_roundtrip_mcp_stdio() {
    let mut wf = dummy_workflow();
    let mut servers = IndexMap::new();
    servers.insert(
        "test".to_string(),
        AnalyzedMcpServer {
            name: "test".to_string(),
            command: Some("node".to_string()),
            args: vec!["server.js".to_string()],
            env: IndexMap::new(),
            cwd: None,
            url: None,
            transport: McpTransport::Stdio,
            span: Span::dummy(),
        },
    );
    wf.mcp_servers = servers;
    let lowered = lower(wf).unwrap();
    assert!(lowered.mcp.is_some(), "stdio server should be lowered");
    let unlowered = unlower(lowered).unwrap();
    assert_eq!(unlowered.mcp_servers.len(), 1);
    assert!(unlowered.mcp_servers.contains_key("test"));
    assert_eq!(
        unlowered.mcp_servers["test"].command.as_deref(),
        Some("node")
    );
}
// An Invoke action's server reference must survive a lower -> unlower cycle.
// NOTE(review): `tool` is deliberately empty here — presumably an empty tool
// name denotes a resource read on the server; confirm against invoke lowering.
#[test]
fn lower_unlower_roundtrip_invoke_with_resource() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("read_res");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Invoke(AnalyzedInvokeAction {
            server: Some("novanet".to_string()),
            tool: "".to_string(),
            params: None,
            timeout_ms: Some(10000),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "read_res")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    let t = &unlowered.tasks[0];
    match &t.action {
        AnalyzedTaskAction::Invoke(inv) => {
            assert_eq!(inv.server.as_deref(), Some("novanet"));
        }
        _ => panic!("expected Invoke action after roundtrip"),
    }
}
// Lowering fills in the default provider ("claude"); unlowering cannot know
// the original was None, so the default sticks after a roundtrip.
#[test]
fn roundtrip_provider_none_becomes_claude() {
    let wf = AnalyzedWorkflow {
        provider: None,
        ..dummy_workflow()
    };
    let lowered = lower(wf).unwrap();
    assert_eq!(
        lowered.provider, "claude",
        "None provider should default to claude"
    );
    assert_eq!(
        unlower(lowered).unwrap().provider,
        Some("claude".to_string()),
        "After roundtrip, provider is Some(\"claude\"), not None"
    );
}
// The analyzed output format has no Markdown variant, so a lowered workflow is
// rebuilt by hand with Markdown forced in, then unlower must collapse it to
// Text rather than fail.
#[test]
fn roundtrip_markdown_output_becomes_text() {
    use crate::ast::output::OutputFormat as RuntimeOutputFormat;
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("md_task");
    wf.tasks.push(AnalyzedTask {
        output: Some(AnalyzedOutput {
            format: AnalyzedOutputFormat::Text,
            schema: None,
            schema_ref: None,
            max_retries: None,
            span: Span::dummy(),
        }),
        ..dummy_task(id, "md_task")
    });
    let lowered = lower(wf).unwrap();
    // Rebuild each task with its output format overwritten to Markdown —
    // tasks are Arc'd, so clone the inner Task to mutate it.
    let lowered_clone = Workflow {
        tasks: lowered
            .tasks
            .iter()
            .map(|t| {
                let mut task = (**t).clone();
                if let Some(ref mut out) = task.output {
                    out.format = RuntimeOutputFormat::Markdown;
                }
                Arc::new(task)
            })
            .collect(),
        ..lowered
    };
    let unlowered = unlower(lowered_clone).unwrap();
    let out = unlowered.tasks[0].output.as_ref().unwrap();
    assert_eq!(
        out.format,
        AnalyzedOutputFormat::Text,
        "Markdown should collapse to Text after unlower"
    );
}
// Lowering merges implicit_deps into depends_on; after a roundtrip the merged
// list stays in depends_on and implicit_deps comes back empty. Note the merge
// does not deduplicate: both entries here refer to task_a yet yield two names.
#[test]
fn roundtrip_implicit_deps_merge_into_depends_on() {
    let mut wf = dummy_workflow();
    let id_a = wf.task_table.insert("task_a");
    let id_b = wf.task_table.insert("task_b");
    wf.tasks.push(dummy_task(id_a, "task_a"));
    wf.tasks.push(AnalyzedTask {
        depends_on: vec![id_a],
        implicit_deps: vec![id_a],
        ..dummy_task(id_b, "task_b")
    });
    let lowered = lower(wf).unwrap();
    let deps = lowered.tasks[1].depends_on.as_ref().unwrap();
    assert_eq!(deps.len(), 2, "Both deps should be merged into depends_on");
    let unlowered = unlower(lowered).unwrap();
    let rt_task = &unlowered.tasks[1];
    assert!(
        rt_task.implicit_deps.is_empty(),
        "After roundtrip, implicit_deps should be empty (merged into depends_on)"
    );
    assert_eq!(rt_task.depends_on.len(), 2);
}
// lower() keeps only context files that have an alias (the lowered
// ContextConfig maps alias -> path), so the unaliased README.md entry is
// dropped while the aliased schema.json entry survives the roundtrip.
// The max_bytes field is also not carried through.
#[test]
fn roundtrip_context_files_preserved() {
    let wf = AnalyzedWorkflow {
        context_files: vec![
            AnalyzedContextFile {
                path: "README.md".to_string(),
                alias: None,
                max_bytes: None,
                span: Span::dummy(),
            },
            AnalyzedContextFile {
                path: "schema.json".to_string(),
                alias: Some("schema".to_string()),
                max_bytes: Some(4096),
                span: Span::dummy(),
            },
        ],
        ..dummy_workflow()
    };
    let lowered = lower(wf).unwrap();
    assert!(
        lowered.context.is_some(),
        "context should be preserved in lowered Workflow"
    );
    let ctx = lowered.context.as_ref().unwrap();
    assert_eq!(
        ctx.files.len(),
        1,
        "only aliased files survive the round-trip"
    );
    assert_eq!(ctx.files.get("schema"), Some(&"schema.json".to_string()));
    let unlowered = unlower(lowered).unwrap();
    assert_eq!(
        unlowered.context_files.len(),
        1,
        "context_files with aliases should be restored"
    );
    assert_eq!(unlowered.context_files[0].alias.as_deref(), Some("schema"));
    assert_eq!(unlowered.context_files[0].path, "schema.json");
}
// Documents a known-lossy path: the agent `from` reference does not survive
// lowering — after a roundtrip it is None. Presumably the reference is
// resolved/inlined at lowering time; confirm against lower_agent.
#[test]
fn roundtrip_agent_from_field_is_lost() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("agent_task");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Agent(AnalyzedAgentAction {
            prompt: "do something".to_string(),
            tools: vec![],
            max_iterations: Some(5),
            max_tokens: None,
            from: Some("my_agent_def".to_string()),
            skills: vec![],
            mcp: vec![],
            system: None,
            temperature: None,
            token_budget: None,
            extended_thinking: None,
            thinking_budget: None,
            depth_limit: None,
            tool_choice: None,
            stop_sequences: vec![],
            scope: None,
            span: Span::dummy(),
        }),
        ..dummy_task(id, "agent_task")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    match &unlowered.tasks[0].action {
        AnalyzedTaskAction::Agent(agent) => {
            assert_eq!(
                agent.from, None,
                "Agent `from` field should be None after roundtrip"
            );
        }
        _ => panic!("expected Agent action after roundtrip"),
    }
}
// Documents a known-lossy path: only Stdio-transport MCP servers survive
// lowering. An Sse-transport server is dropped at lower() and cannot be
// recovered by unlower().
#[test]
fn roundtrip_sse_server_permanently_lost() {
    let mut wf = dummy_workflow();
    let mut servers = IndexMap::new();
    servers.insert(
        "sse_server".to_string(),
        AnalyzedMcpServer {
            name: "sse_server".to_string(),
            command: None,
            args: vec![],
            env: IndexMap::new(),
            cwd: None,
            url: Some("https://example.com/mcp".to_string()),
            transport: McpTransport::Sse,
            span: Span::dummy(),
        },
    );
    servers.insert(
        "stdio_server".to_string(),
        AnalyzedMcpServer {
            name: "stdio_server".to_string(),
            command: Some("npx server".to_string()),
            args: vec!["--port".to_string(), "3000".to_string()],
            env: IndexMap::new(),
            cwd: None,
            url: None,
            transport: McpTransport::Stdio,
            span: Span::dummy(),
        },
    );
    wf.mcp_servers = servers;
    let lowered = lower(wf).unwrap();
    let mcp = lowered.mcp.as_ref().unwrap();
    assert_eq!(mcp.len(), 1, "Only Stdio server should survive lowering");
    assert!(mcp.contains_key("stdio_server"));
    let unlowered = unlower(lowered).unwrap();
    assert_eq!(
        unlowered.mcp_servers.len(),
        1,
        "SSE server should be permanently lost after roundtrip"
    );
    assert!(unlowered.mcp_servers.contains_key("stdio_server"));
    assert!(
        !unlowered.mcp_servers.contains_key("sse_server"),
        "SSE server should not exist after roundtrip"
    );
}
// A NaN backoff multiplier cannot be meaningfully compared or applied, so
// unlower_retry must normalize it to None instead of propagating NaN. Note
// this calls unlower_retry directly on a hand-built runtime action rather
// than going through the full lower/unlower pipeline.
#[test]
fn roundtrip_nan_multiplier_becomes_none() {
    use crate::ast::action::{FetchParams, RetryConfig, TaskAction};
    let action = TaskAction::Fetch {
        fetch: FetchParams {
            url: "https://example.com".to_string(),
            method: "GET".to_string(),
            headers: Default::default(),
            body: None,
            json: None,
            timeout: None,
            retry: Some(RetryConfig {
                max_attempts: 3,
                backoff_ms: 1000,
                multiplier: f64::NAN,
            }),
            follow_redirects: None,
            response: None,
            extract: None,
            selector: None,
        },
    };
    let result = unlower_retry(&action);
    assert!(result.is_some(), "Retry config should be preserved");
    assert!(
        result.unwrap().backoff.is_none(),
        "NaN multiplier should become None, not be silently preserved"
    );
}
// Documents a known-lossy path: a task's description does not survive a
// lower -> unlower roundtrip.
#[test]
fn roundtrip_task_description_is_lost() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("described");
    let mut described = dummy_task(id, "described");
    described.description = Some("Important task description".to_string());
    wf.tasks.push(described);
    let restored = unlower(lower(wf).unwrap()).unwrap();
    assert!(
        restored.tasks[0].description.is_none(),
        "Task description should be lost after roundtrip"
    );
}
// Workflow.name is carried through lowering, but the workflow description is
// discarded (lower() destructures it with `description: _`).
#[test]
fn roundtrip_workflow_name_preserved_description_lost() {
    let wf = AnalyzedWorkflow {
        name: Some("My workflow".to_string()),
        description: Some("Does important things".to_string()),
        ..dummy_workflow()
    };
    let restored = unlower(lower(wf).unwrap()).unwrap();
    assert_eq!(
        restored.name,
        Some("My workflow".to_string()),
        "Workflow name should be preserved after roundtrip"
    );
    assert!(
        restored.description.is_none(),
        "Workflow description should be lost after roundtrip"
    );
}
// Regression for bug7: an analyzed schema_ref (a path, no inline schema) must
// be threaded into the lowered output as SchemaRef::File.
#[test]
fn bug7_schema_ref_threaded_through_pipeline() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        output: Some(AnalyzedOutput {
            format: AnalyzedOutputFormat::Json,
            schema: None,
            schema_ref: Some("./schemas/result.json".to_string()),
            max_retries: None,
            span: Span::dummy(),
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    let output = lowered.tasks[0]
        .output
        .as_ref()
        .expect("output should exist");
    match &output.schema {
        Some(SchemaRef::File(path)) => {
            assert_eq!(path, "./schemas/result.json");
        }
        other => panic!("expected SchemaRef::File, got {:?}", other),
    }
}
// Regression for bug7 (roundtrip half): after lower -> unlower, a file-based
// schema comes back in schema_ref — it must not be misclassified as an inline
// schema.
#[test]
fn bug7_schema_ref_roundtrip() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        output: Some(AnalyzedOutput {
            format: AnalyzedOutputFormat::Json,
            schema: None,
            schema_ref: Some("/absolute/schema.json".to_string()),
            max_retries: None,
            span: Span::dummy(),
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    let output = unlowered.tasks[0]
        .output
        .as_ref()
        .expect("output should exist");
    assert_eq!(
        output.schema_ref.as_deref(),
        Some("/absolute/schema.json"),
        "schema_ref should survive roundtrip"
    );
    assert!(
        output.schema.is_none(),
        "File schema should not appear as inline schema"
    );
}
// Regression for bug8: lower_output classifies a string schema as a file
// reference when it looks like a path ("./…", "/…", or "*.json"); every
// other schema value — JSON objects and non-path strings — stays Inline.
#[test]
fn bug8_schema_file_path_classified_correctly() {
    // Case: relative path starting with "./" -> File.
    let output = AnalyzedOutput {
        format: AnalyzedOutputFormat::Json,
        schema: Some(serde_json::Value::String("./schemas/user.json".to_string())),
        schema_ref: None,
        max_retries: None,
        span: Span::dummy(),
    };
    let lowered = lower_output(output);
    assert!(
        matches!(lowered.schema, Some(SchemaRef::File(ref p)) if p == "./schemas/user.json"),
        "Schema starting with ./ should be File, got {:?}",
        lowered.schema
    );
    // Case: absolute path starting with "/" -> File.
    let output = AnalyzedOutput {
        format: AnalyzedOutputFormat::Json,
        schema: Some(serde_json::Value::String("/etc/schema.json".to_string())),
        schema_ref: None,
        max_retries: None,
        span: Span::dummy(),
    };
    let lowered = lower_output(output);
    assert!(
        matches!(lowered.schema, Some(SchemaRef::File(ref p)) if p == "/etc/schema.json"),
        "Schema starting with / should be File"
    );
    // Case: bare relative path ending in ".json" -> File.
    let output = AnalyzedOutput {
        format: AnalyzedOutputFormat::Json,
        schema: Some(serde_json::Value::String("schemas/result.json".to_string())),
        schema_ref: None,
        max_retries: None,
        span: Span::dummy(),
    };
    let lowered = lower_output(output);
    assert!(
        matches!(lowered.schema, Some(SchemaRef::File(ref p)) if p == "schemas/result.json"),
        "Schema ending with .json should be File"
    );
    // Case: inline JSON object -> Inline.
    let output = AnalyzedOutput {
        format: AnalyzedOutputFormat::Json,
        schema: Some(serde_json::json!({"type": "object"})),
        schema_ref: None,
        max_retries: None,
        span: Span::dummy(),
    };
    let lowered = lower_output(output);
    assert!(
        matches!(lowered.schema, Some(SchemaRef::Inline(_))),
        "JSON object schema should remain Inline"
    );
    // Case: a string that isn't path-like -> Inline.
    let output = AnalyzedOutput {
        format: AnalyzedOutputFormat::Json,
        schema: Some(serde_json::Value::String("just-a-string".to_string())),
        schema_ref: None,
        max_retries: None,
        span: Span::dummy(),
    };
    let lowered = lower_output(output);
    assert!(
        matches!(lowered.schema, Some(SchemaRef::Inline(_))),
        "Non-path string schema should remain Inline"
    );
}
// Regression for bug42: the output max_retries value must be threaded from
// the analyzed form into the lowered output.
#[test]
fn bug42_output_max_retries_threaded() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        output: Some(AnalyzedOutput {
            format: AnalyzedOutputFormat::Json,
            schema: Some(serde_json::json!({"type": "object"})),
            schema_ref: None,
            max_retries: Some(5),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    let output = lowered.tasks[0]
        .output
        .as_ref()
        .expect("output should exist");
    assert_eq!(
        output.max_retries,
        Some(5),
        "max_retries should survive lowering"
    );
}
// Regression for bug42 (roundtrip half): max_retries survives a full
// lower -> unlower cycle.
#[test]
fn bug42_output_max_retries_roundtrip() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        output: Some(AnalyzedOutput {
            format: AnalyzedOutputFormat::Json,
            schema: Some(serde_json::json!({"type": "object"})),
            schema_ref: None,
            max_retries: Some(3),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    let output = unlowered.tasks[0]
        .output
        .as_ref()
        .expect("output should exist");
    assert_eq!(
        output.max_retries,
        Some(3),
        "max_retries should survive roundtrip"
    );
}
// Regression for bug43: the analyzed response_format string ("json") must be
// parsed into the lowered ResponseFormat enum, not dropped.
#[test]
fn bug43_response_format_threaded() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Infer(AnalyzedInferAction {
            prompt: "test".to_string(),
            response_format: Some("json".to_string()),
            ..Default::default()
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    match &lowered.tasks[0].action {
        TaskAction::Infer { infer } => {
            assert_eq!(
                infer.response_format,
                Some(crate::ast::action::ResponseFormat::Json),
                "response_format should survive lowering"
            );
        }
        _ => panic!("expected Infer action"),
    }
}
// Regression for bug43 (roundtrip half): a response_format of "markdown" must
// come back as the same string after lower -> unlower.
#[test]
fn bug43_response_format_roundtrip() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Infer(AnalyzedInferAction {
            prompt: "test".to_string(),
            response_format: Some("markdown".to_string()),
            ..Default::default()
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    match &unlowered.tasks[0].action {
        AnalyzedTaskAction::Infer(infer) => {
            assert_eq!(
                infer.response_format.as_deref(),
                Some("markdown"),
                "response_format should survive roundtrip"
            );
        }
        _ => panic!("expected Infer action"),
    }
}
// Regression for bug44: the analyzed timeout is milliseconds, the lowered
// timeout is seconds; conversion must use ceiling division so a sub-second
// timeout (500ms) never truncates to an unlimited 0s.
#[test]
fn bug44_timeout_ceiling_division() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
            command: "echo hi".to_string(),
            shell: false,
            working_dir: None,
            env: IndexMap::new(),
            timeout_ms: Some(500),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    match &lowered.tasks[0].action {
        TaskAction::Exec { exec: e } => {
            assert_eq!(
                e.timeout,
                Some(1),
                "500ms should ceil to 1s, not truncate to 0s"
            );
        }
        _ => panic!("expected Exec action"),
    }
}
// Companion to the ceiling test: an exact multiple of 1000ms must not be
// rounded up — 1000ms stays 1s.
#[test]
fn bug44_timeout_exact_seconds_unchanged() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Exec(AnalyzedExecAction {
            command: "echo hi".to_string(),
            shell: false,
            working_dir: None,
            env: IndexMap::new(),
            timeout_ms: Some(1000),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    match &lowered.tasks[0].action {
        TaskAction::Exec { exec: e } => {
            assert_eq!(e.timeout, Some(1), "1000ms should remain 1s");
        }
        _ => panic!("expected Exec action"),
    }
}
// bug44 ceiling-division check for the Fetch action path: 1500ms rounds up
// to 2s rather than truncating to 1s.
#[test]
fn bug44_timeout_fetch_ceiling() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Fetch(AnalyzedFetchAction {
            url: "https://example.com".to_string(),
            method: HttpMethod::Get,
            headers: IndexMap::new(),
            body: None,
            json: None,
            timeout_ms: Some(1500),
            follow_redirects: true,
            response: None,
            extract: None,
            selector: None,
            span: Span::dummy(),
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    match &lowered.tasks[0].action {
        TaskAction::Fetch { fetch } => {
            assert_eq!(
                fetch.timeout,
                Some(2),
                "1500ms should ceil to 2s, not truncate to 1s"
            );
        }
        _ => panic!("expected Fetch action"),
    }
}
// bug44 ceiling-division check for the Invoke action path: 100ms rounds up
// to 1s rather than truncating to 0s.
#[test]
fn bug44_timeout_invoke_ceiling() {
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("t");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Invoke(AnalyzedInvokeAction {
            server: Some("test".to_string()),
            tool: "tool".to_string(),
            params: None,
            timeout_ms: Some(100),
            span: Span::dummy(),
        }),
        ..dummy_task(id, "t")
    });
    let lowered = lower(wf).unwrap();
    match &lowered.tasks[0].action {
        TaskAction::Invoke { invoke } => {
            assert_eq!(
                invoke.timeout,
                Some(1),
                "100ms should ceil to 1s, not truncate to 0s"
            );
        }
        _ => panic!("expected Invoke action"),
    }
}
// All recognized tool_choice spellings should lower to Some variant. The
// "AUTO"/"Required" cases imply matching is case-insensitive — confirm.
// NOTE(review): `expected_variant` only appears in the failure message; the
// assertion checks is_some() rather than the concrete variant, so a mapping
// of "required" to Auto would still pass. Consider matching on the variant.
#[test]
fn lower_agent_valid_tool_choice_values() {
    for (input, expected_variant) in [
        ("auto", "Auto"),
        ("required", "Required"),
        ("none", "None"),
        ("AUTO", "Auto"),
        ("Required", "Required"),
    ] {
        let agent = AnalyzedAgentAction {
            prompt: "test".to_string(),
            tools: vec![],
            max_iterations: None,
            max_tokens: None,
            from: None,
            skills: vec![],
            mcp: vec![],
            system: None,
            temperature: None,
            token_budget: None,
            extended_thinking: None,
            thinking_budget: None,
            depth_limit: None,
            tool_choice: Some(input.to_string()),
            stop_sequences: vec![],
            scope: None,
            span: Span::dummy(),
        };
        let params = lower_agent(agent, None, None);
        assert!(
            params.tool_choice.is_some(),
            "valid tool_choice '{}' should produce Some({})",
            input,
            expected_variant
        );
    }
}
// Unrecognized tool_choice strings (including empty) are dropped to None
// during lowering rather than producing an error or a bogus variant.
#[test]
fn lower_agent_invalid_tool_choice_maps_to_none() {
    for invalid in ["foo", "always", "any", ""] {
        let agent = AnalyzedAgentAction {
            prompt: "test".to_string(),
            tools: vec![],
            max_iterations: None,
            max_tokens: None,
            from: None,
            skills: vec![],
            mcp: vec![],
            system: None,
            temperature: None,
            token_budget: None,
            extended_thinking: None,
            thinking_budget: None,
            depth_limit: None,
            tool_choice: Some(invalid.to_string()),
            stop_sequences: vec![],
            scope: None,
            span: Span::dummy(),
        };
        let params = lower_agent(agent, None, None);
        assert!(
            params.tool_choice.is_none(),
            "invalid tool_choice '{}' should map to None, got {:?}",
            invalid,
            params.tool_choice
        );
    }
}
// Guardrail configs attached to an Infer action must survive lowering with
// their type and on_failure policy intact.
#[test]
fn lower_infer_guardrails_preserved() {
    use crate::ast::guardrails::{GuardrailConfig, LengthGuardrail, OnFailure};
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("guarded");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Infer(AnalyzedInferAction {
            prompt: "Summarize".to_string(),
            guardrails: vec![GuardrailConfig::Length(LengthGuardrail {
                id: Some("word_count".to_string()),
                min_words: Some(10),
                max_words: Some(200),
                min_chars: None,
                max_chars: None,
                message: None,
                on_failure: OnFailure::Fail,
            })],
            ..Default::default()
        }),
        ..dummy_task(id, "guarded")
    });
    let lowered = lower(wf).unwrap();
    match &lowered.tasks[0].action {
        TaskAction::Infer { infer } => {
            assert_eq!(infer.guardrails.len(), 1);
            assert_eq!(infer.guardrails[0].guardrail_type(), "length");
            assert_eq!(infer.guardrails[0].on_failure(), OnFailure::Fail);
        }
        _ => panic!("expected Infer action"),
    }
}
// Mixed guardrails (a typed Length config plus a Regex config deserialized
// from JSON) must survive a full lower -> unlower cycle in order, keeping
// each one's type and on_failure policy.
#[test]
fn unlower_infer_guardrails_roundtrip() {
    use crate::ast::guardrails::{GuardrailConfig, LengthGuardrail, OnFailure};
    // Build the regex guardrail through serde to exercise the untyped path.
    let regex_guardrail: GuardrailConfig = serde_json::from_value(serde_json::json!({
        "type": "regex",
        "id": "starts_with_summary",
        "pattern": "^Summary:",
        "message": "Must start with Summary:",
        "on_failure": "fail"
    }))
    .expect("valid regex guardrail JSON");
    let mut wf = dummy_workflow();
    let id = wf.task_table.insert("validated");
    wf.tasks.push(AnalyzedTask {
        action: AnalyzedTaskAction::Infer(AnalyzedInferAction {
            prompt: "Write a summary".to_string(),
            guardrails: vec![
                GuardrailConfig::Length(LengthGuardrail {
                    id: None,
                    min_words: Some(50),
                    max_words: None,
                    min_chars: None,
                    max_chars: None,
                    message: None,
                    on_failure: OnFailure::default(),
                }),
                regex_guardrail,
            ],
            ..Default::default()
        }),
        ..dummy_task(id, "validated")
    });
    let lowered = lower(wf).unwrap();
    let unlowered = unlower(lowered).unwrap();
    match &unlowered.tasks[0].action {
        AnalyzedTaskAction::Infer(infer) => {
            assert_eq!(infer.guardrails.len(), 2);
            assert_eq!(infer.guardrails[0].guardrail_type(), "length");
            assert_eq!(infer.guardrails[1].guardrail_type(), "regex");
            assert_eq!(infer.guardrails[1].on_failure(), OnFailure::Fail);
        }
        _ => panic!("expected Infer action after roundtrip"),
    }
}
}