use std::cell::RefCell;
use std::rc::Rc;
use crate::event_log::EventLog;
use crate::value::VmError;
use super::api::{vm_call_llm_full_streaming, vm_call_llm_full_streaming_offthread, DeltaSender};
use super::trace::{trace_llm_call, LlmTraceEntry};
use super::agent_tools::next_call_id;
thread_local! {
    // Hash of the last system prompt written to the transcript; used to
    // dedup repeated `system_prompt` entries across calls on this thread.
    static LAST_SYSTEM_PROMPT_HASH: RefCell<Option<u64>> = const { RefCell::new(None) };
    // Hash of the last tool-schema set written to the transcript (dedup).
    static LAST_TOOL_SCHEMAS_HASH: RefCell<Option<u64>> = const { RefCell::new(None) };
    // Agent-loop iteration attached to transcript events, when known.
    static CURRENT_ITERATION: RefCell<Option<usize>> = const { RefCell::new(None) };
}
/// Hash a string with the std `DefaultHasher` and return the 64-bit digest.
///
/// Used for cheap change detection (system-prompt / tool-schema dedup),
/// not for anything security-sensitive.
fn hash_str(value: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    let mut state = DefaultHasher::default();
    value.hash(&mut state);
    state.finish()
}
/// Hash a JSON value by hashing its serialized form.
///
/// Serialization failures degrade to hashing the empty string rather
/// than propagating an error.
fn hash_json(value: &serde_json::Value) -> u64 {
    let encoded = serde_json::to_string(value).unwrap_or_default();
    let digest = hash_str(encoded.as_str());
    digest
}
/// Clear all per-thread transcript dedup state, so the next call emits
/// the system prompt and tool schemas unconditionally.
pub(crate) fn reset_transcript_dedup() {
    LAST_SYSTEM_PROMPT_HASH.with(|slot| {
        slot.borrow_mut().take();
    });
    LAST_TOOL_SCHEMAS_HASH.with(|slot| {
        slot.borrow_mut().take();
    });
    CURRENT_ITERATION.with(|slot| {
        slot.borrow_mut().take();
    });
}
/// Record the agent-loop iteration to attach to subsequent transcript
/// events on this thread (`None` clears it).
pub(crate) fn set_current_iteration(iteration: Option<usize>) {
    CURRENT_ITERATION.with(|slot| {
        slot.replace(iteration);
    });
}
/// Read the current agent-loop iteration recorded for this thread, if any.
fn current_iteration() -> Option<usize> {
    // Option<usize> is Copy, so copy it out of the thread-local slot.
    CURRENT_ITERATION.with(|slot| slot.borrow().as_ref().copied())
}
/// Decide whether an LLM call error is worth retrying.
///
/// Resolution order:
/// 1. Categorized errors: ask the LLM-specific classifier; when it has no
///    opinion, fall back to the category's own transience.
/// 2. Structured (dict) errors: trust an explicit `kind`/`category` field.
/// 3. String errors: classify the message, consult the LLM classifier,
///    then fall back to substring heuristics for common transient failures.
pub(super) fn is_retryable_llm_error(err: &VmError) -> bool {
    use crate::value::{classify_error_message, ErrorCategory};
    let msg = match err {
        VmError::CategorizedError { category, message } => {
            let llm_info = crate::llm::api::classify_llm_error(category.clone(), message);
            return if llm_info.reason == crate::llm::api::LlmErrorReason::Unknown {
                category.is_transient()
            } else {
                llm_info.kind == crate::llm::api::LlmErrorKind::Transient
            };
        }
        VmError::Thrown(crate::value::VmValue::Dict(d)) => {
            // Structured errors carry an explicit transience marker.
            if let Some(kind) = d.get("kind").map(|v| v.display()) {
                return kind == "transient";
            }
            if let Some(category) = d.get("category").map(|v| v.display()) {
                return ErrorCategory::parse(&category).is_transient();
            }
            return false;
        }
        VmError::Thrown(crate::value::VmValue::String(s)) => s.as_ref(),
        VmError::Runtime(s) => s.as_str(),
        _ => return false,
    };
    // String-shaped errors: classify the message once and reuse the result
    // below (the original recomputed classify_error_message redundantly).
    let category = classify_error_message(msg);
    let llm_info = crate::llm::api::classify_llm_error(category.clone(), msg);
    if llm_info.kind == crate::llm::api::LlmErrorKind::Transient {
        return true;
    }
    if llm_info.reason != crate::llm::api::LlmErrorReason::Unknown {
        return false;
    }
    // Classifier had no opinion: trust the derived category when specific.
    if category != ErrorCategory::Generic {
        return category.is_transient();
    }
    // Last resort: substring heuristics over the lowercased message.
    let lower = msg.to_lowercase();
    lower.contains("too many requests")
        || lower.contains("rate limit")
        || lower.contains("overloaded")
        || lower.contains("service unavailable")
        || lower.contains("bad gateway")
        || lower.contains("gateway timeout")
        || lower.contains("timed out")
        || lower.contains("timeout")
        || lower.contains("delivered no content")
        || lower.contains("eof")
}
/// Detect the "provider billed completion tokens but delivered no content"
/// failure, which gets its own observability event when retried.
fn is_empty_completion_retry_error(err: &VmError) -> bool {
    let msg = match err {
        VmError::Thrown(crate::value::VmValue::String(s)) => s.as_ref(),
        VmError::CategorizedError { message, .. } => message.as_str(),
        VmError::Runtime(s) => s.as_str(),
        _ => return false,
    };
    let lowered = msg.to_lowercase();
    ["completion_tokens=", "delivered no content"]
        .iter()
        .all(|needle| lowered.contains(needle))
}
/// Extract a retry delay (in milliseconds) from an LLM error, if present.
///
/// Structured (dict) errors may carry an explicit non-negative
/// `retry_after_ms` field; string-shaped errors are scanned for a
/// `retry-after:` header via [`parse_retry_after`].
pub(super) fn extract_retry_after_ms(err: &VmError) -> Option<u64> {
    match err {
        VmError::Thrown(crate::value::VmValue::Dict(d)) => match d.get("retry_after_ms")? {
            // try_from rejects negative values, matching the `>= 0` guard.
            crate::value::VmValue::Int(ms) => u64::try_from(*ms).ok(),
            _ => None,
        },
        VmError::Thrown(crate::value::VmValue::String(s)) => parse_retry_after(s),
        VmError::CategorizedError { message, .. } => parse_retry_after(message),
        VmError::Runtime(s) => parse_retry_after(s),
        _ => None,
    }
}
/// Parse a `retry-after:` directive embedded in an error message into a
/// delay in milliseconds, clamped to 60s.
///
/// Supports both the delta-seconds form (possibly fractional) and the
/// HTTP-date form; a date in the past yields `Some(0)`. Returns `None`
/// when no parseable directive is found.
pub(crate) fn parse_retry_after(msg: &str) -> Option<u64> {
    // Never sleep longer than a minute regardless of what the server says.
    const MAX_MS: u64 = 60_000;
    const NEEDLE: &[u8] = b"retry-after:";
    // Search case-insensitively over bytes instead of indexing `msg` with a
    // position found in `msg.to_lowercase()`: lowercasing can change byte
    // lengths for some Unicode characters, which would make that index
    // invalid (panic or misaligned slice) in the original string.
    let bytes = msg.as_bytes();
    let pos = bytes
        .windows(NEEDLE.len())
        .position(|window| window.eq_ignore_ascii_case(NEEDLE))?;
    // The matched window is pure ASCII, so `pos + NEEDLE.len()` is a valid
    // char boundary in `msg`.
    let after = &msg[pos + NEEDLE.len()..];
    let end = after.find(['\r', '\n']).unwrap_or(after.len());
    let value = after[..end].trim();
    if value.is_empty() {
        return None;
    }
    // Delta-seconds form: seconds -> milliseconds, clamped. Non-finite or
    // negative values degrade to an immediate retry (0ms).
    if let Some(num_str) = value.split_whitespace().next() {
        if let Ok(secs) = num_str.parse::<f64>() {
            if !secs.is_finite() || secs < 0.0 {
                return Some(0);
            }
            let ms = (secs * 1000.0) as u64;
            return Some(ms.min(MAX_MS));
        }
    }
    // HTTP-date form: wait until the given instant; past dates -> 0.
    if let Ok(target) = httpdate::parse_http_date(value) {
        let now = std::time::SystemTime::now();
        let delta = target
            .duration_since(now)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        return Some(delta.min(MAX_MS));
    }
    None
}
/// Append one transcript entry to the event log and, when
/// `HARN_LLM_TRANSCRIPT_DIR` is set, mirror it to a JSONL file there.
/// All filesystem failures are deliberately ignored (best-effort sink).
pub(super) fn append_llm_transcript_entry(entry: &serde_json::Value) {
    append_llm_transcript_event_log(entry);
    let Ok(dir) = std::env::var("HARN_LLM_TRANSCRIPT_DIR") else {
        return;
    };
    if dir.is_empty() {
        return;
    }
    let _ = std::fs::create_dir_all(&dir);
    let path = std::path::Path::new(&dir).join("llm_transcript.jsonl");
    let Ok(line) = serde_json::to_string(entry) else {
        return;
    };
    use std::io::Write;
    let opened = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&path);
    if let Ok(mut file) = opened {
        let _ = writeln!(file, "{line}");
    }
}
/// Append a transcript entry to the active event log, if one is installed.
///
/// The event kind is taken from the entry's `type` field; headers are
/// enriched with the span id and any trigger-dispatch context so
/// downstream consumers can correlate transcript events.
fn append_llm_transcript_event_log(entry: &serde_json::Value) {
    let Some(log) = crate::event_log::active_event_log() else {
        return;
    };
    let topic = crate::event_log::Topic::new("agent.transcript.llm")
        .expect("static transcript topic should be valid");
    // Fall back to a generic kind when the entry carries no "type".
    let kind = entry
        .get("type")
        .and_then(|value| value.as_str())
        .unwrap_or("transcript_event")
        .to_string();
    let mut headers = std::collections::BTreeMap::new();
    if let Some(span_id) = entry.get("span_id").and_then(|value| value.as_u64()) {
        headers.insert("span_id".to_string(), span_id.to_string());
    }
    // When running inside a trigger dispatch, attach its identifiers.
    if let Some(context) = crate::triggers::dispatcher::current_dispatch_context() {
        headers.insert("trigger_id".to_string(), context.binding_id.clone());
        headers.insert(
            "binding_key".to_string(),
            format!("{}@v{}", context.binding_id, context.binding_version),
        );
        headers.insert("event_id".to_string(), context.trigger_event.id.0.clone());
        headers.insert(
            "trace_id".to_string(),
            context.trigger_event.trace_id.0.clone(),
        );
        // These two moves consume `context` fields; keep them last among
        // the uses of `binding_id` / `action`.
        headers.insert("pipeline".to_string(), context.binding_id);
        headers.insert("action".to_string(), context.action);
        if let Some(tenant_id) = context.trigger_event.tenant_id {
            headers.insert("tenant_id".to_string(), tenant_id.0);
        }
    }
    let event = crate::event_log::LogEvent::new(kind, entry.clone()).with_headers(headers);
    // Prefer spawning on the current Tokio runtime; outside an async
    // context, fall back to a blocking append.
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn(async move {
            let _ = log.append(&topic, event).await;
        });
    } else {
        let _ = futures::executor::block_on(log.append(&topic, event));
    }
}
/// Emit an observability event to the LLM transcript.
///
/// The `type` field is always overwritten with `event_type`; `timestamp`
/// and `span_id` are backfilled only when the caller did not supply them.
pub(crate) fn append_llm_observability_entry(
    event_type: &str,
    mut fields: serde_json::Map<String, serde_json::Value>,
) {
    fields.insert("type".to_string(), serde_json::json!(event_type));
    if !fields.contains_key("timestamp") {
        fields.insert("timestamp".to_string(), serde_json::json!(chrono_now()));
    }
    if !fields.contains_key("span_id") {
        fields.insert(
            "span_id".to_string(),
            serde_json::json!(crate::tracing::current_span_id()),
        );
    }
    append_llm_transcript_entry(&serde_json::Value::Object(fields));
}
/// Record one conversation message in the LLM transcript, tagged with the
/// given iteration. Absent message fields serialize as JSON null.
pub(crate) fn emit_message_event_with_iteration(
    message: &serde_json::Value,
    iteration: Option<usize>,
) {
    let role = match message.get("role").and_then(|v| v.as_str()) {
        Some(r) => r,
        None => "unknown",
    };
    let content = message
        .get("content")
        .cloned()
        .unwrap_or(serde_json::Value::Null);
    let entry = serde_json::json!({
        "type": "message",
        "timestamp": chrono_now(),
        "span_id": crate::tracing::current_span_id(),
        "iteration": iteration,
        "role": role,
        "content": content,
        "tool_calls": message.get("tool_calls").cloned(),
        "tool_call_id": message.get("tool_call_id").cloned(),
        "name": message.get("name").cloned(),
    });
    append_llm_transcript_entry(&entry);
}
/// Record a conversation message tagged with this thread's current
/// agent-loop iteration.
pub(crate) fn emit_message_event(message: &serde_json::Value) {
    let iteration = current_iteration();
    emit_message_event_with_iteration(message, iteration);
}
/// Emit a `system_prompt` transcript entry, but only when the prompt text
/// differs from the one last emitted on this thread (dedup by hash).
///
/// Fixes a corrupted token in the comparison: the source contained the
/// mojibake `¤t` (an HTML-entity mangling of `&current`), which does
/// not compile; the intended `Some(&current)` is restored.
fn emit_system_prompt_if_changed(system: Option<&str>) {
    let content = system.unwrap_or("");
    let current = hash_str(content);
    let changed = LAST_SYSTEM_PROMPT_HASH.with(|cell| {
        let mut slot = cell.borrow_mut();
        if slot.as_ref() == Some(&current) {
            false
        } else {
            *slot = Some(current);
            true
        }
    });
    if !changed {
        return;
    }
    append_llm_transcript_entry(&serde_json::json!({
        "type": "system_prompt",
        "timestamp": chrono_now(),
        "span_id": crate::tracing::current_span_id(),
        "hash": current,
        "content": content,
    }));
}
/// Emit a `tool_schemas` transcript entry, but only when the serialized
/// schema set differs from the one last emitted on this thread.
///
/// Fixes a corrupted token in the comparison: the source contained the
/// mojibake `¤t` (an HTML-entity mangling of `&current`), which does
/// not compile; the intended `Some(&current)` is restored.
fn emit_tool_schemas_if_changed(schemas: &[crate::llm::tools::ToolSchema]) {
    // Serialization failure degrades to hashing/logging JSON null.
    let value = serde_json::to_value(schemas).unwrap_or(serde_json::Value::Null);
    let current = hash_json(&value);
    let changed = LAST_TOOL_SCHEMAS_HASH.with(|cell| {
        let mut slot = cell.borrow_mut();
        if slot.as_ref() == Some(&current) {
            false
        } else {
            *slot = Some(current);
            true
        }
    });
    if !changed {
        return;
    }
    append_llm_transcript_entry(&serde_json::json!({
        "type": "tool_schemas",
        "timestamp": chrono_now(),
        "span_id": crate::tracing::current_span_id(),
        "hash": current,
        "schemas": value,
    }));
}
/// Dump the outgoing provider request to the LLM transcript.
///
/// Emits, in order: the system prompt and tool schemas (only when changed
/// since the previous call), the routing decision when present, and a
/// `provider_call_request` entry describing the request parameters.
pub(super) fn dump_llm_request(
    iteration: usize,
    call_id: &str,
    tool_format: &str,
    opts: &super::api::LlmCallOptions,
) {
    emit_system_prompt_if_changed(opts.system.as_deref());
    let tool_schemas =
        crate::llm::tools::collect_tool_schemas(opts.tools.as_ref(), opts.native_tools.as_deref());
    emit_tool_schemas_if_changed(&tool_schemas);
    // Serialize the applied structural experiment; any serialization
    // failure degrades to JSON null rather than aborting the dump.
    let structural_experiment = opts
        .applied_structural_experiment
        .as_ref()
        .map(serde_json::to_value)
        .transpose()
        .unwrap_or(None)
        .unwrap_or(serde_json::Value::Null);
    if let Some(decision) = opts.routing_decision.as_ref() {
        append_llm_transcript_entry(&serde_json::json!({
            "type": "routing_decision",
            "iteration": iteration,
            "call_id": call_id,
            "span_id": crate::tracing::current_span_id(),
            "timestamp": chrono_now(),
            "policy": decision.policy.clone(),
            "requested_quality": decision.requested_quality.clone(),
            "selected_provider": decision.selected_provider.clone(),
            "selected_model": decision.selected_model.clone(),
            "fallback_chain": opts.fallback_chain.clone(),
            "alternatives": decision.alternatives.clone(),
        }));
    }
    append_llm_transcript_entry(&serde_json::json!({
        "type": "provider_call_request",
        "iteration": iteration,
        "call_id": call_id,
        "span_id": crate::tracing::current_span_id(),
        "timestamp": chrono_now(),
        "model": opts.model,
        "provider": opts.provider,
        "max_tokens": opts.max_tokens,
        "temperature": opts.temperature,
        // Normalize every thinking-config variant into a uniform JSON shape
        // with "mode", "enabled", and "budget_tokens" keys.
        "thinking": match &opts.thinking {
            super::api::ThinkingConfig::Disabled => serde_json::json!({
                "mode": "disabled",
                "enabled": false,
                "budget_tokens": serde_json::Value::Null,
            }),
            super::api::ThinkingConfig::Enabled { budget_tokens } => serde_json::json!({
                "mode": "enabled",
                "enabled": true,
                "budget_tokens": budget_tokens,
            }),
            super::api::ThinkingConfig::Adaptive => serde_json::json!({
                "mode": "adaptive",
                "enabled": true,
                "budget_tokens": serde_json::Value::Null,
            }),
            super::api::ThinkingConfig::Effort { level } => serde_json::json!({
                "mode": "effort",
                "level": level.as_str(),
                "enabled": true,
                "budget_tokens": serde_json::Value::Null,
            }),
        },
        "tool_choice": opts.tool_choice,
        "tool_format": tool_format,
        "native_tool_count": opts.native_tools.as_ref().map(|tools| tools.len()).unwrap_or(0),
        "message_count": opts.messages.len(),
        "structural_experiment": structural_experiment,
        "route_policy": opts.route_policy.as_label(),
        "fallback_chain": opts.fallback_chain.clone(),
        "routing_decision": opts.routing_decision.clone(),
    }));
}
/// Dump a successful provider response to the LLM transcript, including
/// token counts, derived cost, and cache statistics.
pub(super) fn dump_llm_response(
    iteration: usize,
    call_id: &str,
    result: &super::api::LlmResult,
    response_ms: u64,
    structural_experiment: Option<&crate::llm::structural_experiments::AppliedStructuralExperiment>,
) {
    // Serialization failure degrades to JSON null rather than aborting.
    let structural_experiment = structural_experiment
        .map(serde_json::to_value)
        .transpose()
        .unwrap_or(None)
        .unwrap_or(serde_json::Value::Null);
    append_llm_transcript_entry(&serde_json::json!({
        "type": "provider_call_response",
        "iteration": iteration,
        "call_id": call_id,
        "span_id": crate::tracing::current_span_id(),
        "timestamp": chrono_now(),
        "provider": result.provider,
        "model": result.model,
        "text": result.text,
        "tool_calls": result.tool_calls,
        "input_tokens": result.input_tokens,
        "output_tokens": result.output_tokens,
        "cost_usd": crate::llm::cost::calculate_cost_for_provider(
            &result.provider,
            &result.model,
            result.input_tokens,
            result.output_tokens,
        ),
        "cache_read_tokens": result.cache_read_tokens,
        "cache_write_tokens": result.cache_write_tokens,
        // NOTE(review): duplicates cache_write_tokens under a second key —
        // presumably a provider-facing alias; confirm before removing.
        "cache_creation_input_tokens": result.cache_write_tokens,
        "cache_hit_ratio": crate::llm::cost::cache_hit_ratio(
            result.input_tokens,
            result.cache_read_tokens,
            result.cache_write_tokens,
        ),
        "cache_savings_usd": crate::llm::cost::cache_savings_usd_for_provider(
            &result.provider,
            &result.model,
            result.cache_read_tokens,
            result.cache_write_tokens,
        ),
        "cache_hit": result.cache_read_tokens > 0,
        "thinking": result.thinking,
        "thinking_summary": result.thinking_summary,
        "response_ms": response_ms,
        "structural_experiment": structural_experiment,
    }));
}
/// Record how the raw provider output was interpreted: the extracted
/// prose, parsed tool calls, and any tool-call parse errors.
pub(super) fn dump_llm_interpreted_response(
    iteration: usize,
    call_id: &str,
    tool_format: &str,
    prose: &str,
    tool_calls: &[serde_json::Value],
    tool_parse_errors: &[String],
) {
    let entry = serde_json::json!({
        "type": "interpreted_response",
        "iteration": iteration,
        "call_id": call_id,
        "span_id": crate::tracing::current_span_id(),
        "timestamp": chrono_now(),
        "tool_format": tool_format,
        "prose": prose,
        "tool_calls": tool_calls,
        "tool_parse_errors": tool_parse_errors,
    });
    append_llm_transcript_entry(&entry);
}
/// Attach key/value metadata to the current tracing span, if any.
/// A no-op when no span is active.
pub(super) fn annotate_current_span(metadata: &[(&str, serde_json::Value)]) {
    if let Some(span_id) = crate::tracing::current_span_id() {
        for (key, value) in metadata {
            crate::tracing::span_set_metadata(span_id, key, value.clone());
        }
    }
}
/// Current wall-clock time as Unix seconds with millisecond precision,
/// formatted "secs.mmm". A clock before the epoch yields "0.000" rather
/// than panicking.
pub(super) fn chrono_now() -> String {
    let elapsed = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    format!("{}.{:03}", elapsed.as_secs(), elapsed.subsec_millis())
}
/// Inputs needed to run the streaming tool-call detector over LLM deltas.
pub(crate) struct StreamingDetectorContext {
    // Session the detected tool-call events are attributed to.
    pub session_id: String,
    // Tool names the detector should recognize in the stream.
    pub known_tools: std::collections::BTreeSet<String>,
}
/// Spawn a local task that forwards streaming deltas to the host bridge
/// and, when a detector context is supplied, feeds each delta to the
/// streaming tool-call detector, emitting its events.
///
/// Returns the sender half; when all senders drop, the loop ends and the
/// detector (if any) is finalized so partial tool calls are flushed.
pub(super) fn spawn_progress_forwarder(
    bridge: &Rc<crate::bridge::HostBridge>,
    call_id: String,
    user_visible: bool,
    detector_ctx: Option<StreamingDetectorContext>,
) -> DeltaSender {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    let bridge = bridge.clone();
    let mut detector = detector_ctx.map(|ctx| {
        crate::llm::tools::StreamingToolCallDetector::new(ctx.session_id, ctx.known_tools)
    });
    // spawn_local: the Rc-based bridge must stay on this thread.
    tokio::task::spawn_local(async move {
        // NOTE(review): counts received deltas, not model tokens.
        let mut token_count: u64 = 0;
        while let Some(delta) = rx.recv().await {
            token_count += 1;
            bridge.send_call_progress(&call_id, &delta, token_count, user_visible);
            if let Some(d) = detector.as_mut() {
                for event in d.push(&delta) {
                    crate::agent_events::emit_event(&event);
                }
            }
        }
        // Channel closed: flush any in-flight tool-call parse state.
        if let Some(mut d) = detector {
            for event in d.finalize() {
                crate::agent_events::emit_event(&event);
            }
        }
    });
    tx
}
/// Spawn a local task that only runs the streaming tool-call detector
/// over incoming deltas (no bridge to forward progress to). Returns the
/// sender half of the delta channel.
pub(super) fn spawn_detector_only_forwarder(detector_ctx: StreamingDetectorContext) -> DeltaSender {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    let detector_loop = run_detector_loop(detector_ctx, rx);
    tokio::task::spawn_local(detector_loop);
    tx
}
/// Core detector loop: feed every delta from `rx` into a fresh
/// `StreamingToolCallDetector`, passing each detected event to `sink`.
/// When the channel closes, finalize the detector so unterminated
/// tool-call blocks are still reported (e.g. as aborted).
async fn run_detector_loop_with_sink(
    detector_ctx: StreamingDetectorContext,
    mut rx: tokio::sync::mpsc::UnboundedReceiver<String>,
    mut sink: impl FnMut(&crate::agent_events::AgentEvent),
) {
    let mut detector = crate::llm::tools::StreamingToolCallDetector::new(
        detector_ctx.session_id,
        detector_ctx.known_tools,
    );
    while let Some(delta) = rx.recv().await {
        for event in detector.push(&delta) {
            sink(&event);
        }
    }
    // Channel closed: flush whatever parse state remains.
    for event in detector.finalize() {
        sink(&event);
    }
}
/// Run the detector loop with the default sink: publish every detected
/// event to the agent event stream.
async fn run_detector_loop(
    detector_ctx: StreamingDetectorContext,
    rx: tokio::sync::mpsc::UnboundedReceiver<String>,
) {
    let emit = |event: &crate::agent_events::AgentEvent| crate::agent_events::emit_event(event);
    run_detector_loop_with_sink(detector_ctx, rx, emit).await;
}
/// By default an LLM call is attempted once and never retried.
pub(crate) const DEFAULT_LLM_CALL_RETRIES: usize = 0;
/// Base backoff in milliseconds; grows exponentially per attempt unless
/// the provider supplies an explicit retry-after hint.
pub(crate) const DEFAULT_LLM_CALL_BACKOFF_MS: u64 = 250;
/// Retry policy for a single logical LLM call.
pub(crate) struct LlmRetryConfig {
    // Maximum number of retries after the initial attempt.
    pub retries: usize,
    // Base backoff in milliseconds for exponential backoff.
    pub backoff_ms: u64,
}
impl Default for LlmRetryConfig {
    fn default() -> Self {
        Self {
            retries: DEFAULT_LLM_CALL_RETRIES,
            backoff_ms: DEFAULT_LLM_CALL_BACKOFF_MS,
        }
    }
}
fn llm_retry_backoff_ms(
error: &VmError,
retry_config: &LlmRetryConfig,
attempt: usize,
provider: &str,
) -> u64 {
if crate::llm::providers::MockProvider::should_intercept(provider) {
return 0;
}
extract_retry_after_ms(error)
.unwrap_or_else(|| retry_config.backoff_ms.saturating_mul(1 << attempt.min(4)))
}
/// Make one observed LLM call: rate limiting, span annotation, transcript
/// dumps, bridge call start/end events, retry with backoff for transient
/// errors, and metrics/trace emission.
///
/// Returns the provider result, or the last error once it is either
/// non-retryable or the retry budget is exhausted.
pub(crate) async fn observed_llm_call(
    opts: &super::api::LlmCallOptions,
    tool_format: Option<&str>,
    bridge: Option<&Rc<crate::bridge::HostBridge>>,
    retry_config: &LlmRetryConfig,
    iteration: Option<usize>,
    user_visible: bool,
    offthread: bool,
    streaming_detector: Option<StreamingDetectorContext>,
) -> Result<super::api::LlmResult, VmError> {
    // Tool format precedence: explicit arg > HARN_AGENT_TOOL_FORMAT env
    // var (non-blank) > model/provider default.
    let effective_tool_format = tool_format
        .map(str::to_string)
        .or_else(|| {
            std::env::var("HARN_AGENT_TOOL_FORMAT")
                .ok()
                .filter(|value| !value.trim().is_empty())
        })
        .unwrap_or_else(|| crate::llm_config::default_tool_format(&opts.model, &opts.provider));
    let mut attempt = 0usize;
    loop {
        // Per-provider rate limiting before every attempt.
        super::rate_limit::acquire_permit(&opts.provider).await;
        let call_id = next_call_id();
        // Rough prompt size: total length of all string "content" fields.
        let prompt_chars: usize = opts
            .messages
            .iter()
            .filter_map(|m| m.get("content").and_then(|c| c.as_str()))
            .map(|s| s.len())
            .sum();
        let mut span_meta = vec![
            ("call_id", serde_json::json!(call_id.clone())),
            ("model", serde_json::json!(opts.model.clone())),
            ("provider", serde_json::json!(opts.provider.clone())),
            ("prompt_chars", serde_json::json!(prompt_chars)),
            (
                "route_policy",
                serde_json::json!(opts.route_policy.as_label()),
            ),
            (
                "fallback_chain",
                serde_json::json!(opts.fallback_chain.clone()),
            ),
        ];
        if let Some(decision) = opts.routing_decision.as_ref() {
            span_meta.push(("routing_decision", serde_json::json!(decision)));
        }
        if let Some(iter) = iteration {
            span_meta.push(("iteration", serde_json::json!(iter)));
            span_meta.push(("llm_attempt", serde_json::json!(attempt)));
        }
        annotate_current_span(&span_meta);
        let mut call_start_meta =
            serde_json::json!({"model": opts.model, "prompt_chars": prompt_chars});
        // JSON-mode responses are not streamed publicly.
        call_start_meta["stream_publicly"] =
            serde_json::json!(opts.response_format.as_deref() != Some("json"));
        call_start_meta["user_visible"] = serde_json::json!(user_visible);
        if let Some(iter) = iteration {
            call_start_meta["iteration"] = serde_json::json!(iter);
            call_start_meta["llm_attempt"] = serde_json::json!(attempt);
        }
        if let Some(b) = bridge {
            b.send_call_start(&call_id, "llm", "llm_call", call_start_meta);
        }
        dump_llm_request(
            iteration.unwrap_or(0),
            &call_id,
            &effective_tool_format,
            opts,
        );
        let start = std::time::Instant::now();
        // Fresh detector context per attempt: the detector is stateful and
        // must not carry parse state across retries.
        let detector_ctx = streaming_detector
            .as_ref()
            .map(|c| StreamingDetectorContext {
                session_id: c.session_id.clone(),
                known_tools: c.known_tools.clone(),
            });
        // Call path: bridge streaming, detector-only streaming (offthread
        // without a bridge), or plain non-streaming.
        let llm_result = if let Some(b) = bridge {
            let delta_tx = spawn_progress_forwarder(b, call_id.clone(), user_visible, detector_ctx);
            if offthread {
                vm_call_llm_full_streaming_offthread(opts, delta_tx).await
            } else {
                vm_call_llm_full_streaming(opts, delta_tx).await
            }
        } else if offthread {
            let delta_tx = match detector_ctx {
                Some(ctx) => spawn_detector_only_forwarder(ctx),
                None => {
                    // No consumer: deltas are silently discarded.
                    let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
                    tx
                }
            };
            vm_call_llm_full_streaming_offthread(opts, delta_tx).await
        } else {
            super::api::vm_call_llm_full(opts).await
        };
        let duration_ms = start.elapsed().as_millis() as u64;
        match llm_result {
            Ok(result) => {
                // Success: annotate span, dump transcript, notify bridge,
                // record trace/metrics, and return.
                annotate_current_span(&[
                    ("status", serde_json::json!("ok")),
                    ("input_tokens", serde_json::json!(result.input_tokens)),
                    ("output_tokens", serde_json::json!(result.output_tokens)),
                    (
                        "cost_usd",
                        serde_json::json!(crate::llm::cost::calculate_cost_for_provider(
                            &result.provider,
                            &result.model,
                            result.input_tokens,
                            result.output_tokens,
                        )),
                    ),
                ]);
                dump_llm_response(
                    iteration.unwrap_or(0),
                    &call_id,
                    &result,
                    duration_ms,
                    opts.applied_structural_experiment.as_ref(),
                );
                annotate_current_span(&[(
                    "structural_experiment",
                    opts.applied_structural_experiment
                        .as_ref()
                        .map(serde_json::to_value)
                        .transpose()
                        .unwrap_or(None)
                        .unwrap_or(serde_json::Value::Null),
                )]);
                if let Some(b) = bridge {
                    b.send_call_end(
                        &call_id,
                        "llm",
                        "llm_call",
                        duration_ms,
                        "ok",
                        serde_json::json!({
                            "model": result.model,
                            "input_tokens": result.input_tokens,
                            "output_tokens": result.output_tokens,
                            "user_visible": user_visible,
                            "structural_experiment": opts.applied_structural_experiment.as_ref(),
                        }),
                    );
                }
                trace_llm_call(LlmTraceEntry {
                    model: result.model.clone(),
                    input_tokens: result.input_tokens,
                    output_tokens: result.output_tokens,
                    duration_ms,
                });
                if let Some(metrics) = crate::active_metrics_registry() {
                    metrics.record_llm_call(
                        &result.provider,
                        &result.model,
                        "succeeded",
                        super::cost::calculate_cost(
                            &result.model,
                            result.input_tokens,
                            result.output_tokens,
                        ),
                    );
                    if result.cache_read_tokens > 0 {
                        metrics.record_llm_cache_hit(&result.provider);
                    }
                }
                super::trace::emit_agent_event(super::trace::AgentTraceEvent::LlmCall {
                    call_id: call_id.clone(),
                    model: result.model.clone(),
                    input_tokens: result.input_tokens,
                    output_tokens: result.output_tokens,
                    cache_tokens: result.cache_read_tokens,
                    duration_ms,
                    iteration: iteration.unwrap_or(0),
                });
                return Ok(result);
            }
            Err(error) => {
                // Failure: decide retryability, record outcome, and either
                // return the error or back off and loop again.
                let retryable = is_retryable_llm_error(&error);
                let can_retry = retryable && attempt < retry_config.retries;
                let status = if can_retry {
                    "retrying"
                } else if retryable {
                    "retries_exhausted"
                } else {
                    "error"
                };
                annotate_current_span(&[
                    ("status", serde_json::json!(status)),
                    ("error", serde_json::json!(error.to_string())),
                    ("retryable", serde_json::json!(retryable)),
                    ("attempt", serde_json::json!(attempt)),
                ]);
                if let Some(b) = bridge {
                    b.send_call_end(
                        &call_id,
                        "llm",
                        "llm_call",
                        duration_ms,
                        status,
                        serde_json::json!({
                            "error": error.to_string(),
                            "retryable": retryable,
                            "attempt": attempt,
                            "user_visible": user_visible,
                        }),
                    );
                }
                if !can_retry {
                    // Terminal failures are recorded with zero cost.
                    if let Some(metrics) = crate::active_metrics_registry() {
                        metrics.record_llm_call(&opts.provider, &opts.model, status, 0.0);
                    }
                    return Err(error);
                }
                // Empty-completion failures get a dedicated observability
                // event and trace entry before retrying.
                if is_empty_completion_retry_error(&error) {
                    append_llm_observability_entry(
                        "empty_completion_retry",
                        serde_json::Map::from_iter([
                            (
                                "iteration".to_string(),
                                serde_json::json!(iteration.unwrap_or(0)),
                            ),
                            ("attempt".to_string(), serde_json::json!(attempt + 1)),
                            ("provider".to_string(), serde_json::json!(opts.provider)),
                            ("model".to_string(), serde_json::json!(opts.model)),
                            ("error".to_string(), serde_json::json!(error.to_string())),
                        ]),
                    );
                    super::trace::emit_agent_event(
                        super::trace::AgentTraceEvent::EmptyCompletionRetry {
                            iteration: iteration.unwrap_or(0),
                            attempt: attempt + 1,
                            error: error.to_string(),
                        },
                    );
                }
                // Increment first so the backoff exponent reflects the
                // upcoming attempt number (first retry => 2x base).
                attempt += 1;
                let backoff = llm_retry_backoff_ms(&error, retry_config, attempt, &opts.provider);
                crate::events::log_warn(
                    "llm",
                    &format!(
                        "LLM call failed ({}), retrying in {}ms (attempt {}/{})",
                        error, backoff, attempt, retry_config.retries
                    ),
                );
                if backoff > 0 {
                    tokio::time::sleep(std::time::Duration::from_millis(backoff)).await;
                }
            }
        }
    }
}
#[cfg(test)]
mod retry_tests {
    //! Tests for retry classification (`is_retryable_llm_error`), backoff
    //! (`llm_retry_backoff_ms`), and Retry-After parsing.
    use super::*;
    use crate::value::{ErrorCategory, VmError, VmValue};
    use std::rc::Rc;
    // Build a thrown string-valued VM error.
    fn thrown(s: &str) -> VmError {
        VmError::Thrown(VmValue::String(Rc::from(s)))
    }
    // Build a categorized VM error with the given message and category.
    fn categorized(msg: &str, category: ErrorCategory) -> VmError {
        VmError::CategorizedError {
            message: msg.to_string(),
            category,
        }
    }
    #[test]
    fn mock_provider_retry_backoff_is_zero() {
        let config = LlmRetryConfig {
            retries: 1,
            backoff_ms: 2000,
        };
        // The mock provider must never sleep between retries.
        assert_eq!(
            llm_retry_backoff_ms(&thrown("HTTP 503"), &config, 1, "mock"),
            0
        );
    }
    #[test]
    fn categorized_overloaded_is_retryable() {
        assert!(is_retryable_llm_error(&categorized(
            "upstream overloaded",
            ErrorCategory::Overloaded
        )));
    }
    #[test]
    fn categorized_server_error_is_retryable() {
        assert!(is_retryable_llm_error(&categorized(
            "500 internal",
            ErrorCategory::ServerError
        )));
    }
    #[test]
    fn categorized_transient_network_is_retryable() {
        assert!(is_retryable_llm_error(&categorized(
            "reset",
            ErrorCategory::TransientNetwork
        )));
    }
    #[test]
    fn categorized_auth_not_retryable() {
        assert!(!is_retryable_llm_error(&categorized(
            "invalid key",
            ErrorCategory::Auth
        )));
    }
    #[test]
    fn llm_error_kind_dict_gates_retry() {
        // Dict-shaped errors carry an explicit "kind" that wins outright.
        let transient =
            VmError::Thrown(VmValue::Dict(Rc::new(std::collections::BTreeMap::from([
                ("kind".to_string(), VmValue::String(Rc::from("transient"))),
                (
                    "reason".to_string(),
                    VmValue::String(Rc::from("network_error")),
                ),
            ]))));
        assert!(is_retryable_llm_error(&transient));
        let terminal = VmError::Thrown(VmValue::Dict(Rc::new(std::collections::BTreeMap::from([
            ("kind".to_string(), VmValue::String(Rc::from("terminal"))),
            (
                "reason".to_string(),
                VmValue::String(Rc::from("context_overflow")),
            ),
        ]))));
        assert!(!is_retryable_llm_error(&terminal));
    }
    #[test]
    fn context_overflow_message_is_not_retryable() {
        assert!(!is_retryable_llm_error(&thrown(
            "local HTTP 400 Bad Request [context_overflow]: prompt is too long"
        )));
    }
    #[test]
    fn http_503_is_retryable_via_classifier() {
        assert!(is_retryable_llm_error(&thrown(
            "HTTP 503 Service Unavailable"
        )));
    }
    #[test]
    fn http_504_is_retryable() {
        assert!(is_retryable_llm_error(&thrown("HTTP 504 Gateway Timeout")));
    }
    #[test]
    fn http_529_is_retryable() {
        assert!(is_retryable_llm_error(&thrown("HTTP 529 overloaded_error")));
    }
    #[test]
    fn bad_gateway_string_is_retryable() {
        assert!(is_retryable_llm_error(&thrown("bad gateway response")));
    }
    #[test]
    fn service_unavailable_string_is_retryable() {
        assert!(is_retryable_llm_error(&thrown("service unavailable")));
    }
    #[test]
    fn auth_error_not_retryable() {
        assert!(!is_retryable_llm_error(&thrown("HTTP 401 Unauthorized")));
    }
    #[test]
    fn retry_after_integer_seconds() {
        assert_eq!(parse_retry_after("err: retry-after: 5"), Some(5_000));
    }
    #[test]
    fn retry_after_fractional_seconds() {
        assert_eq!(parse_retry_after("retry-after: 2.5"), Some(2_500));
    }
    #[test]
    fn retry_after_clamped_to_cap() {
        // Delays are clamped at the 60s ceiling.
        assert_eq!(parse_retry_after("retry-after: 600"), Some(60_000));
    }
    #[test]
    fn retry_after_http_date_past_is_zero() {
        let past = "retry-after: Mon, 01 Jan 1990 00:00:00 GMT";
        assert_eq!(parse_retry_after(past), Some(0));
    }
    #[test]
    fn retry_after_missing_returns_none() {
        assert_eq!(parse_retry_after("nothing here"), None);
    }
    #[test]
    fn retry_after_malformed_returns_none() {
        assert_eq!(parse_retry_after("retry-after: soon-ish"), None);
    }
}
#[cfg(test)]
mod streaming_detector_tests {
    //! Tests driving `run_detector_loop_with_sink` end-to-end through a
    //! channel and a capturing sink.
    use std::cell::RefCell;
    use std::rc::Rc;
    use crate::agent_events::{AgentEvent, ToolCallStatus};
    use super::{run_detector_loop_with_sink, StreamingDetectorContext};
    // Feed `chunks` through the detector loop and collect emitted events.
    async fn drive(session_id: &str, known: &[&str], chunks: &[&str]) -> Vec<AgentEvent> {
        let captured: Rc<RefCell<Vec<AgentEvent>>> = Rc::new(RefCell::new(Vec::new()));
        let sink_buf = captured.clone();
        let known_tools = known.iter().map(|s| (*s).to_string()).collect();
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        for chunk in chunks {
            tx.send((*chunk).to_string()).expect("send delta");
        }
        // Dropping the sender closes the channel so the loop finalizes.
        drop(tx);
        run_detector_loop_with_sink(
            StreamingDetectorContext {
                session_id: session_id.to_string(),
                known_tools,
            },
            rx,
            move |event| sink_buf.borrow_mut().push(event.clone()),
        )
        .await;
        let events = captured.borrow().clone();
        events
    }
    #[tokio::test(flavor = "current_thread")]
    async fn detector_loop_emits_start_and_promoted_through_sink() {
        let events = drive(
            "session-stream-promote",
            &["read"],
            &["read({ path: \"a.md\" })"],
        )
        .await;
        assert_eq!(
            events.len(),
            2,
            "expected start + promoted, got: {events:#?}"
        );
        // First event: the in-progress (parsing) tool call.
        match &events[0] {
            AgentEvent::ToolCall {
                parsing,
                tool_name,
                status,
                ..
            } => {
                assert_eq!(*parsing, Some(true));
                assert_eq!(tool_name, "read");
                assert_eq!(*status, ToolCallStatus::Pending);
            }
            other => panic!("expected ToolCall, got {other:?}"),
        }
        // Second event: the completed parse, promoted without error.
        match &events[1] {
            AgentEvent::ToolCallUpdate {
                parsing,
                status,
                error_category,
                ..
            } => {
                assert_eq!(*parsing, Some(false));
                assert_eq!(*status, ToolCallStatus::Pending);
                assert!(error_category.is_none());
            }
            other => panic!("expected ToolCallUpdate, got {other:?}"),
        }
    }
    #[tokio::test(flavor = "current_thread")]
    async fn detector_loop_finalizes_unclosed_tagged_block_as_aborted() {
        // Stream ends mid tool-call block; finalize must mark it failed.
        let events = drive(
            "session-stream-abort",
            &["run"],
            &["<tool_call>\nrun({ command: \"ls\""],
        )
        .await;
        assert_eq!(events.len(), 2, "events={events:#?}");
        match &events[1] {
            AgentEvent::ToolCallUpdate {
                status,
                error_category,
                parsing,
                ..
            } => {
                assert_eq!(*status, ToolCallStatus::Failed);
                assert_eq!(
                    *error_category,
                    Some(crate::agent_events::ToolCallErrorCategory::ParseAborted)
                );
                assert_eq!(*parsing, Some(false));
            }
            other => panic!("expected ToolCallUpdate, got {other:?}"),
        }
    }
}