use std::collections::{HashMap, HashSet};
use serde_json::Value;
use super::*;
pub(crate) fn validate_json_schema(schema: &Value) -> Result<(), String> {
jsonschema::JSONSchema::compile(schema)
.map(|_| ())
.map_err(|error| format!("invalid JSON schema: {error}"))
}
pub(crate) fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
let validator = jsonschema::JSONSchema::compile(schema)
.map_err(|error| format!("invalid JSON schema: {error}"))?;
if let Err(errors) = validator.validate(instance) {
let message = errors
.into_iter()
.next()
.map(|error| error.to_string())
.unwrap_or_else(|| "unknown schema validation error".to_string());
return Err(format!("schema validation failed: {message}"));
}
Ok(())
}
pub(crate) fn schema_type(schema: &Value) -> Option<&str> {
schema.get("type").and_then(Value::as_str)
}
pub(crate) fn schema_expects_object(schema: &Value) -> bool {
schema_type(schema) == Some("object")
}
pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
let mut diagnostics = Vec::new();
let known_ids: HashMap<&str, &YamlNode> = workflow
.nodes
.iter()
.map(|node| (node.id.as_str(), node))
.collect();
if !known_ids.contains_key(workflow.entry_node.as_str()) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: None,
code: "missing_entry".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!("entry node '{}' does not exist", workflow.entry_node),
});
}
let mut seen_edge_sources = HashSet::new();
for edge in &workflow.edges {
if !seen_edge_sources.insert(edge.from.as_str()) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(edge.from.clone()),
code: "duplicate_edge_from".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!(
"multiple outgoing edges from '{}' are not supported; use a switch node for branching",
edge.from
),
});
}
if !known_ids.contains_key(edge.from.as_str()) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(edge.from.clone()),
code: "unknown_edge_from".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!("edge.from '{}' does not exist", edge.from),
});
}
if !known_ids.contains_key(edge.to.as_str()) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(edge.to.clone()),
code: "unknown_edge_to".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!("edge.to '{}' does not exist", edge.to),
});
}
}
for node in &workflow.nodes {
let mut active_types: Vec<&str> = Vec::new();
if node.node_type.llm_call.is_some() {
active_types.push("llm_call");
}
if node.node_type.switch.is_some() {
active_types.push("switch");
}
if node.node_type.custom_worker.is_some() {
active_types.push("custom_worker");
}
if node.node_type.human_input.is_some() {
active_types.push("human_input");
}
if node.node_type.end.is_some() {
active_types.push("end");
}
if active_types.is_empty() {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "missing_node_type".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!(
"node '{}' has no node_type variant set; expected exactly one of: llm_call, switch, custom_worker, human_input, end",
node.id
),
});
} else if active_types.len() > 1 {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "ambiguous_node_type".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!(
"node '{}' has {} node_type variants set ({}); expected exactly one",
node.id,
active_types.len(),
active_types.join(", ")
),
});
}
if let Some(llm) = &node.node_type.llm_call {
let user_input_prompt = node
.config
.as_ref()
.and_then(|cfg| cfg.user_input_prompt.as_deref())
.map(str::trim)
.filter(|value| !value.is_empty());
if llm.messages_path.is_none() && user_input_prompt.is_none() {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "missing_llm_input_source".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message:
"llm_call requires at least one input source: node_type.llm_call.messages_path or config.user_input_prompt"
.to_string(),
});
}
if llm.model.trim().is_empty() {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "empty_model".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: "llm_call.model must not be empty".to_string(),
});
}
if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "stream_heal_conflict".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Warning,
message:
"llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
.to_string(),
});
}
if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "invalid_max_tool_roundtrips".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
});
}
if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
if global_key.trim().is_empty() {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "empty_tool_calls_global_key".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: "llm_call.tool_calls_global_key must not be empty".to_string(),
});
}
}
match normalize_tool_choice(llm.tool_choice.clone()) {
Ok(choice) => {
if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
(YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
openai.function.name == choice_tool.function.name
}
(
YamlToolFormat::Simplified,
YamlToolDeclaration::Simplified(simple),
) => simple.name == choice_tool.function.name,
_ => false,
}) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "unknown_tool_choice_function".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!(
"llm_call.tool_choice references unknown function '{}'",
choice_tool.function.name
),
});
}
}
}
Err(message) => {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "invalid_tool_choice".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message,
});
}
}
let normalized_tools = match normalize_llm_tools(llm) {
Ok(tools) => tools,
Err(message) => {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "invalid_tools_format".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message,
});
Vec::new()
}
};
let mut seen_tool_names = HashSet::new();
for tool in &normalized_tools {
let name = tool.definition.function.name.trim();
if name.is_empty() {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "empty_tool_name".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: "tool function name must not be empty".to_string(),
});
}
if !seen_tool_names.insert(tool.definition.function.name.clone()) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "duplicate_tool_name".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!(
"duplicate tool function name '{}' in node",
tool.definition.function.name
),
});
}
let schema = tool
.definition
.function
.parameters
.clone()
.unwrap_or(Value::Null);
if schema.is_null() {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "missing_tool_input_schema".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!(
"tool '{}' is missing input schema",
tool.definition.function.name
),
});
} else if let Err(message) = validate_json_schema(&schema) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "invalid_tool_input_schema".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!(
"tool '{}' has invalid input schema: {}",
tool.definition.function.name, message
),
});
}
if let Some(output_schema) = tool.output_schema.as_ref() {
if let Err(message) = validate_json_schema(output_schema) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "invalid_tool_output_schema".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!(
"tool '{}' has invalid output schema: {}",
tool.definition.function.name, message
),
});
}
}
}
}
if let Some(switch) = &node.node_type.switch {
for branch in &switch.branches {
if !known_ids.contains_key(branch.target.as_str()) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "unknown_switch_target".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!("switch branch target '{}' does not exist", branch.target),
});
}
}
if !known_ids.contains_key(switch.default.as_str()) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "unknown_switch_default".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!("switch default target '{}' does not exist", switch.default),
});
}
}
if let Some(human) = &node.node_type.human_input {
match human.input_type {
YamlHumanInputType::Choice => {
let options = human.options.as_ref();
if options.is_none() || options.is_some_and(|items| items.is_empty()) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "missing_human_choice_options".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message:
"human_input.input_type=choice requires non-empty options list"
.to_string(),
});
}
}
YamlHumanInputType::Text => {
if human
.options
.as_ref()
.is_some_and(|items| !items.is_empty())
{
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "human_text_options_ignored".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Warning,
message: "human_input.options are ignored for input_type=text"
.to_string(),
});
}
}
YamlHumanInputType::Form => {
let Some(form_schema) = human.form_schema.as_ref() else {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "missing_human_form_schema".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: "human_input.input_type=form requires form_schema".to_string(),
});
continue;
};
if let Err(message) = validate_json_schema(form_schema) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "invalid_human_form_schema".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message,
});
}
if !schema_expects_object(form_schema) {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "human_form_schema_not_object".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Warning,
message:
"human_input.form_schema should use type=object for editable form data"
.to_string(),
});
}
}
}
}
if let Some(config) = node.config.as_ref() {
if let Some(update_globals) = config.update_globals.as_ref() {
for (key, update) in update_globals {
if update.op.as_str() != "increment" && update.from.is_none() {
diagnostics.push(YamlWorkflowDiagnostic {
node_id: Some(node.id.clone()),
code: "missing_update_from".to_string(),
severity: YamlWorkflowDiagnosticSeverity::Error,
message: format!(
"update_globals key '{}' with op '{}' requires 'from'",
key,
update.op.as_str()
),
});
}
}
}
}
}
diagnostics
}