// @harn-entrypoint-category llm.stdlib
//
// std/llm/structural_validator - deterministic pre-dispatch turn checks.
//
// Opt in via `agent_loop(..., {structural_validator: with_structural_validator({...})})`.
// The validator intercepts the agent loop's internal
// `__structural_validator_turn__` pre-dispatch probe, emits a
// `structural_validator_decision` event, and either regenerates with
// feedback or raises.
import { agent_emit_event } from "std/agent/state"
// Name of the synthetic pre-dispatch probe call that this validator intercepts.
fn __sv_tool_name() -> string {
  let probe_name = "__structural_validator_turn__"
  return probe_name
}
// Coerce `value` to a dict: dicts pass through, anything else becomes `{}`.
fn __sv_dict(value) -> dict {
  if type_of(value) != "dict" {
    return {}
  }
  return value
}
// Coerce `value` to a list: lists pass through, anything else becomes `[]`.
fn __sv_list(value) -> list {
  if type_of(value) != "list" {
    return []
  }
  return value
}
// Stringify `value` with `to_string`, mapping nil to the empty string.
fn __sv_string(value) -> string {
  if value != nil {
    return to_string(value)
  }
  return ""
}
// Parse option `name` as a strictly positive integer.
//
// Returns `default_value` when `value` is nil. Throws when `to_int` yields
// nil or when the parsed number is zero or negative.
fn __sv_positive_int(value, default_value, name) -> int {
  if value == nil {
    return default_value
  }
  let number = to_int(value)
  if number == nil {
    throw "with_structural_validator: `" + name + "` must be an integer; got " + type_of(value)
  } else if number <= 0 {
    throw "with_structural_validator: `" + name + "` must be > 0; got " + to_string(number)
  }
  return number
}
// Validate option `name` as a bool.
//
// Returns `default_value` when `value` is nil, the value itself when it is a
// bool, and throws for any other type.
fn __sv_bool(value, default_value, name) -> bool {
  if value == nil {
    return default_value
  }
  if type_of(value) == "bool" {
    return value
  }
  throw "with_structural_validator: `" + name + "` must be bool; got " + type_of(value)
}
// Turn `value` into a list of trimmed, non-empty strings.
//
// Non-list input yields `[]`; each item is stringified and trimmed, and items
// that end up empty are dropped.
fn __sv_string_list(value) -> list {
  var kept = []
  for raw_item in __sv_list(value) {
    let cleaned = trim(__sv_string(raw_item))
    if cleaned == "" {
      continue
    }
    kept = kept.push(cleaned)
  }
  return kept
}
// Built-in catalog of lowercase completion-claim phrases, keyed by locale.
// Only the `en` locale ships by default.
fn __sv_default_no_phantom_completion_catalog() {
  let english_phrases = [
    "i fixed",
    "fixed the",
    "updated the",
    "implemented the",
    "i implemented",
    "i completed",
    "completed the",
    "successfully",
    "all set",
    "done",
  ]
  return {en: english_phrases}
}
// Merge two phrase lists for one locale.
//
// Both inputs are cleaned with `__sv_string_list`, lowercased, and
// de-duplicated; base phrases come first and first occurrence wins.
fn __sv_merge_catalog_locale(base, override_items) {
  let candidates = __sv_string_list(base) + __sv_string_list(override_items)
  var unique = []
  for phrase in candidates {
    let lowered = lowercase(phrase)
    if lowered == "" || contains(unique, lowered) {
      continue
    }
    unique = unique.push(lowered)
  }
  return unique
}
// Normalize the user-supplied `no_phantom_completion_catalog` option.
//
// nil yields the built-in defaults. A non-dict value throws. Otherwise each
// supplied locale is merged on top of whatever the catalog already holds for
// that locale, so user phrases extend the defaults rather than replace them.
fn __sv_normalize_no_phantom_completion_catalog(value) {
  let defaults = __sv_default_no_phantom_completion_catalog()
  if value == nil {
    return defaults
  }
  if type_of(value) != "dict" {
    throw "with_structural_validator: `no_phantom_completion_catalog` must be a dict"
  }
  var catalog = defaults
  for locale in value.keys() {
    let existing = catalog?[locale] ?? []
    let merged = __sv_merge_catalog_locale(existing, value[locale])
    catalog = catalog + {[locale]: merged}
  }
  return catalog
}
// Resolve a locale option to a trimmed string, defaulting to "en" when the
// value is nil or blank.
fn __sv_locale(value) -> string {
  let cleaned = trim(__sv_string(value))
  if cleaned != "" {
    return cleaned
  }
  return "en"
}
// Names of every rule this validator knows how to evaluate.
fn __sv_rule_names() {
  let known_rules = [
    "non_empty_when_writes_expected",
    "no_phantom_completion",
    "tool_calls_well_formed",
    "output_token_cap_with_zero_calls",
  ]
  return known_rules
}
// Normalize one `rules` item into `{name, warn_only}`.
//
// A string item is the rule name with `warn_only: false`. Any other item is
// read as a dict whose name comes from `name` (falling back to `rule`) and
// whose `warn_only` must be a bool when present. Throws on a blank name.
fn __sv_normalize_rule_entry(value) {
  if type_of(value) == "string" {
    let bare_name = trim(__sv_string(value))
    if bare_name != "" {
      return {name: bare_name, warn_only: false}
    }
    throw "with_structural_validator: rule names must be non-empty strings"
  }
  let fields = __sv_dict(value)
  let rule_name = trim(__sv_string(fields?.name ?? fields?.rule))
  if rule_name == "" {
    throw "with_structural_validator: rule entries need `name`"
  }
  let warn_only = __sv_bool(fields?.warn_only, false, "rules[].warn_only")
  return {name: rule_name, warn_only: warn_only}
}
// Normalize the `rules` option into a de-duplicated list of rule entries.
//
// nil, a non-list, or an empty list all fall back to the single default rule
// `non_empty_when_writes_expected`. When a rule name repeats, the first entry
// wins.
fn __sv_normalize_rules(value) {
  let requested = __sv_list(value)
  var unique = []
  var seen_names = []
  for item in requested {
    let entry = __sv_normalize_rule_entry(item)
    if !contains(seen_names, entry.name) {
      seen_names = seen_names.push(entry.name)
      unique = unique.push(entry)
    }
  }
  if len(unique) == 0 {
    return [__sv_normalize_rule_entry("non_empty_when_writes_expected")]
  }
  return unique
}
// Validate `opts` and return the normalized validator configuration.
//
// Checks run in this order, each throwing on failure: `on_failure`, rule
// entries, unknown rule names, `max_attempts`, and the phrase catalog.
// Non-dict `opts` is treated as an empty dict, so every default applies.
fn __sv_validate_opts(opts) {
  let cfg = __sv_dict(opts)
  let on_failure = __sv_string(cfg?.on_failure ?? "regenerate_with_feedback")
  if !contains(["regenerate_with_feedback", "raise"], on_failure) {
    throw "with_structural_validator: `on_failure` must be regenerate_with_feedback|raise; got " + on_failure
  }
  let known_rules = __sv_rule_names()
  let rules = __sv_normalize_rules(cfg?.rules)
  for rule in rules {
    if contains(known_rules, rule.name) {
      continue
    }
    throw "with_structural_validator: unknown rule `" + rule.name + "`"
  }
  let max_attempts = __sv_positive_int(cfg?.max_attempts, 3, "max_attempts")
  let locale = __sv_locale(cfg?.locale)
  let catalog = __sv_normalize_no_phantom_completion_catalog(cfg?.no_phantom_completion_catalog)
  return {
    on_failure: on_failure,
    max_attempts: max_attempts,
    rules: rules,
    locale: locale,
    no_phantom_completion_catalog: catalog,
  }
}
// Find a tool entry's annotations dict.
//
// Prefers `entry.annotations`, then `entry.function.annotations`; returns `{}`
// when neither is a dict.
fn __sv_tool_annotations(entry) {
  let top_level = entry?.annotations
  if type_of(top_level) == "dict" {
    return top_level
  }
  let nested = entry?.function
  if type_of(nested) != "dict" {
    return {}
  }
  if type_of(nested?.annotations) != "dict" {
    return {}
  }
  return nested.annotations
}
// True only for the literal bool `true`; non-bool values never enable a flag.
fn __sv_annotation_enabled(value) -> bool {
  if type_of(value) != "bool" {
    return false
  }
  return value
}
// A tool is structural when its annotations set `structural` or
// `agent_lifecycle` to true.
fn __sv_tool_entry_is_structural(entry) -> bool {
  let annotations = __sv_tool_annotations(entry)
  if __sv_annotation_enabled(annotations?.structural) {
    return true
  }
  return __sv_annotation_enabled(annotations?.agent_lifecycle)
}
// Decide whether a tool entry counts as write-capable.
//
// Structural tools, tools whose side-effect level is "none" or "read_only",
// and tools of kind read/search/think/fetch are excluded. Everything else,
// including tools with no annotations, counts as write-capable.
fn __sv_tool_entry_has_write_capability(entry) -> bool {
  if __sv_tool_entry_is_structural(entry) {
    return false
  }
  let annotations = __sv_tool_annotations(entry)
  // Both snake_case and camelCase spellings of the level are accepted.
  let level = lowercase(__sv_string(annotations?.side_effect_level ?? annotations?.sideEffectLevel))
  if contains(["none", "read_only"], level) {
    return false
  }
  let kind = lowercase(__sv_string(annotations?.kind))
  return !contains(["read", "search", "think", "fetch"], kind)
}
// True when the turn could perform a write: the policy's side-effect ceiling
// is not "none"/"read_only" and at least one bound tool is write-capable.
fn __sv_workspace_has_write_capability(payload) -> bool {
  let policy = __sv_dict(payload?.policy)
  let ceiling = lowercase(__sv_string(policy?.side_effect_level))
  if contains(["none", "read_only"], ceiling) {
    return false
  }
  let registry = __sv_dict(payload?.tools)
  for candidate in __sv_list(registry?.tools) {
    if type_of(candidate) != "dict" {
      continue
    }
    if __sv_tool_entry_has_write_capability(candidate) {
      return true
    }
  }
  return false
}
// True when the bound tool registry contains at least one dict entry that is
// not a structural tool.
fn __sv_has_non_structural_tools(payload) -> bool {
  let registry = __sv_dict(payload?.tools)
  for candidate in __sv_list(registry?.tools) {
    if type_of(candidate) != "dict" {
      continue
    }
    if !__sv_tool_entry_is_structural(candidate) {
      return true
    }
  }
  return false
}
// True when the payload carries a non-blank `parsed_done_marker`.
fn __sv_done_marker_present(payload) -> bool {
  let marker = trim(__sv_string(payload?.parsed_done_marker))
  return marker != ""
}
// Look up a tool entry in `registry.tools` by name.
//
// An entry matches when either its top-level `name` or its `function.name`
// equals `name`. Returns the first matching entry, or nil when there is none.
//
// A nil or empty `name` never matches: entries without a `function.name`
// stringify that field to "", which would otherwise equal an empty lookup
// name and return an unrelated entry. Non-dict entries are skipped, matching
// the guards used by the other registry loops in this file.
fn __sv_registry_tool_entry(registry, name) {
  if name == nil || name == "" {
    return nil
  }
  let tools = __sv_dict(registry)
  for entry in __sv_list(tools?.tools) {
    if type_of(entry) != "dict" {
      continue
    }
    let direct_name = __sv_string(entry?.name)
    let function = entry?.function
    let function_name = if type_of(function) == "dict" {
      __sv_string(function?.name)
    } else {
      ""
    }
    if direct_name == name || function_name == name {
      return entry
    }
  }
  return nil
}
// True when any tool named in `prior_successful_tools` or
// `prior_rejected_tools` resolves to a write-capable registry entry.
// Names that are not in the registry are ignored.
fn __sv_any_prior_write_tools(payload) -> bool {
  let registry = payload?.tools
  let succeeded = __sv_string_list(payload?.prior_successful_tools)
  let rejected = __sv_string_list(payload?.prior_rejected_tools)
  for tool_name in succeeded + rejected {
    let entry = __sv_registry_tool_entry(registry, tool_name)
    if entry == nil {
      continue
    }
    if __sv_tool_entry_has_write_capability(entry) {
      return true
    }
  }
  return false
}
// Resolve the lowercase completion phrases for the active locale.
//
// The locale comes from the rule entry, then the validator config, then "en".
// A locale missing from the catalog falls back to the catalog's `en` list.
//
// NOTE(review): entries built by `__sv_normalize_rule_entry` keep only `name`
// and `warn_only`, so `rule_cfg?.locale` looks like it is always nil for
// normalized configs — confirm whether a per-rule locale is intended.
fn __sv_no_phantom_completion_phrases(cfg, rule_cfg) {
  let locale = __sv_locale(rule_cfg?.locale ?? cfg?.locale)
  let catalog = __sv_dict(cfg?.no_phantom_completion_catalog)
  let localized = catalog?[locale] ?? catalog?.en ?? []
  // The merge helper already trims, drops blanks, lowercases and de-duplicates.
  return __sv_merge_catalog_locale([], localized)
}
// True when the assistant text contains any catalog phrase as a substring.
//
// The text is taken from `assistant_text`, then `raw_text`, then
// `visible_text`, and is trimmed and lowercased before matching.
fn __sv_claims_completion(payload, cfg, rule_cfg) -> bool {
  let raw_text = payload?.assistant_text ?? payload?.raw_text ?? payload?.visible_text ?? ""
  let haystack = lowercase(trim(__sv_string(raw_text)))
  if haystack == "" {
    return false
  }
  for needle in __sv_no_phantom_completion_phrases(cfg, rule_cfg) {
    if needle == "" {
      continue
    }
    if haystack.contains(needle) {
      return true
    }
  }
  return false
}
// Rule `non_empty_when_writes_expected`.
//
// Produces a verdict when writable tools are available but the turn has no
// tool calls, no earlier write-capable tool use, and no done marker.
// Returns nil when the rule does not fire.
fn __sv_non_empty_when_writes_expected(payload) {
  if !__sv_workspace_has_write_capability(payload) {
    return nil
  }
  let calls = __sv_list(payload?.tool_calls)
  if len(calls) > 0 || __sv_any_prior_write_tools(payload) || __sv_done_marker_present(payload) {
    return nil
  }
  return {
    rule: "non_empty_when_writes_expected",
    diagnostic: "Assistant emitted no tool calls while writable tools were available.",
    recommended_action: "Emit the concrete write or edit tool call needed for the task, or only mark the task done after that work is complete.",
  }
}
// Rule `no_phantom_completion`.
//
// Produces a verdict when writable tools are available, the turn has no tool
// calls, no write-capable tool was used earlier, and the assistant text
// contains a completion phrase. Returns nil when the rule does not fire.
fn __sv_no_phantom_completion(payload, cfg, rule_cfg) {
  if !__sv_workspace_has_write_capability(payload) {
    return nil
  }
  let calls = __sv_list(payload?.tool_calls)
  if len(calls) > 0 || __sv_any_prior_write_tools(payload) {
    return nil
  }
  if __sv_claims_completion(payload, cfg, rule_cfg) {
    return {
      rule: "no_phantom_completion",
      diagnostic: "Assistant claimed completion before any write-capable tool call occurred in this session.",
      recommended_action: "Call the concrete write or edit tool that performs the work before claiming the task is complete.",
    }
  }
  return nil
}
// Join the non-blank messages in `values` with "; ".
fn __sv_join_messages(values) -> string {
  let parts = __sv_string_list(values)
  return join(parts, "; ")
}
// Trimmed tool name of a call, read from `name` and then `tool_name`.
fn __sv_tool_call_name(call) -> string {
  let raw_name = call?.name ?? call?.tool_name
  return trim(__sv_string(raw_name))
}
// Arguments dict of a call, read from `arguments` and then `tool_args`.
//
// Missing arguments default to `{}`; a present but non-dict value yields nil
// so the caller can report it.
fn __sv_tool_call_arguments(call) {
  let candidate = call?.arguments ?? call?.tool_args ?? {}
  if type_of(candidate) != "dict" {
    return nil
  }
  return candidate
}
// Parameter schema of a tool entry.
//
// Lookup order: `function.parameters`, `parameters`, `input_schema`,
// `inputSchema`. The first one that is a dict wins; otherwise `{}`.
fn __sv_tool_entry_parameters(entry) {
  let nested = entry?.function
  if type_of(nested) == "dict" && type_of(nested?.parameters) == "dict" {
    return nested.parameters
  }
  let top_level = entry?.parameters
  let snake_schema = entry?.input_schema
  let camel_schema = entry?.inputSchema
  if type_of(top_level) == "dict" {
    return top_level
  } else if type_of(snake_schema) == "dict" {
    return snake_schema
  } else if type_of(camel_schema) == "dict" {
    return camel_schema
  }
  return {}
}
// Map of parameter name to per-parameter schema.
//
// With a `properties` dict, that dict is returned as is. Otherwise the
// parameters dict is treated as a flat name-to-schema map, with the listed
// schema keywords filtered out.
fn __sv_parameter_entries(parameters) {
  if type_of(parameters?.properties) == "dict" {
    return parameters.properties
  }
  let schema_keywords = ["type", "properties", "required", "additionalProperties", "description", "title", "$schema"]
  var flat = {}
  for key in __sv_dict(parameters).keys() {
    if contains(schema_keywords, key) {
      continue
    }
    flat = flat + {[key]: parameters[key]}
  }
  return flat
}
// Decide whether parameter `name` is required.
//
// With a `properties` dict, membership in the `required` list decides.
// Otherwise the per-parameter schema decides: a non-nil `default` makes it
// optional, then a bool `required`, then a bool `optional`. Parameters with
// no usable hint are treated as required.
fn __sv_param_is_required(parameters, name, schema) -> bool {
  if type_of(parameters?.properties) == "dict" {
    let required_names = __sv_string_list(parameters?.required)
    return contains(required_names, name)
  }
  if type_of(schema) != "dict" {
    return true
  }
  if schema?.default != nil {
    return false
  }
  if type_of(schema?.required) == "bool" {
    return schema.required
  }
  if type_of(schema?.optional) == "bool" {
    return !schema.optional
  }
  return true
}
// Lowercased type names declared by a parameter schema.
//
// A string schema is itself the type name; a dict schema contributes its
// `type`, which may be a single value or a list. Blank names are dropped, so
// the result is `[]` when no type is declared.
fn __sv_schema_type_names(schema) {
  let schema_kind = type_of(schema)
  var declared = nil
  if schema_kind == "dict" {
    declared = schema?.type
  } else if schema_kind == "string" {
    declared = schema
  }
  // Treat a single declared type as a one-element list so one loop handles both shapes.
  let candidates = if type_of(declared) == "list" {
    declared
  } else {
    [declared]
  }
  var names = []
  for candidate in candidates {
    let normalized = lowercase(trim(__sv_string(candidate)))
    if normalized != "" {
      names = names.push(normalized)
    }
  }
  return names
}
// Check a value's runtime type against one schema type name.
//
// Both JSON Schema spellings and Harn spellings are accepted (for example
// "integer"/"int"). Ints satisfy "number"/"float". Any unrecognized type name,
// including "any" and "unknown", matches every value.
fn __sv_value_matches_schema_type(value, type_name) -> bool {
  let actual = type_of(value)
  if type_name == "string" {
    return actual == "string"
  } else if contains(["integer", "int"], type_name) {
    return actual == "int"
  } else if contains(["number", "float"], type_name) {
    return contains(["int", "float"], actual)
  } else if contains(["boolean", "bool"], type_name) {
    return actual == "bool"
  } else if contains(["array", "list"], type_name) {
    return actual == "list"
  } else if contains(["object", "dict"], type_name) {
    return actual == "dict"
  }
  return true
}
// Describe a type mismatch for one argument, or return nil when it is fine.
//
// nil values and schemas that declare no type are never violations. A value
// passes when it matches any one of the declared types.
fn __sv_type_violation(tool_name, arg_name, value, schema) {
  if value == nil {
    return nil
  }
  let accepted = __sv_schema_type_names(schema)
  for type_name in accepted {
    if __sv_value_matches_schema_type(value, type_name) {
      return nil
    }
  }
  if len(accepted) == 0 {
    return nil
  }
  let expected_label = join(accepted, "|")
  return "Tool '" + tool_name + "' parameter '" + arg_name + "' expected " + expected_label + " but got " + type_of(value) + "."
}
// Collect schema problems for every tool call in the payload.
//
// Per call, checking stops at the first structural problem: missing tool
// name, unknown tool, or non-dict arguments. Otherwise each declared
// parameter is checked for presence (when required) and type, and all missing
// required parameters of a call are reported in a single message.
// Returns a list of messages, empty when every call is well formed.
fn __sv_tool_schema_violations(payload) {
  let registry = payload?.tools
  var problems = []
  for call in __sv_list(payload?.tool_calls) {
    let tool_name = __sv_tool_call_name(call)
    if tool_name == "" {
      problems = problems.push("Tool call is missing a tool name.")
      continue
    }
    let entry = __sv_registry_tool_entry(registry, tool_name)
    if entry == nil {
      problems = problems.push("Unknown tool '" + tool_name + "'.")
      continue
    }
    let args = __sv_tool_call_arguments(call)
    if args == nil {
      problems = problems.push("Tool '" + tool_name + "' arguments must be a dict.")
      continue
    }
    let parameters = __sv_tool_entry_parameters(entry)
    let declared = __sv_parameter_entries(parameters)
    var absent = []
    for param in declared.keys() {
      let schema = declared[param]
      let supplied = args[param]
      if supplied == nil && __sv_param_is_required(parameters, param, schema) {
        absent = absent.push(param)
        continue
      }
      let type_error = __sv_type_violation(tool_name, param, supplied, schema)
      if type_error != nil {
        problems = problems.push(type_error)
      }
    }
    if len(absent) > 0 {
      let missing_message = "Tool '" + tool_name + "' is missing required parameter(s): " + join(absent, ", ") + ". Provide all required parameters and try again."
      problems = problems.push(missing_message)
    }
  }
  return problems
}
// Rule `tool_calls_well_formed`.
//
// Schema violations are always checked. Parse errors and protocol violations
// from the payload count only when `tool_format` is "text" and at least one
// non-structural tool is bound. Returns nil when nothing is wrong; otherwise
// a verdict listing protocol, parse, then schema problems.
fn __sv_tool_calls_well_formed(payload) {
  let uses_text_format = lowercase(__sv_string(payload?.tool_format)) == "text"
  let enforce_text_protocol = uses_text_format && __sv_has_non_structural_tools(payload)
  var parse_errors = []
  var protocol_violations = []
  if enforce_text_protocol {
    parse_errors = __sv_string_list(payload?.tool_parse_errors)
    protocol_violations = __sv_string_list(payload?.protocol_violations)
  }
  let schema_violations = __sv_tool_schema_violations(payload)
  let all_problems = protocol_violations + parse_errors + schema_violations
  if len(all_problems) == 0 {
    return nil
  }
  return {
    rule: "tool_calls_well_formed",
    diagnostic: "Assistant emitted malformed tool calls: " + __sv_join_messages(all_problems),
    recommended_action: "Emit only well-formed tool calls that match the bound tool schemas and Harn tool-call protocol.",
  }
}
// Rule `output_token_cap_with_zero_calls`.
//
// Produces a verdict when writable tools are available, the turn has no tool
// calls, a positive `max_output_tokens` is known, and `output_tokens` is at
// least 95% of it. Returns nil otherwise.
fn __sv_output_token_cap_with_zero_calls(payload) {
  if !__sv_workspace_has_write_capability(payload) {
    return nil
  }
  let calls = __sv_list(payload?.tool_calls)
  if len(calls) > 0 {
    return nil
  }
  let budget = to_int(payload?.max_output_tokens) ?? 0
  if budget <= 0 {
    return nil
  }
  let used = to_int(payload?.output_tokens) ?? 0
  // Integer form of used / budget >= 0.95.
  if used * 100 >= budget * 95 {
    return {
      rule: "output_token_cap_with_zero_calls",
      diagnostic: "Assistant used nearly the full output-token budget without emitting any tool calls.",
      recommended_action: "The model appears stuck in a prose loop. Emit the next tool call directly or shorten the narration and try again.",
    }
  }
  return nil
}
// Serialize a verdict's `rule`, `diagnostic`, and `recommended_action` to a
// JSON string. Other verdict fields are not included.
fn __sv_feedback_payload(verdict) {
  let summary = {
    rule: verdict.rule,
    diagnostic: verdict.diagnostic,
    recommended_action: verdict.recommended_action,
  }
  return json_stringify(summary)
}
// Emit one `structural_validator_decision` event for the session.
//
// `verdict` may be nil (for example when the check was skipped); its fields
// then appear as empty strings in the event.
fn __sv_emit_decision(
  session_id,
  iteration,
  cfg,
  verdict,
  attempts,
  vetoed = true,
  skipped = false,
  reason = nil,
) {
  let decision = {
    iteration: iteration,
    rule: verdict?.rule ?? "",
    diagnostic: verdict?.diagnostic ?? "",
    recommended_action: verdict?.recommended_action ?? "",
    vetoed: vetoed,
    skipped: skipped,
    reason: reason,
    on_failure: cfg.on_failure,
    attempts: attempts,
    max_attempts: cfg.max_attempts,
  }
  agent_emit_event(session_id, "structural_validator_decision", decision)
}
// Build the tool-result envelope for a probe that was not vetoed.
//
// `result` carries `configured`, `vetoed: false`, `skipped`, and `reason`.
// When `extra` is a dict its keys are merged on top of those fields.
fn __sv_pass_result(call, configured, skipped = false, reason = nil, extra = nil) {
  var detail = {configured: configured, vetoed: false, skipped: skipped, reason: reason}
  if type_of(extra) == "dict" {
    detail = detail + extra
  }
  return {
    ok: true,
    status: "ok",
    tool_name: call.tool_name,
    tool_call_id: call.call_id,
    arguments: call.tool_args,
    result: detail,
    rendered_result: "",
    observation: "",
    error: nil,
    error_category: nil,
    executor: "harn",
  }
}
// Build the tool-result envelope for a vetoed probe.
//
// The envelope itself still reports `ok: true`; the veto is carried in
// `result.vetoed`, together with the verdict fields, a JSON `feedback`
// string, and the configured `on_failure` mode.
fn __sv_veto_result(call, cfg, verdict) {
  let detail = {
    configured: true,
    vetoed: true,
    skipped: false,
    rule: verdict.rule,
    diagnostic: verdict.diagnostic,
    recommended_action: verdict.recommended_action,
    feedback: __sv_feedback_payload(verdict),
    on_failure: cfg.on_failure,
  }
  return {
    ok: true,
    status: "ok",
    tool_name: call.tool_name,
    tool_call_id: call.call_id,
    arguments: call.tool_args,
    result: detail,
    rendered_result: verdict.diagnostic,
    observation: "",
    error: nil,
    error_category: nil,
    executor: "harn",
  }
}
// Dispatch one rule entry to its checker and return that checker's verdict.
// Returns nil for a rule name that has no checker.
fn __sv_rule_verdict(payload, cfg, rule_cfg) {
  let rule_name = __sv_string(rule_cfg?.name)
  if rule_name == "non_empty_when_writes_expected" {
    return __sv_non_empty_when_writes_expected(payload)
  } else if rule_name == "no_phantom_completion" {
    return __sv_no_phantom_completion(payload, cfg, rule_cfg)
  } else if rule_name == "tool_calls_well_formed" {
    return __sv_tool_calls_well_formed(payload)
  } else if rule_name == "output_token_cap_with_zero_calls" {
    return __sv_output_token_cap_with_zero_calls(payload)
  }
  return nil
}
// Evaluate the configured rules against one pre-dispatch probe call.
//
// Outcomes, each emitting at most one `structural_validator_decision` event:
//   - `attempts` has reached `cfg.max_attempts`: emit a skipped decision and
//     pass without evaluating any rule.
//   - an enforcing rule fires: emit a veto decision and return a veto result.
//   - only warn-only rules fire: emit a `warn_only` decision for the first
//     one and pass with its details attached.
//   - nothing fires: pass, with no event.
//
// A warn-only verdict does not stop evaluation. Returning on the first
// warning would let a warn-only rule listed earlier hide a veto from an
// enforcing rule listed later, so the warning is remembered and reported only
// when no enforcing rule fires.
fn __sv_handle_turn(call, cfg) {
  let payload = __sv_dict(call?.tool_args)
  let session_id = __sv_string(payload?.session_id ?? call?.turn?.session_id)
  let iteration = to_int(payload?.iteration ?? call?.turn?.iteration) ?? 0
  let attempts = to_int(payload?.attempts) ?? 0
  if attempts >= cfg.max_attempts {
    __sv_emit_decision(session_id, iteration, cfg, nil, attempts, false, true, "max_attempts_reached")
    return __sv_pass_result(call, true, true, "max_attempts_reached")
  }
  var first_warning = nil
  for rule_cfg in cfg.rules {
    let verdict = __sv_rule_verdict(payload, cfg, rule_cfg)
    if verdict == nil {
      continue
    }
    if rule_cfg?.warn_only ?? false {
      // Keep only the first warning; later rules may still veto.
      if first_warning == nil {
        first_warning = verdict
      }
      continue
    }
    __sv_emit_decision(session_id, iteration, cfg, verdict, attempts)
    return __sv_veto_result(call, cfg, verdict)
  }
  if first_warning != nil {
    __sv_emit_decision(session_id, iteration, cfg, first_warning, attempts, false, false, "warn_only")
    return __sv_pass_result(
      call,
      true,
      false,
      "warn_only",
      {
        warned: true,
        rule: first_warning.rule,
        diagnostic: first_warning.diagnostic,
        recommended_action: first_warning.recommended_action,
      },
    )
  }
  return __sv_pass_result(call, true)
}
/**
 * with_structural_validator(opts) -> caller
 *
 * Deterministic pre-dispatch turn validator. Options are validated once, when
 * this function is called; invalid options throw. The returned closure
 * forwards every call to `next` unchanged, except the
 * `__structural_validator_turn__` probe, which it answers itself after
 * evaluating the configured rules.
 *
 * Options:
 *   on_failure: "regenerate_with_feedback" | "raise"
 *               (default "regenerate_with_feedback")
 *   max_attempts: int > 0 (default 3)
 *   locale: string (default "en")
 *   no_phantom_completion_catalog: {locale: [phrase, ...]}
 *               (merged on top of the built-in "en" phrases)
 *   rules: ["non_empty_when_writes_expected", ...]
 *     or [{name: "tool_calls_well_formed", warn_only: true}]
 *               (default ["non_empty_when_writes_expected"])
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: [runtime]
 * @api_stability: experimental
 * Pass the returned closure via `agent_loop({structural_validator: ...})`.
 *
 * @example: with_structural_validator({on_failure: "regenerate_with_feedback"})
 */
pub fn with_structural_validator(opts = nil) {
  let cfg = __sv_validate_opts(opts)
  return { call, next ->
    if call?.tool_name == __sv_tool_name() {
      return __sv_handle_turn(call, cfg)
    }
    return next(call)
  }
}