use std::rc::Rc;
use crate::stdlib::{json_to_vm_value, schema_result_value};
use crate::value::{VmError, VmValue};
use super::helpers::extract_llm_options;
use super::trace::{emit_agent_event, AgentTraceEvent};
use super::{agent_config, agent_observe, api, helpers, routing, structural_experiments};
/// Returns the configured `output_validation` mode, or `"off"` when unset.
///
/// `execute_llm_call` acts on `"error"` and `"warn"`; any other value is
/// treated like `"off"`.
fn output_validation_mode(opts: &api::LlmCallOptions) -> &str {
    match opts.output_validation.as_deref() {
        Some(mode) => mode,
        None => "off",
    }
}
/// Extracts human-readable validation errors from a schema-check result.
///
/// `result` is the value returned by `schema_result_value` (see
/// `compute_validation_errors`). A `Result::Ok`, or any value that is not a
/// `Result::Err` enum variant, yields an empty list, meaning "valid".
///
/// A `Result::Err` always yields at least one message. It yields the displayed
/// entries of the payload dict's `errors` list. It falls back to
/// `"schema validation failed"` when that list is missing, is not a list, or
/// is empty.
fn schema_validation_errors(result: &VmValue) -> Vec<String> {
    match result {
        VmValue::EnumVariant {
            enum_name,
            variant,
            fields,
        } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => fields
            .first()
            .and_then(|payload| payload.as_dict())
            .and_then(|payload| payload.get("errors"))
            .and_then(|errors| match errors {
                VmValue::List(items) => {
                    Some(items.iter().map(|err| err.display()).collect::<Vec<String>>())
                }
                _ => None,
            })
            // An `Err` must never collapse to an empty list: callers treat an
            // empty Vec as "output conforms", which would silently accept a
            // failed validation.
            .filter(|messages| !messages.is_empty())
            .unwrap_or_else(|| vec!["schema validation failed".to_string()]),
        _ => Vec::new(),
    }
}
/// Validates `data` against `opts.output_schema`.
///
/// Returns an empty list when no output schema is configured or when `data`
/// conforms. Otherwise returns the messages extracted by
/// `schema_validation_errors`.
pub(super) fn compute_validation_errors(data: &VmValue, opts: &api::LlmCallOptions) -> Vec<String> {
    match opts.output_schema.as_ref() {
        None => Vec::new(),
        Some(schema_json) => {
            let validation = schema_result_value(data, &json_to_vm_value(schema_json), false);
            schema_validation_errors(&validation)
        }
    }
}
/// Collects the reasons a structured-output call result is unusable.
///
/// - A non-dict `result` yields a single error.
/// - If the result has a `data` entry, that data is validated against the
///   output schema and those errors are returned. The list is empty when the
///   data is valid.
/// - If there is no `data` entry, the JSON is reported as unparseable. Any
///   `protocol_violations` are appended. A note is also added when
///   `stop_reason` is `"length"`/`"max_tokens"`, i.e. the response was cut off.
pub(crate) fn structured_output_errors(
    result: &VmValue,
    opts: &api::LlmCallOptions,
) -> Vec<String> {
    let dict = match result.as_dict() {
        Some(dict) => dict,
        None => return vec!["structured output result was not a dict".to_string()],
    };
    if let Some(data) = dict.get("data") {
        return compute_validation_errors(data, opts);
    }

    let mut errors = vec!["response did not contain parseable JSON".to_string()];

    let violations = match dict.get("protocol_violations") {
        Some(VmValue::List(items)) => items
            .iter()
            .map(|item| item.display())
            .collect::<Vec<_>>()
            .join("; "),
        _ => String::new(),
    };
    if !violations.is_empty() {
        errors.push(format!("protocol violations: {violations}"));
    }

    let hit_token_limit = dict
        .get("stop_reason")
        .map(|reason| reason.display())
        .is_some_and(|reason| reason == "length" || reason == "max_tokens");
    if hit_token_limit {
        errors.push("response hit the token limit before producing complete JSON".to_string());
    }

    errors
}
/// How the corrective re-prompt is produced after a schema-validation failure.
///
/// `parse_schema_nudge` builds it from the `schema_retry_nudge` option, and
/// `build_schema_nudge` renders it.
#[derive(Clone, Debug)]
pub(crate) enum SchemaNudge {
    /// Generate the nudge from the validation errors and the schema's shape.
    Auto,
    /// Use this caller-supplied text, followed by the validation errors.
    Verbatim(String),
    /// Produce no nudge text.
    Disabled,
}
/// Reads the `schema_retry_nudge` option.
///
/// The value maps to a nudge mode as follows:
/// - missing options, missing key, `nil` or `true` → `Auto`
/// - `false` → `Disabled`
/// - a string → `Verbatim` with that text
/// - anything else → `Verbatim` with the value's display form
pub(crate) fn parse_schema_nudge(
    options: &Option<std::collections::BTreeMap<String, VmValue>>,
) -> SchemaNudge {
    let setting = options
        .as_ref()
        .and_then(|opts| opts.get("schema_retry_nudge"));
    match setting {
        None | Some(VmValue::Nil) | Some(VmValue::Bool(true)) => SchemaNudge::Auto,
        Some(VmValue::Bool(false)) => SchemaNudge::Disabled,
        Some(VmValue::String(text)) => SchemaNudge::Verbatim(text.to_string()),
        Some(other) => SchemaNudge::Verbatim(other.display()),
    }
}
/// Renders the corrective message sent before a schema retry.
///
/// `errors` are joined with `"; "`. An empty slice is shown as
/// `"(no detailed errors)"`. The output depends on `mode`:
/// - `Disabled`: an empty string.
/// - `Verbatim(text)`: `text`, a blank line, then the validation errors.
/// - `Auto`: a generated explanation of the errors. When `schema` is given, it
///   also lists the root `required` keys, the root `properties` keys and nested
///   shape hints from `collect_schema_shape_lines`. It ends with an instruction
///   to answer with JSON only.
pub(crate) fn build_schema_nudge(
    errors: &[String],
    schema: Option<&serde_json::Value>,
    mode: &SchemaNudge,
) -> String {
    let errors_line = match errors {
        [] => String::from("(no detailed errors)"),
        _ => errors.join("; "),
    };
    match mode {
        SchemaNudge::Disabled => return String::new(),
        SchemaNudge::Verbatim(s) => return format!("{s}\n\nValidation errors: {errors_line}"),
        SchemaNudge::Auto => {}
    }

    let required_keys: Vec<String> = schema
        .and_then(|root| root.get("required"))
        .and_then(|node| node.as_array())
        .map(|entries| {
            entries
                .iter()
                .filter_map(|entry| entry.as_str())
                .map(str::to_string)
                .collect()
        })
        .unwrap_or_default();
    let property_keys: Vec<String> = schema
        .and_then(|root| root.get("properties"))
        .and_then(|node| node.as_object())
        .map(|props| props.keys().cloned().collect())
        .unwrap_or_default();
    let mut shape_lines: Vec<String> = Vec::new();
    if let Some(root) = schema {
        collect_schema_shape_lines(root, "root", 0, &mut shape_lines);
    }

    let mut msg =
        String::from("Your previous response did not match the required JSON schema.");
    msg.push_str(&format!("\nValidation errors: {errors_line}."));
    if !required_keys.is_empty() {
        msg.push_str(&format!("\nRequired keys: {}.", required_keys.join(", ")));
    }
    if !property_keys.is_empty() {
        msg.push_str(&format!(
            "\nAllowed top-level keys: {}.",
            property_keys.join(", ")
        ));
    }
    if !shape_lines.is_empty() {
        msg.push_str("\nExpected JSON schema shape:");
        for line in &shape_lines {
            msg.push_str("\n- ");
            msg.push_str(line);
        }
    }
    msg.push_str(
        "\nRespond again with ONLY valid JSON conforming to the schema. No prose, no markdown fences.",
    );
    msg
}
/// Deepest nesting level visited by `collect_schema_shape_lines`. The root is
/// depth 0, and nodes with a depth greater than this are skipped.
const SCHEMA_NUDGE_MAX_DEPTH: usize = 3;
/// Maximum number of shape-hint lines `collect_schema_shape_lines` emits.
const SCHEMA_NUDGE_MAX_LINES: usize = 8;
/// Maximum number of keys `join_limited_keys` lists before summarising the
/// remainder as `... (+N more)`.
const SCHEMA_NUDGE_MAX_KEYS: usize = 16;
/// Appends short, human-readable shape hints for `schema` to `lines`.
///
/// Each object node with a non-empty `properties` map contributes one line.
/// The line lists the node's sorted keys and any `required` keys. The function
/// then recurses into each property and into array `items`.
///
/// Naming:
/// - `path` names the current node, starting at `"root"`.
/// - Property children are named by key (prefixed with `path.` below the root).
/// - Array items get a `[]` suffix.
///
/// Recursion stops beyond `SCHEMA_NUDGE_MAX_DEPTH` and once `lines` holds
/// `SCHEMA_NUDGE_MAX_LINES` entries. Only `properties` and `items` are
/// followed.
fn collect_schema_shape_lines(
    schema: &serde_json::Value,
    path: &str,
    depth: usize,
    lines: &mut Vec<String>,
) {
    if depth > SCHEMA_NUDGE_MAX_DEPTH || lines.len() >= SCHEMA_NUDGE_MAX_LINES {
        return;
    }

    // A `properties` object is enough to treat the node as an object,
    // whatever `type` it declares.
    if let Some(props) = schema.get("properties").and_then(|node| node.as_object()) {
        let mut names: Vec<String> = props.keys().cloned().collect();
        names.sort_unstable();
        if !names.is_empty() {
            let mut summary =
                format!("{path} object allowed keys: {}", join_limited_keys(&names));
            let required = schema_required_keys(schema);
            if !required.is_empty() {
                summary.push_str(&format!(
                    "; required keys: {}",
                    join_limited_keys(&required)
                ));
            }
            lines.push(summary);
        }
        for name in names {
            if lines.len() >= SCHEMA_NUDGE_MAX_LINES {
                break;
            }
            let Some(child) = props.get(&name) else {
                continue;
            };
            let child_path = if path == "root" {
                name
            } else {
                format!("{path}.{name}")
            };
            collect_schema_shape_lines(child, &child_path, depth + 1, lines);
        }
    }

    // Likewise, an `items` entry is enough to treat the node as an array.
    if let Some(items) = schema.get("items") {
        collect_schema_shape_lines(items, &format!("{path}[]"), depth + 1, lines);
    }
}
/// Returns the string entries of `schema["required"]`, sorted.
///
/// Non-string entries are ignored. A missing or non-array `required` yields
/// an empty list.
fn schema_required_keys(schema: &serde_json::Value) -> Vec<String> {
    let mut required: Vec<String> = match schema.get("required").and_then(|node| node.as_array()) {
        Some(entries) => entries
            .iter()
            .filter_map(|entry| entry.as_str())
            .map(String::from)
            .collect(),
        None => Vec::new(),
    };
    required.sort_unstable();
    required
}
/// Joins `keys` with `", "`, showing at most `SCHEMA_NUDGE_MAX_KEYS` of them.
///
/// When more keys are present, the first `SCHEMA_NUDGE_MAX_KEYS` are listed
/// and followed by `, ... (+N more)`.
fn join_limited_keys(keys: &[String]) -> String {
    match keys.len().checked_sub(SCHEMA_NUDGE_MAX_KEYS) {
        Some(hidden) if hidden > 0 => format!(
            "{}, ... (+{} more)",
            keys[..SCHEMA_NUDGE_MAX_KEYS].join(", "),
            hidden
        ),
        _ => keys.join(", "),
    }
}
/// Entry point behind the `llm_call` builtin.
///
/// Parses the call options from `args`. When `args[2]` is a dict, it is also
/// forwarded as-is for loop settings such as retries and nudges. The call then
/// runs through `execute_llm_call` without a host bridge.
///
/// Any failure is rethrown as `VmError::Thrown` carrying the dict from
/// `build_llm_error_dict`.
pub(super) async fn llm_call_impl(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    let options = args.get(2).and_then(|a| a.as_dict()).cloned();
    let opts = extract_llm_options(&args)?;
    let (provider, model) = (opts.provider.clone(), opts.model.clone());
    // Bound to a name (not `_`) so the render context stays entered until the
    // function returns, i.e. across the await below.
    let _render_guard = crate::stdlib::template::LlmRenderContextGuard::enter(
        crate::stdlib::template::LlmRenderContext::resolve(&provider, &model),
    );
    execute_llm_call(opts, options, None)
        .await
        .map_err(|err| VmError::Thrown(build_llm_error_dict(&err, &provider, &model)))
}
/// Builds the dict thrown to scripts when an LLM call fails.
///
/// - If `err` already carries a dict, that dict is kept. Only missing
///   `category`/`kind`/`reason`/`message` keys are filled in from the error
///   classification.
/// - Otherwise a fresh dict is built with those four keys, plus
///   `retry_after_ms` when `agent_observe::extract_retry_after_ms` yields a
///   value.
///
/// In both cases `provider` and `model` are set, overwriting any existing
/// values.
pub(crate) fn build_llm_error_dict(err: &VmError, provider: &str, model: &str) -> VmValue {
    let category = crate::value::error_to_category(err);
    let message = llm_error_message(err);
    let llm_error = api::classify_llm_error(category.clone(), &message);
    let classified = [
        ("category", VmValue::String(Rc::from(category.as_str()))),
        ("kind", VmValue::String(Rc::from(llm_error.kind.as_str()))),
        ("reason", VmValue::String(Rc::from(llm_error.reason.as_str()))),
        ("message", VmValue::String(Rc::from(message.as_str()))),
    ];

    let mut fields = if let VmError::Thrown(VmValue::Dict(existing)) = err {
        // Respect what the thrower already set; only fill the gaps.
        let mut merged = existing.as_ref().clone();
        for (key, value) in classified {
            merged.entry(key.to_string()).or_insert(value);
        }
        merged
    } else {
        let mut fresh = std::collections::BTreeMap::new();
        for (key, value) in classified {
            fresh.insert(key.to_string(), value);
        }
        if let Some(ms) = agent_observe::extract_retry_after_ms(err) {
            fresh.insert("retry_after_ms".to_string(), VmValue::Int(ms as i64));
        }
        fresh
    };

    fields.insert("provider".to_string(), VmValue::String(Rc::from(provider)));
    fields.insert("model".to_string(), VmValue::String(Rc::from(model)));
    VmValue::Dict(Rc::new(fields))
}
/// Picks the most useful human-readable message for `err`.
///
/// Sources, in order of the match:
/// - a categorized error's own message;
/// - a thrown string;
/// - a thrown dict's `message` entry.
///
/// Anything else, including a thrown dict without `message`, falls back to
/// the error's `Display` output.
fn llm_error_message(err: &VmError) -> String {
    if let VmError::CategorizedError { message, .. } = err {
        return message.clone();
    }
    match err {
        VmError::Thrown(VmValue::String(text)) => text.to_string(),
        VmError::Thrown(VmValue::Dict(fields)) => fields
            .get("message")
            .map_or_else(|| err.to_string(), |value| value.display()),
        other => other.to_string(),
    }
}
pub(crate) async fn execute_llm_call(
opts: api::LlmCallOptions,
options: Option<std::collections::BTreeMap<String, VmValue>>,
bridge: Option<&Rc<crate::bridge::HostBridge>>,
) -> Result<VmValue, VmError> {
if let Some(policy) = opts.routing_policy.clone() {
return execute_with_routing_policy(policy, opts, bridge).await;
}
let outcome = execute_schema_retry_loop(opts, options, bridge).await?;
if outcome.errors.is_empty() {
return Ok(outcome.vm_result);
}
let hint = if outcome.schema_retries_budget == 0 {
" (hint: set `schema_retries: N` in the llm_call options to automatically re-prompt the model with a corrective nudge)"
} else {
" (hint: schema_retries budget exhausted — the model did not produce conforming output after the configured retries; consider raising `schema_retries` or relaxing the schema)"
};
let message = format!(
"LLM output failed schema validation: {}{hint}",
outcome.errors.join("; ")
);
match outcome.output_validation_mode.as_str() {
"error" => Err(crate::value::VmError::CategorizedError {
message,
category: crate::value::ErrorCategory::SchemaValidation,
}),
"warn" => {
crate::events::log_warn("llm", &message);
Ok(outcome.vm_result)
}
_ => Ok(outcome.vm_result),
}
}
/// Executes the call through `routing::execute_with_routing` and builds the
/// result envelope with a `routing` block attached.
///
/// Before building the envelope, `opts` is updated with the provider and model
/// that routing returned, plus the routing decision derived from the trace.
async fn execute_with_routing_policy(
    policy: Rc<routing::RoutingPolicyConfig>,
    mut opts: api::LlmCallOptions,
    bridge: Option<&Rc<crate::bridge::HostBridge>>,
) -> Result<VmValue, VmError> {
    let (result, trace) = routing::execute_with_routing(&policy, opts.clone(), bridge).await?;
    let decision = routing::trace_to_decision(&trace, &policy);
    opts.provider = result.provider.clone();
    opts.model = result.model.clone();
    opts.routing_decision = Some(decision);
    let envelope = agent_config::build_llm_call_result(&result, &opts);
    Ok(attach_routing_block(envelope, &trace, &policy))
}
/// Adds a `routing` dict to a dict-shaped call envelope.
///
/// The `routing` dict contains:
/// - `policy`: the trace's label, or the policy's label when the trace label
///   is empty;
/// - `attempts`: from `routing::trace_to_vm_attempts`;
/// - `selected`: only when the trace records one;
/// - `session_cost_usd`.
///
/// Non-dict envelopes are returned unchanged.
fn attach_routing_block(
    envelope: VmValue,
    trace: &routing::RoutingTrace,
    policy: &routing::RoutingPolicyConfig,
) -> VmValue {
    let VmValue::Dict(fields) = envelope else {
        return envelope;
    };
    let label = if trace.label.is_empty() {
        &policy.label
    } else {
        &trace.label
    };
    let mut routing_info = std::collections::BTreeMap::from([
        ("policy".to_string(), VmValue::String(Rc::from(label.clone()))),
        ("attempts".to_string(), routing::trace_to_vm_attempts(trace)),
        (
            "session_cost_usd".to_string(),
            VmValue::Float(trace.session_cost_usd),
        ),
    ]);
    if let Some(selected) = trace.selected {
        routing_info.insert("selected".to_string(), VmValue::Int(selected as i64));
    }
    let mut merged = fields.as_ref().clone();
    merged.insert("routing".to_string(), VmValue::Dict(Rc::new(routing_info)));
    VmValue::Dict(Rc::new(merged))
}
/// Final state reported by `execute_schema_retry_loop`.
pub(crate) struct SchemaLoopOutcome {
    /// Call result from the last attempt, as built by
    /// `agent_config::build_llm_call_result`.
    pub vm_result: VmValue,
    /// Raw response text (`text`) of the last attempt.
    pub raw_text: String,
    /// Validation errors for the last attempt. Empty when the output conformed
    /// or when no structured output was expected.
    pub errors: Vec<String>,
    /// 1-based index of the last schema attempt. Transport-level retries inside
    /// `observed_llm_call` are not counted here.
    pub attempts: usize,
    /// The `schema_retries` budget the loop ran with (negative values clamped
    /// to 0).
    pub schema_retries_budget: usize,
    /// The `output_validation` mode, `"off"` when unset.
    pub output_validation_mode: String,
}
pub(crate) async fn execute_schema_retry_loop(
mut opts: api::LlmCallOptions,
options: Option<std::collections::BTreeMap<String, VmValue>>,
bridge: Option<&Rc<crate::bridge::HostBridge>>,
) -> Result<SchemaLoopOutcome, VmError> {
let _ = structural_experiments::apply_structural_experiment(&mut opts, None).await?;
let retry_config = agent_observe::LlmRetryConfig {
retries: helpers::opt_int(&options, "llm_retries")
.unwrap_or(agent_observe::DEFAULT_LLM_CALL_RETRIES as i64)
.max(0) as usize,
backoff_ms: helpers::opt_int(&options, "llm_backoff_ms")
.unwrap_or(agent_observe::DEFAULT_LLM_CALL_BACKOFF_MS as i64)
.max(0) as u64,
};
let schema_retries = helpers::opt_int(&options, "schema_retries")
.unwrap_or(1)
.max(0) as usize;
let nudge_mode = parse_schema_nudge(&options);
let tool_format = helpers::opt_str(&options, "tool_format");
let bridged = bridge.is_some();
let user_visible = bridged && helpers::opt_bool(&options, "user_visible");
let output_validation_mode = output_validation_mode(&opts).to_string();
let expects_structured = helpers::expects_structured_output(&opts);
let original_messages = opts.messages.clone();
for attempt in 0..=schema_retries {
let result = agent_observe::observed_llm_call(
&opts,
tool_format.as_deref(),
bridge,
&retry_config,
None,
user_visible,
bridged, None,
)
.await?;
let raw_text = result.text.clone();
let vm_result = agent_config::build_llm_call_result(&result, &opts);
if !expects_structured {
return Ok(SchemaLoopOutcome {
vm_result,
raw_text,
errors: Vec::new(),
attempts: attempt + 1,
schema_retries_budget: schema_retries,
output_validation_mode,
});
}
let errors = structured_output_errors(&vm_result, &opts);
if errors.is_empty() {
return Ok(SchemaLoopOutcome {
vm_result,
raw_text,
errors,
attempts: attempt + 1,
schema_retries_budget: schema_retries,
output_validation_mode,
});
}
let more_attempts = attempt < schema_retries;
if more_attempts {
let nudge = build_schema_nudge(&errors, opts.output_schema.as_ref(), &nudge_mode);
emit_agent_event(AgentTraceEvent::SchemaRetry {
attempt: attempt + 1,
errors: errors.clone(),
nudge_used: !nudge.is_empty(),
correction_prompt: nudge.clone(),
});
opts.messages = original_messages.clone();
if !nudge.is_empty() {
opts.messages.push(serde_json::json!({
"role": "user",
"content": nudge,
}));
}
continue;
}
return Ok(SchemaLoopOutcome {
vm_result,
raw_text,
errors,
attempts: attempt + 1,
schema_retries_budget: schema_retries,
output_validation_mode,
});
}
unreachable!("schema retry loop exited without returning");
}
pub(super) fn llm_safe_envelope_ok(response: VmValue) -> VmValue {
let mut dict = std::collections::BTreeMap::new();
dict.insert("ok".to_string(), VmValue::Bool(true));
dict.insert("response".to_string(), response);
dict.insert("error".to_string(), VmValue::Nil);
VmValue::Dict(Rc::new(dict))
}
/// Wraps a failure as `{ok: false, response: nil, error}`.
///
/// - A thrown dict is used as `error` unchanged.
/// - Any other error is described by a dict with
///   `category`/`kind`/`reason`/`message` derived from the error classification.
pub(super) fn llm_safe_envelope_err(err: &VmError) -> VmValue {
    let error = match err {
        VmError::Thrown(VmValue::Dict(existing)) => VmValue::Dict(existing.clone()),
        _ => {
            let category = crate::value::error_to_category(err);
            let message = llm_error_message(err);
            let llm_error = api::classify_llm_error(category.clone(), &message);
            let details = std::collections::BTreeMap::from([
                (
                    "category".to_string(),
                    VmValue::String(Rc::from(category.as_str())),
                ),
                (
                    "kind".to_string(),
                    VmValue::String(Rc::from(llm_error.kind.as_str())),
                ),
                (
                    "reason".to_string(),
                    VmValue::String(Rc::from(llm_error.reason.as_str())),
                ),
                ("message".to_string(), VmValue::String(Rc::from(message))),
            ]);
            VmValue::Dict(Rc::new(details))
        }
    };
    let envelope = std::collections::BTreeMap::from([
        ("ok".to_string(), VmValue::Bool(false)),
        ("response".to_string(), VmValue::Nil),
        ("error".to_string(), error),
    ]);
    VmValue::Dict(Rc::new(envelope))
}
/// Rewrites `llm_call_structured(prompt, schema, options?)` arguments into a
/// `[prompt, system, options]` argument list.
///
/// Changes made to the options:
/// - The `system` option is moved into the second slot (nil when absent or
///   nil).
/// - `retries` is accepted as an alias for `schema_retries`, which defaults
///   to 3.
/// - The schema is installed as `output_schema`, as `json_schema` and inside a
///   strict `json_schema` `output_format`.
/// - `response_format` defaults to `"json"` and `output_validation` to
///   `"error"`.
///
/// Defaults are only inserted for keys the caller has not already set.
///
/// # Errors
/// `VmError::Runtime` when fewer than two arguments are given or when the
/// schema is not a dict.
pub(crate) fn rewrite_structured_args(args: Vec<VmValue>) -> Result<Vec<VmValue>, VmError> {
    if args.len() < 2 {
        return Err(VmError::Runtime(
            "llm_call_structured: missing required `schema` argument (expected \
             (prompt, schema, options?))"
                .to_string(),
        ));
    }
    let mut positional = args.into_iter();
    let prompt = positional.next().unwrap_or(VmValue::Nil);
    let schema = match positional.next() {
        Some(dict @ VmValue::Dict(_)) => dict,
        Some(other) => {
            return Err(VmError::Runtime(format!(
                "llm_call_structured: `schema` must be a dict (JSON Schema), got {}",
                other.type_name()
            )));
        }
        None => unreachable!("len check above guarantees arg index 1"),
    };
    let mut options = positional
        .next()
        .as_ref()
        .and_then(|raw| raw.as_dict())
        .cloned()
        .unwrap_or_default();

    let system = options
        .remove("system")
        .filter(|v| !matches!(v, VmValue::Nil));
    let retries = options
        .remove("retries")
        .and_then(|v| v.as_int())
        .unwrap_or(3);
    options
        .entry("schema_retries".to_string())
        .or_insert(VmValue::Int(retries));

    for key in ["output_schema", "json_schema"] {
        options
            .entry(key.to_string())
            .or_insert_with(|| schema.clone());
    }
    options
        .entry("output_format".to_string())
        .or_insert_with(|| {
            let format = std::collections::BTreeMap::from([
                ("kind".to_string(), VmValue::String(Rc::from("json_schema"))),
                ("schema".to_string(), schema),
                ("strict".to_string(), VmValue::Bool(true)),
            ]);
            VmValue::Dict(Rc::new(format))
        });
    options
        .entry("response_format".to_string())
        .or_insert(VmValue::String(Rc::from("json")));
    options
        .entry("output_validation".to_string())
        .or_insert(VmValue::String(Rc::from("error")));

    Ok(vec![
        prompt,
        system.unwrap_or(VmValue::Nil),
        VmValue::Dict(Rc::new(options)),
    ])
}
/// Pulls the parsed `data` out of a structured-call response dict.
///
/// Returns nil when the dict has no `data`. Non-dict responses are returned
/// unchanged.
pub(crate) fn extract_structured_data(response: VmValue) -> VmValue {
    let VmValue::Dict(envelope) = response else {
        return response;
    };
    envelope
        .get("data")
        .map_or(VmValue::Nil, |data| data.clone())
}
pub(crate) fn structured_safe_envelope_ok(data: VmValue) -> VmValue {
let mut dict = std::collections::BTreeMap::new();
dict.insert("ok".to_string(), VmValue::Bool(true));
dict.insert("data".to_string(), data);
dict.insert("error".to_string(), VmValue::Nil);
VmValue::Dict(Rc::new(dict))
}
/// Wraps a structured-call failure as `{ok: false, data: nil, error}`.
///
/// - A thrown dict is used as `error` unchanged.
/// - Any other error is described by a dict with
///   `category`/`kind`/`reason`/`message` derived from the error classification.
pub(crate) fn structured_safe_envelope_err(err: &VmError) -> VmValue {
    let error = match err {
        VmError::Thrown(VmValue::Dict(existing)) => VmValue::Dict(existing.clone()),
        _ => {
            let category = crate::value::error_to_category(err);
            let message = llm_error_message(err);
            let llm_error = api::classify_llm_error(category.clone(), &message);
            let details = std::collections::BTreeMap::from([
                (
                    "category".to_string(),
                    VmValue::String(Rc::from(category.as_str())),
                ),
                (
                    "kind".to_string(),
                    VmValue::String(Rc::from(llm_error.kind.as_str())),
                ),
                (
                    "reason".to_string(),
                    VmValue::String(Rc::from(llm_error.reason.as_str())),
                ),
                ("message".to_string(), VmValue::String(Rc::from(message))),
            ]);
            VmValue::Dict(Rc::new(details))
        }
    };
    let envelope = std::collections::BTreeMap::from([
        ("ok".to_string(), VmValue::Bool(false)),
        ("data".to_string(), VmValue::Nil),
        ("error".to_string(), error),
    ]);
    VmValue::Dict(Rc::new(envelope))
}