// @harn-entrypoint-category llm.stdlib
//
// std/llm/handlers — composable middleware around the (call) -> envelope
// caller seam wired into agent_loop. Each `with_*` returns a NEW caller
// closure that wraps `next`. Compose left-to-right via `compose`.
//
// Caller shape:
// call = {prompt, system, opts, turn: {iteration, session_id, attempt}}
// result = {ok: true, value: <llm dict>}
// | {ok: false, status: <reserved>, error?, retryable?}
//
// Reserved statuses: "budget_exhausted", "transport_error", "caller_aborted",
// "caller_skipped", "exception", "schema_validation", "rate_limited",
// "timeout", "network", "provider_5xx", "stream_interrupt",
// "context_window_exceeded", "auth", "policy_blocked", "circuit_open".
import { agent_emit_event } from "std/agent/state"
import { cache_get, cache_put } from "std/cache"
import { llm_call_options } from "std/llm/options"
import { with_case_insensitive_keys } from "std/llm/safe"
/**
 * default_llm_caller() -> caller
 *
 * Builds the bottom-of-stack caller with the canonical (call) -> result
 * shape. It mirrors the built-in __default_invoke_llm body in
 * std/agent/loop and is meant to be the innermost layer of a middleware
 * composition:
 *
 *   let caller = with_retry(default_llm_caller(), {...})
 *
 * Per call the closure pushes an LLM render-context frame for the call's
 * provider/model (popped again on exit, but only when the push reported
 * success), runs `llm_call`, and maps the outcome to an envelope:
 *   - success                                   -> {ok: true, value}
 *   - error dict with reason "budget_exceeded"  -> {ok: false, status: "budget_exhausted"}
 *   - any other error                           -> {ok: false, status: "exception", error}
 */
pub fn default_llm_caller() {
  return { call ->
    let ctx_provider = __default_caller_render_context_provider(call)
    let ctx_model = __default_caller_render_context_model(call)
    let frame_pushed = __push_llm_render_context(ctx_provider, ctx_model)
    defer {
      if frame_pushed {
        __pop_llm_render_context()
      }
    }
    let outcome = try {
      llm_call(call.prompt, call.system, llm_call_options(call.opts))
    }
    if is_err(outcome) {
      let failure = unwrap_err(outcome)
      // Only dict-shaped errors can carry a `reason`; anything else is opaque.
      var reason = ""
      if type_of(failure) == "dict" {
        reason = failure?.reason ?? ""
      }
      if reason == "budget_exceeded" {
        return {ok: false, status: "budget_exhausted"}
      }
      return {ok: false, status: "exception", error: failure}
    }
    return {ok: true, value: unwrap(outcome)}
  }
}
/**
 * Provider accessor for the ambient LLM render-context frame that
 * `default_llm_caller` publishes around each call (a matching model
 * accessor sits next to it). Keeping these as named helpers keeps the
 * closure body readable and gives the option-shape contract a single
 * place to grow (e.g. dispatching on `model_tier` if a future caller
 * wants to publish the context before resolution).
 *
 * Returns `call.opts.provider` as a string, or "" when any link of that
 * path is missing.
 */
fn __default_caller_render_context_provider(call) -> string {
  let provider = call?.opts?.provider ?? ""
  return to_string(provider)
}
// Model accessor for the render-context frame: `call.opts.model` as a
// string, or "" when any link of that path is missing.
fn __default_caller_render_context_model(call) -> string {
  let model = call?.opts?.model ?? ""
  return to_string(model)
}
// -------------------------------------------------------------------------------------------------
// shared helpers
// -------------------------------------------------------------------------------------------------
// Normalizes an options argument: dicts pass through untouched, every
// other value (nil included) becomes an empty dict.
fn __opts_dict(opts) {
  let normalized = if type_of(opts) == "dict" {
    opts
  } else {
    {}
  }
  return normalized
}
// Names of the circuits this module has already registered. Read and
// appended to by __ensure_llm_handler_circuit so it can skip
// circuit_breaker(...) for names it has already seen.
var __llm_handler_circuits = []
// True when `value` reports one of the callable type names
// ("function", "closure", "fn") from type_of.
fn __llm_handler_is_callable(value) -> bool {
  let callable_kinds = ["function", "closure", "fn"]
  return contains(callable_kinds, type_of(value))
}
// Derives the per-call circuit name "llm:<provider>:<model>" from
// call.opts, substituting "<unset>" for a missing provider or model.
fn __circuit_name_for(call) -> string {
  let call_opts = call?.opts ?? {}
  let provider_part = to_string(call_opts?.provider ?? "<unset>")
  let model_part = to_string(call_opts?.model ?? "<unset>")
  return join(["llm", provider_part, model_part], ":")
}
// Registers the circuit `name` with the given threshold / reset window
// the first time the name is seen; later calls for the same name are
// no-ops (their threshold / reset_ms arguments are ignored).
fn __ensure_llm_handler_circuit(name, threshold, reset_ms) {
  if contains(__llm_handler_circuits, name) {
    return
  }
  circuit_breaker(name, threshold, reset_ms)
  __llm_handler_circuits = __llm_handler_circuits + [name]
}
// Builds the error dict thrown when a circuit is open. Provider and
// model are copied from call.opts ("<unset>" when absent) and the
// circuit name is echoed in both `message` and `circuit`.
fn __circuit_open_error(call, name) {
  let call_opts = call?.opts ?? {}
  let provider = call_opts?.provider ?? "<unset>"
  let model = call_opts?.model ?? "<unset>"
  let message = "circuit open: " + name
  return {
    kind: "terminal",
    reason: "circuit_open",
    category: "circuit_open",
    message: message,
    provider: provider,
    model: model,
    circuit: name,
  }
}
// Decides whether a call bypasses the cache.
//   - no `skip_when` option: skip exactly when `opts.tools` is set
//   - callable `skip_when`: invoked with {prompt, system, options}
//   - any other `skip_when`: its own truthiness decides
fn __llm_handler_should_skip_cache(prompt, system, opts) -> bool {
  let skip_rule = opts?.skip_when
  if skip_rule == nil {
    return opts?.tools != nil
  }
  let verdict = if __llm_handler_is_callable(skip_rule) {
    skip_rule({prompt: prompt, system: system, options: opts})
  } else {
    skip_rule
  }
  // Collapse whatever the rule produced to a strict bool.
  return verdict ? true : false
}
// Projects caller options onto the option dict handed to cache_get /
// cache_put. `store` defaults to "llm.with_cache". An explicit `ttl`
// wins; otherwise a 10-minute TTL is applied unless the caller supplied
// `ttl_seconds` or `max_age_seconds`. The remaining recognised keys are
// copied through only when they are set.
fn __llm_handler_cache_options(opts) -> dict {
  let passthrough_keys = [
    "backend",
    "namespace",
    "name",
    "path",
    "cache_dir",
    "ttl_seconds",
    "max_age_seconds",
    "max_entries",
  ]
  var result = {store: opts?.store ?? "llm.with_cache"}
  let explicit_ttl = opts?.ttl
  if explicit_ttl != nil {
    result = result + {ttl: explicit_ttl}
  } else if opts?.ttl_seconds == nil && opts?.max_age_seconds == nil {
    result = result + {ttl: "10m"}
  }
  for name in passthrough_keys {
    let supplied = opts?[name]
    if supplied != nil {
      result = result + {[name]: supplied}
    }
  }
  return result
}
// Invokes `next(call)` without letting anything escape: a throw, or a
// return value that is not a dict, is turned into an
// {ok: false, status: "exception"} envelope. Dict results are returned
// as-is.
fn __safe_invoke(next, call) {
  let attempt = try {
    next(call)
  }
  if is_err(attempt) {
    return {ok: false, status: "exception", error: unwrap_err(attempt)}
  }
  let produced = unwrap(attempt)
  if type_of(produced) == "dict" {
    return produced
  }
  return {ok: false, status: "exception", error: "caller returned non-dict"}
}
/*
 * Partial-application support for the canonical `(next, opts) -> caller`
 * middleware shape. The `(next, opts)` middleware in this module
 * (with_retry, with_logging, with_budget, with_circuit_breaker,
 * with_repair, with_coerce, with_timeout) open with
 * `if !__llm_handler_is_callable(first) { return __partial(impl, first) }`
 * so call sites can use either form interchangeably:
 *
 *   with_X(next, opts)   // direct: returns a caller
 *   with_X(opts)         // curried: returns fn(next) -> caller, drops into compose
 *
 * Without this, `compose([with_logging({...})])` silently treats the
 * opts dict as `next` and fails at the first invocation.
 *
 * with_fallback, with_shadow, with_prompt_rewrite and with_cache take
 * different argument shapes and do not go through this helper.
 */
fn __partial(handler_impl, opts) {
  let curried = { next -> handler_impl(next, opts) }
  return curried
}
// Best-effort event emission: does nothing when the session id
// stringifies to "", and swallows any error raised by agent_emit_event
// so instrumentation can never fail the surrounding call.
fn __emit_event(session_id, name, payload) {
  if to_string(session_id) != "" {
    let _ = try {
      agent_emit_event(session_id, name, payload)
    }
  }
}
// Default retry policy for with_retry.
//   - non-dict or successful envelopes are never retried
//   - statuses on the deny list are never retried, even with a hint
//   - statuses on the allow list are always retried
//   - any other status is retried only when `envelope.retryable` is truthy
fn __retry_default_predicate(envelope) {
  if type_of(envelope) != "dict" {
    return false
  }
  let succeeded = envelope?.ok ?? false
  if succeeded {
    return false
  }
  let deny = [
    "schema_validation",
    "auth",
    "budget_exhausted",
    "context_window_exceeded",
    "policy_blocked",
    "caller_aborted",
    "caller_skipped",
    "circuit_open",
  ]
  let allow = [
    "transient",
    "rate_limited",
    "timeout",
    "exception",
    "network",
    "provider_5xx",
    "stream_interrupt",
  ]
  let status = to_string(envelope?.status ?? "")
  if contains(deny, status) {
    return false
  }
  if contains(allow, status) {
    return true
  }
  // Unknown status: defer to the envelope's own retryable hint.
  let hinted = envelope?.retryable ?? false
  if hinted {
    return true
  }
  return false
}
/**
 * __retry_after_ms(envelope) -> int
 *
 * Extracts a server-requested retry delay, in milliseconds, from a
 * failure envelope. Sources, in priority order:
 *   1. `envelope.error.retry_after_ms` (already in milliseconds)
 *   2. a `Retry-After` entry in `envelope.error.headers`, matched
 *      case-insensitively and interpreted as whole seconds
 *
 * Returns 0 when neither source yields a usable integer. Values that do
 * not convert to an int (for example the HTTP-date form of Retry-After)
 * are ignored rather than propagated, so this helper never throws and
 * never hands a non-int back to the retry loop.
 */
fn __retry_after_ms(envelope) {
  if type_of(envelope) != "dict" {
    return 0
  }
  let err = envelope?.error
  if type_of(err) != "dict" {
    return 0
  }
  let explicit_ms = __retry_after_int_or_nil(err?.retry_after_ms)
  if explicit_ms != nil {
    return explicit_ms
  }
  let headers = err?.headers
  if type_of(headers) != "dict" {
    return 0
  }
  for k in headers.keys() {
    if lowercase(to_string(k)) == "retry-after" {
      let seconds = __retry_after_int_or_nil(to_string(headers[k]))
      if seconds != nil {
        return seconds * 1000
      }
    }
  }
  return 0
}
// Converts `raw` to an int, returning nil when it is missing, when the
// conversion throws, or when the conversion does not produce an int.
fn __retry_after_int_or_nil(raw) {
  if raw == nil {
    return nil
  }
  let parsed = try {
    to_int(raw)
  }
  if is_err(parsed) {
    return nil
  }
  let number = unwrap(parsed)
  if type_of(number) != "int" {
    return nil
  }
  return number
}
// Computes the wait before the next retry, in milliseconds.
//   - attempt < 1 yields 0
//   - "linear" backoff scales base_ms by the attempt number; every other
//     mode doubles base_ms once per attempt after the first
//   - the curve is capped at max_ms before jitter is applied
//   - backoff "jittered" or jitter "full" picks a random point in
//     [0, delay); jitter "equal" keeps half and randomizes the other half
//   - the result is never negative
fn __backoff_delay(attempt, base_ms, max_ms, backoff, jitter) {
  if attempt < 1 {
    return 0
  }
  var wait_ms = base_ms
  if backoff == "linear" {
    wait_ms = base_ms * attempt
  } else {
    // 2^(attempt - 1), built by repeated doubling.
    var multiplier = 1
    var remaining = attempt
    while remaining > 1 {
      multiplier = multiplier * 2
      remaining = remaining - 1
    }
    wait_ms = base_ms * multiplier
  }
  if wait_ms > max_ms {
    wait_ms = max_ms
  }
  let full_jitter = backoff == "jittered" || jitter == "full"
  if full_jitter {
    let roll = random()
    wait_ms = to_int(roll * to_float(wait_ms))
  } else if jitter == "equal" {
    let floor_ms = wait_ms / 2
    let roll = random()
    wait_ms = floor_ms + to_int(roll * to_float(floor_ms))
  }
  if wait_ms < 0 {
    wait_ms = 0
  }
  return wait_ms
}
// -------------------------------------------------------------------------------------------------
// with_retry
// -------------------------------------------------------------------------------------------------
/**
 * with_retry(next, opts) -> caller
 *
 * Wraps `next` with bounded retry. Default opts:
 *   {max_attempts: 3, base_ms: 250, max_ms: 8000,
 *    backoff: "exponential", jitter: "full", honor_retry_after: true}
 *
 * The default predicate retries on these statuses:
 *   transient, rate_limited, timeout, exception, network,
 *   provider_5xx, stream_interrupt
 *
 * The default predicate NEVER retries:
 *   schema_validation, auth, budget_exhausted,
 *   context_window_exceeded, policy_blocked, caller_aborted,
 *   caller_skipped, circuit_open
 *
 * `opts.predicate(envelope) -> bool` overrides the default; a predicate
 * that throws is treated as "do not retry".
 * Honors `error.retry_after_ms` and case-insensitive `Retry-After` header
 * when honor_retry_after is true; the wait before the next attempt is
 * the larger of that value and the computed backoff delay.
 *
 * Each attempt receives `call.turn.attempt` set to the 1-based attempt
 * number. Returns the LAST envelope plus `retries_attempted: N`. Raw
 * throws from `next` become {ok: false, status: "exception"}.
 *
 * When `max_attempts` is below 1, `next` is never invoked and the result
 * is an "exception" envelope with `retries_attempted: 0`.
 *
 * Also callable as with_retry(opts), which returns fn(next) -> caller.
 */
pub fn with_retry(next, opts = nil) {
  if !__llm_handler_is_callable(next) {
    return __partial(with_retry, next)
  }
  let cfg = __opts_dict(opts)
  let max_attempts = cfg?.max_attempts ?? 3
  let base_ms = cfg?.base_ms ?? 250
  let max_ms = cfg?.max_ms ?? 8000
  let backoff = cfg?.backoff ?? "exponential"
  let jitter = cfg?.jitter ?? "full"
  let honor_retry_after = cfg?.honor_retry_after ?? true
  let predicate = cfg?.predicate
  return { call ->
    let base_turn = call?.turn ?? {iteration: 0, session_id: "", attempt: 1}
    var attempt = 1
    var last_envelope = {ok: false, status: "exception", error: "with_retry: no attempts"}
    while attempt <= max_attempts {
      let attempt_call = call + {turn: base_turn + {attempt: attempt}}
      let envelope = __safe_invoke(next, attempt_call)
      last_envelope = envelope
      let should_retry = if predicate != nil {
        let r = try {
          predicate(envelope)
        }
        if is_err(r) {
          false
        } else {
          unwrap(r)
        }
      } else {
        __retry_default_predicate(envelope)
      }
      // Stop on a final verdict or once the attempt budget is used up.
      if !should_retry || attempt >= max_attempts {
        return envelope + {retries_attempted: attempt - 1}
      }
      let header_delay = if honor_retry_after {
        __retry_after_ms(envelope)
      } else {
        0
      }
      let backoff_delay = __backoff_delay(attempt, base_ms, max_ms, backoff, jitter)
      let delay = if header_delay > backoff_delay {
        header_delay
      } else {
        backoff_delay
      }
      if delay > 0 {
        sleep_ms(delay)
      }
      attempt = attempt + 1
    }
    // Only reachable when max_attempts < 1: the loop body never ran, so no
    // retries happened. Report 0 instead of the negative max_attempts - 1.
    return last_envelope + {retries_attempted: 0}
  }
}
// -------------------------------------------------------------------------------------------------
// with_fallback
// -------------------------------------------------------------------------------------------------
/**
 * with_fallback(callers) -> caller
 *
 * Walks `callers` (a list of caller closures) in order and returns the
 * first {ok: true} envelope, moving on whenever a caller yields
 * {ok: false} or throws. One `llm_fallback_attempt` event is emitted per
 * attempt when call.turn.session_id is non-empty.
 *
 * On success: result + {fallback_index, fallback_total}.
 * On all-fail: last envelope + {fallback_total}.
 * When `callers` is not a list or is empty, every call returns
 * {ok: false, status: "caller_skipped"} without invoking anything.
 */
pub fn with_fallback(callers) {
  let total = if type_of(callers) == "list" {
    len(callers)
  } else {
    0
  }
  return { call ->
    if total == 0 {
      return {ok: false, status: "caller_skipped", error: "with_fallback: empty caller list"}
    }
    let session_id = to_string(call?.turn?.session_id ?? "")
    var latest = {ok: false, status: "caller_skipped"}
    var position = 0
    for candidate in callers {
      let envelope = __safe_invoke(candidate, call)
      latest = envelope
      let succeeded = envelope?.ok ?? false
      let attempt_record = {
        fallback_index: position,
        fallback_total: total,
        ok: succeeded,
        status: to_string(envelope?.status ?? ""),
      }
      __emit_event(session_id, "llm_fallback_attempt", attempt_record)
      if succeeded {
        return envelope + {fallback_index: position, fallback_total: total}
      }
      position = position + 1
    }
    return latest + {fallback_total: total}
  }
}
// Text payload of a successful envelope (`value.text` as a string);
// "" for failures and for successes that carry no text.
fn __envelope_text(env) {
  let succeeded = env?.ok ?? false
  if succeeded {
    return to_string(env?.value?.text ?? "")
  }
  return ""
}
// -------------------------------------------------------------------------------------------------
// with_shadow
// -------------------------------------------------------------------------------------------------
/**
 * with_shadow(primary, shadow, opts) -> caller
 *
 * Run primary and shadow concurrently via `parallel each` over a 2-element
 * list. Always returns the PRIMARY envelope. Options:
 *   - sampler: closure(call) -> bool (default: always true). A sampler
 *     that throws or returns nil counts as "do not sample"; unsampled
 *     calls run the primary only.
 *   - on_diff: closure(primary_env, shadow_env) -> nil; errors it raises
 *     are swallowed.
 *   - diff_when: "any" | "ok_only" (default "any"). "ok_only" reports a
 *     difference only when both envelopes succeeded.
 *
 * Envelopes are compared by their `value.text` (a failed envelope counts
 * as empty text). When they differ, an `llm_shadow_diff` event with diff
 * metadata is emitted if call.turn.session_id is non-empty, and
 * `on_diff` is invoked.
 */
pub fn with_shadow(primary, shadow, opts = nil) {
  let cfg = __opts_dict(opts)
  let sampler = cfg?.sampler
  let on_diff = cfg?.on_diff
  let diff_when = cfg?.diff_when ?? "any"
  return { call ->
    let sample = if sampler != nil {
      let r = try {
        sampler(call)
      }
      if is_err(r) {
        false
      } else {
        let v = unwrap(r)
        if v == nil {
          false
        } else {
          !!v
        }
      }
    } else {
      true
    }
    if !sample {
      return __safe_invoke(primary, call)
    }
    let pair = [primary, shadow]
    let results = parallel each pair { c ->
      __safe_invoke(c, call)
    }
    let primary_env = results[0]
    let shadow_env = results[1]
    let primary_text = __envelope_text(primary_env)
    let shadow_text = __envelope_text(shadow_env)
    // Group each `??` explicitly so the result cannot depend on how `??`
    // and `&&` bind relative to each other.
    let primary_ok = primary_env?.ok ?? false
    let shadow_ok = shadow_env?.ok ?? false
    let both_ok = (primary_ok) && (shadow_ok)
    let differs = primary_text != shadow_text
    let should_compare = if diff_when == "ok_only" {
      both_ok && differs
    } else {
      differs
    }
    if should_compare {
      __emit_event(
        to_string(call?.turn?.session_id ?? ""),
        "llm_shadow_diff",
        {
          primary_ok: primary_ok,
          shadow_ok: shadow_ok,
          primary_status: to_string(primary_env?.status ?? "ok"),
          shadow_status: to_string(shadow_env?.status ?? "ok"),
          primary_len: len(primary_text),
          shadow_len: len(shadow_text),
        },
      )
      if on_diff != nil {
        let _ = try {
          on_diff(primary_env, shadow_env)
        }
      }
    }
    return primary_env
  }
}
// -------------------------------------------------------------------------------------------------
// with_prompt_rewrite
// -------------------------------------------------------------------------------------------------
/**
 * with_prompt_rewrite(next, rewriter) -> caller
 *
 * `rewriter(prompt, system, opts) -> {prompt?, system?, opts?}` (any subset).
 * Keys the rewriter leaves out (or sets to nil) keep their original
 * value; a non-dict return value is treated as "no changes". The
 * rewritten call is then handed to `next` with `call.turn` preserved
 * (a default turn is supplied when the call has none).
 *
 * A rewriter that throws short-circuits to
 * {ok: false, status: "exception", error} without calling `next`.
 */
pub fn with_prompt_rewrite(next, rewriter) {
  return { call ->
    let attempt = try {
      rewriter(call?.prompt, call?.system, call?.opts)
    }
    if is_err(attempt) {
      return {ok: false, status: "exception", error: unwrap_err(attempt)}
    }
    let raw_patch = unwrap(attempt)
    var patch = {}
    if type_of(raw_patch) == "dict" {
      patch = raw_patch
    }
    let forwarded = {
      prompt: patch?.prompt ?? call?.prompt,
      system: patch?.system ?? call?.system,
      opts: patch?.opts ?? call?.opts,
      turn: call?.turn ?? {iteration: 0, session_id: "", attempt: 1},
    }
    return __safe_invoke(next, forwarded)
  }
}
// -------------------------------------------------------------------------------------------------
// with_logging
// -------------------------------------------------------------------------------------------------
/**
 * with_logging(next, opts) -> caller
 *
 * Wraps `next` with one structured log record per call. Options:
 *   - level: "debug" | "info" | "warn" (default "info")
 *   - include_prompt: bool (default false; PII) — adds prompt and system
 *   - sink: closure(record) -> nil (optional callback; errors swallowed)
 *
 * The record carries event, level, latency_ms, model, provider, status
 * ("ok" on success, otherwise the envelope status or "unknown"),
 * iteration and attempt. It is emitted as an `llm_call_log` event when
 * call.turn.session_id is non-empty. The envelope from `next` is
 * returned unchanged.
 *
 * Also callable as with_logging(opts), which returns fn(next) -> caller.
 */
pub fn with_logging(next, opts = nil) {
  if !__llm_handler_is_callable(next) {
    return __partial(with_logging, next)
  }
  let cfg = __opts_dict(opts)
  let level = cfg?.level ?? "info"
  let include_prompt = cfg?.include_prompt ?? false
  let sink = cfg?.sink
  return { call ->
    let started_at = now_ms()
    let envelope = __safe_invoke(next, call)
    let call_opts = __opts_dict(call?.opts)
    var status = "ok"
    if !(envelope?.ok ?? false) {
      status = to_string(envelope?.status ?? "unknown")
    }
    let latency = now_ms() - started_at
    var record = {
      event: "llm_call_log",
      level: level,
      latency_ms: latency,
      model: to_string(call_opts?.model ?? ""),
      provider: to_string(call_opts?.provider ?? ""),
      status: status,
      iteration: to_int(call?.turn?.iteration ?? 0),
      attempt: to_int(call?.turn?.attempt ?? 1),
    }
    if include_prompt {
      let prompt_text = to_string(call?.prompt ?? "")
      let system_text = to_string(call?.system ?? "")
      record = record + {prompt: prompt_text, system: system_text}
    }
    let session_id = to_string(call?.turn?.session_id ?? "")
    __emit_event(session_id, "llm_call_log", record)
    if sink != nil {
      let _ = try {
        sink(record)
      }
    }
    return envelope
  }
}
// Checks one budget limit. Returns nil while the limit is unset or
// `observed` is still below it. Once the limit is reached it either
// throws (on_exceed == "throw") or returns a "budget_exhausted"
// envelope describing the limit, its value and the observed total.
fn __budget_check(limit_name, max_value, observed, on_exceed) {
  if max_value == nil {
    return nil
  }
  if observed < to_int(max_value) {
    return nil
  }
  if on_exceed == "throw" {
    throw "with_budget: " + limit_name + " exceeded"
  }
  let detail = {limit: limit_name, value: max_value, observed: observed}
  return {ok: false, status: "budget_exhausted", error: detail}
}
// -------------------------------------------------------------------------------------------------
// with_budget
// -------------------------------------------------------------------------------------------------
/**
 * with_budget(next, opts) -> caller
 *
 * Tracks per-caller-instance usage across calls.
 *
 * IMPORTANT: Harn closures capture by VALUE, so we cannot persist a free-form
 * dict across calls. Counters are instead held as `atomic` handles, whose
 * underlying Arc<AtomicI64> is shared across closure invocations. This means
 * with_budget tracks INTEGER counters (tokens, calls) only.
 *
 * Options (all optional):
 *   - max_total_tokens: int (input + output)
 *   - max_input_tokens: int
 *   - max_output_tokens: int
 *   - max_calls: int
 *   - max_cost_usd: float — accepted for forward compatibility but NOT
 *     read by this implementation; no cost accounting happens here.
 *   - on_exceed: "throw" | "short_circuit" (default "short_circuit")
 *
 * Limits are checked BEFORE each call against the totals accumulated so
 * far, in the order max_calls, max_input_tokens, max_output_tokens,
 * max_total_tokens. A call is blocked once a total has reached its
 * limit, so the call that crosses a token limit still completes.
 *
 * Every invocation of `next` counts as a call. Token usage is added only
 * for successful envelopes whose `value` is a dict, read from
 * `value.usage.{input,output}_tokens` with a fallback to
 * `value.{input,output}_tokens`.
 *
 * On exceed:
 *   - "short_circuit": returns {ok: false, status: "budget_exhausted"}
 *   - "throw": throws (propagates to caller)
 *
 * Also callable as with_budget(opts), which returns fn(next) -> caller.
 */
pub fn with_budget(next, opts = nil) {
  if !__llm_handler_is_callable(next) {
    return __partial(with_budget, next)
  }
  let cfg = __opts_dict(opts)
  let max_total_tokens = cfg?.max_total_tokens
  let max_input_tokens = cfg?.max_input_tokens
  let max_output_tokens = cfg?.max_output_tokens
  let max_calls = cfg?.max_calls
  let on_exceed = cfg?.on_exceed ?? "short_circuit"
  let total_in = atomic(0)
  let total_out = atomic(0)
  let total_calls = atomic(0)
  return { call ->
    let calls_so_far = atomic_get(total_calls)
    let in_so_far = atomic_get(total_in)
    let out_so_far = atomic_get(total_out)
    let exhaustion = __budget_check("max_calls", max_calls, calls_so_far, on_exceed)
      ?? __budget_check("max_input_tokens", max_input_tokens, in_so_far, on_exceed)
      ?? __budget_check("max_output_tokens", max_output_tokens, out_so_far, on_exceed)
      ?? __budget_check("max_total_tokens", max_total_tokens, in_so_far + out_so_far, on_exceed)
    if exhaustion != nil {
      return exhaustion
    }
    let envelope = __safe_invoke(next, call)
    let _ = atomic_add(total_calls, 1)
    // Explicit grouping: the dict check must apply regardless of how `??`
    // and `&&` bind relative to each other.
    let succeeded = envelope?.ok ?? false
    if (succeeded) && (type_of(envelope?.value) == "dict") {
      let usage = __opts_dict(envelope.value?.usage)
      let in_tokens = to_int(usage?.input_tokens ?? envelope.value?.input_tokens ?? 0)
      let out_tokens = to_int(usage?.output_tokens ?? envelope.value?.output_tokens ?? 0)
      let _ = atomic_add(total_in, in_tokens)
      let _ = atomic_add(total_out, out_tokens)
    }
    return envelope
  }
}
// -------------------------------------------------------------------------------------------------
// with_cache
// -------------------------------------------------------------------------------------------------
/**
 * llm_cache_key returns the canonical sha256 cache key used by with_cache.
 * A nil `options` is treated as an empty dict.
 */
pub fn llm_cache_key(prompt, system = nil, options = nil) -> string {
  let effective_options = options ?? {}
  return __llm_cache_key(prompt, system, effective_options)
}
/**
 * with_cache supports both public cache forms, selected by whether the
 * first argument is callable:
 *
 *   with_cache(prompt, system?, options?) -> llm_call envelope
 *   with_cache(next, opts?) -> caller (composable middleware)
 *
 * Both forms share the same on-disk cache namespaced under
 * `llm.with_cache` by default and key on the canonical
 * `llm_cache_key(prompt, system, opts)` sha256. In the caller form the
 * wrapper's `opts` are merged over `call.opts` (wrapper keys win) before
 * keying. When a session_id is available — `call.turn.session_id`
 * (falling back to the merged options) for the caller form,
 * `options.session_id` for the direct form — both forms emit
 * `cache_hit` / `cache_miss` events on the agent event tape. On hits,
 * the wrapper records "model calls avoided" + "tokens saved" +
 * "latency saved" receipts pulled from the cached envelope so the
 * persona value ledger (harn-cloud#58) and crystallization receipts
 * can read them back.
 *
 * The caller form stores only {ok: true} envelopes; the direct form
 * stores any dict result that has no `status` key.
 */
pub fn with_cache(first, second = nil, third = nil) {
  if !__llm_handler_is_callable(first) {
    // Direct form: (prompt, system?, options?).
    let direct_opts = third ?? {}
    let direct_session = to_string(direct_opts?.session_id ?? "")
    return __with_cache_run(
      first,
      second,
      direct_opts,
      direct_session,
      fn() { return llm_call(first, second, llm_call_options(direct_opts)) },
      fn(result) { return __llm_handler_should_store_direct_result(result) },
    )
  }
  // Middleware form: (next, opts?).
  let cfg = __opts_dict(second)
  return { call ->
    let call_opts = __opts_dict(call?.opts)
    if __llm_handler_should_skip_cache(call?.prompt, call?.system, call_opts) {
      return __safe_invoke(first, call)
    }
    let merged = call_opts + cfg
    let session_id = to_string(call?.turn?.session_id ?? merged?.session_id ?? "")
    return __with_cache_run(
      call?.prompt,
      call?.system,
      merged,
      session_id,
      fn() { return __safe_invoke(first, call) },
      __llm_handler_should_store_result,
    )
  }
}
// Shared cache driver for both with_cache forms.
//   1. When the skip rule fires, run `compute` and return its result
//      without touching the cache.
//   2. On a cache hit, emit `cache_hit` and return the cached value.
//   3. Otherwise run `compute`, timing it with monotonic_ms. If
//      `should_store(result)` accepts the result it is written to the
//      cache and a `cache_miss` event is emitted; rejected results are
//      returned without a write and without a miss event.
fn __with_cache_run(prompt, system, opts, session_id, compute, should_store) {
  if __llm_handler_should_skip_cache(prompt, system, opts) {
    return compute()
  }
  let key = llm_cache_key(prompt, system, opts)
  let cache_options = __llm_handler_cache_options(opts)
  let lookup = cache_get(key, cache_options)
  if lookup.hit {
    __with_cache_emit_hit(session_id, key, lookup, opts)
    return lookup.value
  }
  let begun_ms = monotonic_ms()
  let fresh = compute()
  let elapsed_ms = monotonic_ms() - begun_ms
  if !should_store(fresh) {
    return fresh
  }
  cache_put(key, fresh, cache_options)
  __with_cache_emit_miss(session_id, key, lookup, elapsed_ms)
  return fresh
}
// Store rule for the direct-call form, which returns the raw llm_call
// result: cache any dict that has no `status` key (a `status` key marks
// an error envelope). Non-dict results are never cached.
fn __llm_handler_should_store_direct_result(result) -> bool {
  if type_of(result) == "dict" {
    let has_status = contains(result.keys(), "status")
    return !has_status
  }
  return false
}
// Store rule for the caller seam, which yields `{ok: true, value}` on
// success and `{ok: false, status: ...}` on failure. Only successes are
// cached; failure envelopes are transient retry state, not durable
// answers. Non-dict results are never cached.
fn __llm_handler_should_store_result(result) -> bool {
  if type_of(result) == "dict" {
    return result?.ok ?? false
  }
  return false
}
// Builds the metrics dict attached to a `cache_hit` event. It always
// reports `model_calls_avoided: 1`, adds `tokens_saved` when the cached
// usage sums to more than zero, and adds `latency_saved_ms` when the
// cached result recorded a latency.
fn __with_cache_hit_metrics_from_envelope(cached_value) -> dict {
  var metrics = {model_calls_avoided: 1}
  if type_of(cached_value) != "dict" {
    return metrics
  }
  // The caller seam caches {ok: true, value: <llm dict>} while the direct
  // form caches the LLM dict itself; peel one layer so both read alike.
  let inner = cached_value?.value
  let llm = if inner != nil && type_of(inner) == "dict" {
    inner
  } else {
    cached_value
  }
  let usage = llm?.usage
  if type_of(usage) == "dict" {
    let input_part = if usage?.input_tokens != nil {
      to_int(usage.input_tokens)
    } else {
      0
    }
    let output_part = if usage?.output_tokens != nil {
      to_int(usage.output_tokens)
    } else {
      0
    }
    let tokens_saved = input_part + output_part
    if tokens_saved > 0 {
      metrics = metrics + {tokens_saved: tokens_saved}
    }
  }
  let latency = llm?.latency_ms
  if latency != nil {
    metrics = metrics + {latency_saved_ms: to_int(latency)}
  }
  return metrics
}
// Emits a `cache_hit` event for the session, carrying the cache key,
// the backend / namespace reported by the lookup, the call's provider
// and model, and the savings metrics derived from the cached value.
// Does nothing when session_id is "".
fn __with_cache_emit_hit(session_id, key, cached, opts) {
  if session_id != "" {
    let payload = {
      key: key,
      backend: cached?.backend ?? "",
      namespace: cached?.namespace ?? "",
      provider: opts?.provider ?? "",
      model: opts?.model ?? "",
      metrics: __with_cache_hit_metrics_from_envelope(cached?.value),
    }
    __emit_event(session_id, "cache_hit", payload)
  }
}
// Emits a `cache_miss` event for the session, carrying the cache key,
// the backend / namespace reported by the lookup, and how long the
// fresh computation took. Does nothing when session_id is "".
fn __with_cache_emit_miss(session_id, key, cached, compute_ms) {
  if session_id != "" {
    let payload = {
      key: key,
      backend: cached?.backend ?? "",
      namespace: cached?.namespace ?? "",
      metrics: {compute_ms: compute_ms},
    }
    __emit_event(session_id, "cache_miss", payload)
  }
}
// -------------------------------------------------------------------------------------------------
// with_circuit_breaker
// -------------------------------------------------------------------------------------------------
/**
* Wrap an LLM call handler with circuit-breaker protection.
*
* By default each invocation uses a circuit derived from the call's
* `(opts.provider, opts.model)` pair, so one failing upstream cannot poison
* other models routed through the same wrapper. Pass `name` to intentionally
* share one circuit across calls.
*
* Options:
* - name: string — fixed circuit name (default: derived per call)
* - threshold: passed to circuit_breaker (default 5)
* - reset_ms: passed to circuit_breaker (default 30000)
*
* Unlike the envelope-returning middleware in this module, this wrapper
* THROWS: an open circuit throws the dict built by __circuit_open_error,
* and an error thrown by `handler` is recorded as a failure and rethrown.
* A returned dict that has an `ok` key with a falsy value is recorded as
* a failure but still returned; every other return value is recorded as
* a success.
*
* Also callable as with_circuit_breaker(options), which returns
* fn(handler) -> caller.
*/
pub fn with_circuit_breaker(handler, options = nil) {
if !__llm_handler_is_callable(handler) {
return __partial(with_circuit_breaker, handler)
}
let opts = options ?? {}
let threshold = opts?.threshold ?? 5
let reset_ms = opts?.reset_ms ?? 30000
return { call ->
// Resolve the circuit per call so distinct provider/model pairs get
// distinct circuits unless a shared name was configured.
let name = opts?.name ?? __circuit_name_for(call)
__ensure_llm_handler_circuit(name, threshold, reset_ms)
let state = circuit_check(name)
if state == "open" {
throw __circuit_open_error(call, name)
}
let outcome = try {
handler(call)
}
if is_err(outcome) {
// Count the throw against the circuit, then let it propagate.
circuit_record_failure(name)
throw unwrap_err(outcome)
}
let result = unwrap(outcome)
// Only an explicit `ok` key with a falsy value counts as a failure.
if type_of(result) == "dict" && contains(result.keys(), "ok") && !(result?.ok ?? false) {
circuit_record_failure(name)
} else {
circuit_record_success(name)
}
return result
}
}
// -------------------------------------------------------------------------------------------------
// with_repair
// -------------------------------------------------------------------------------------------------
// Builds the default corrective message appended to the prompt on a
// repair pass: a fixed set of JSON-formatting instructions, plus a
// "Validator said: ..." line when the error dict carries a non-empty
// `message` (or, failing that, `error`).
fn __repair_default_nudge(error_dict) {
  let instructions = [
    "Your previous response did not pass schema validation.",
    "Re-emit valid JSON only:",
    "- Use lowercase keys exactly as specified.",
    "- Do not wrap in markdown fences.",
    "- Do not include prose, commentary, or trailing text.",
  ]
  var detail = ""
  if type_of(error_dict) == "dict" {
    detail = to_string(error_dict?.message ?? error_dict?.error ?? "")
  }
  if detail == "" {
    return join(instructions, "\n")
  }
  return join(instructions + ["Validator said: " + detail], "\n")
}
// Builds the call used for the repair pass. The nudge text comes from
// `strategy` — a callable invoked with (prompt, system, opts, envelope),
// or a literal string — and falls back to the default nudge whenever
// the strategy is absent, throws, or yields anything but a non-empty
// string. The nudge is appended to the original prompt after a blank
// line; `max_tokens` / `temperature`, when given, override the call's
// opts. `system` and `turn` carry over (a default turn is supplied when
// the call has none).
fn __repair_apply_strategy(call, envelope, strategy, max_tokens, temperature) {
  // "" means "no usable custom nudge".
  var custom = ""
  if __llm_handler_is_callable(strategy) {
    let produced = try {
      strategy(call?.prompt, call?.system, call?.opts, envelope)
    }
    if !is_err(produced) {
      let text = unwrap(produced)
      if type_of(text) == "string" && text != "" {
        custom = text
      }
    }
  } else if type_of(strategy) == "string" && strategy != "" {
    custom = strategy
  }
  let nudge = if custom != "" {
    custom
  } else {
    __repair_default_nudge(envelope?.error)
  }
  var repair_opts = __opts_dict(call?.opts)
  if max_tokens != nil {
    repair_opts = repair_opts + {max_tokens: to_int(max_tokens)}
  }
  if temperature != nil {
    repair_opts = repair_opts + {temperature: temperature}
  }
  let base_prompt = to_string(call?.prompt ?? "")
  return {
    prompt: base_prompt + "\n\n" + nudge,
    system: call?.system,
    opts: repair_opts,
    turn: call?.turn ?? {iteration: 0, session_id: "", attempt: 1},
  }
}
/**
 * with_repair(next, opts) -> caller
 *
 * One-shot repair pass on `schema_validation` failures. When `next(call)`
 * returns `{ok: false, status: "schema_validation", error}`, the wrapper
 * appends a corrective nudge to `call.prompt`, applies the repair-pass
 * `max_tokens` and `temperature`, and asks `next` exactly once more. The
 * second envelope is returned with `repair_attempted: true`, whether or
 * not it succeeded.
 *
 * Options:
 *   - strategy: string | closure(prompt, system, opts, envelope) -> string
 *       Custom corrective nudge. Falls back to a deterministic
 *       schema-failure message that lists the validator's hint.
 *   - max_tokens: int (default 600) — applied to the repair-pass opts
 *   - temperature: float (default 0.0)
 *   - enabled: bool (default true) — when false, behaves as a no-op pass
 *
 * Other failure statuses pass through unchanged. Successful responses
 * pass through untouched. Schema-retry loops live in
 * `llm_call_structured_result`; this handler is the caller-seam form for
 * unstructured callers that surface `schema_validation` themselves.
 *
 * Also callable as with_repair(opts), which returns fn(next) -> caller.
 */
pub fn with_repair(next, opts = nil) {
  if !__llm_handler_is_callable(next) {
    return __partial(with_repair, next)
  }
  let cfg = __opts_dict(opts)
  let enabled = cfg?.enabled ?? true
  let strategy = cfg?.strategy
  let max_tokens = cfg?.max_tokens ?? 600
  let temperature = cfg?.temperature ?? 0.0
  return { call ->
    let initial = __safe_invoke(next, call)
    if !enabled {
      return initial
    }
    let succeeded = initial?.ok ?? false
    let failure_status = to_string(initial?.status ?? "")
    if succeeded || failure_status != "schema_validation" {
      return initial
    }
    let repair_call = __repair_apply_strategy(call, initial, strategy, max_tokens, temperature)
    let repaired = __safe_invoke(next, repair_call)
    return repaired + {repair_attempted: true}
  }
}
// -------------------------------------------------------------------------------------------------
// with_coerce
// -------------------------------------------------------------------------------------------------
// Applies with_case_insensitive_keys to `value` when `lower_keys` is
// set; otherwise returns `value` untouched.
fn __coerce_value_dict(value, lower_keys) {
  if lower_keys {
    return with_case_insensitive_keys(value)
  }
  return value
}
/**
 * with_coerce(next, opts) -> caller
 *
 * Normalize successful envelopes for downstream consumers. When
 * `value.data` is a dict it is passed through
 * `with_case_insensitive_keys` (unless `lower_keys` is false) so callers
 * can read fields case-insensitively without per-site `dict_get_ci`
 * dances. The normalized form replaces `value.data` and the original is
 * kept alongside it as `value.raw_data`.
 *
 * Options:
 *   - lower_keys: bool (default true) — normalize dict keys
 *   - on_text_json: bool (default false) — when value.data is not a dict
 *       and value.text parses as a JSON object, parse + normalize it
 *       into `value.data` (no `raw_data` is added on this path)
 *
 * Failure envelopes pass through unchanged, as do successes that offer
 * nothing to normalize.
 *
 * Also callable as with_coerce(opts), which returns fn(next) -> caller.
 */
pub fn with_coerce(next, opts = nil) {
  if !__llm_handler_is_callable(next) {
    return __partial(with_coerce, next)
  }
  let cfg = __opts_dict(opts)
  let lower_keys = cfg?.lower_keys ?? true
  let on_text_json = cfg?.on_text_json ?? false
  return { call ->
    let envelope = __safe_invoke(next, call)
    let succeeded = envelope?.ok ?? false
    if !succeeded {
      return envelope
    }
    var value = {}
    if type_of(envelope?.value) == "dict" {
      value = envelope.value
    }
    if type_of(value?.data) == "dict" {
      let normalized = __coerce_value_dict(value.data, lower_keys)
      return envelope + {value: value + {data: normalized, raw_data: value.data}}
    }
    if !on_text_json {
      return envelope
    }
    let text = to_string(value?.text ?? "")
    if text == "" {
      return envelope
    }
    let parsed = try {
      json_parse(text)
    }
    if is_err(parsed) {
      return envelope
    }
    let decoded = unwrap(parsed)
    if type_of(decoded) != "dict" {
      return envelope
    }
    let normalized = __coerce_value_dict(decoded, lower_keys)
    return envelope + {value: value + {data: normalized}}
  }
}
// -------------------------------------------------------------------------------------------------
// with_timeout
// -------------------------------------------------------------------------------------------------
// Resolves a with_timeout options argument to a deadline in
// milliseconds. A bare int is returned as-is and a bare float is
// truncated. A dict is read as `ms`, then `seconds` (multiplied by
// 1000), then `duration_ms`, first one set wins. Anything else, or a
// dict with none of those keys, yields 0.
fn __timeout_resolve_ms(opts) {
  let kind = type_of(opts)
  if kind == "int" {
    return opts
  }
  if kind == "float" {
    return to_int(opts)
  }
  if kind != "dict" {
    return 0
  }
  let ms = opts?.ms
  if ms != nil {
    return to_int(ms)
  }
  let seconds = opts?.seconds
  if seconds != nil {
    return to_int(seconds * 1000.0)
  }
  let duration_ms = opts?.duration_ms
  if duration_ms != nil {
    return to_int(duration_ms)
  }
  return 0
}
/**
 * with_timeout(next, opts) -> caller
 *
 * Soft, clock-aware deadline. Three things happen on each call:
 *
 *   1. The deadline is forwarded as `call.opts.timeout_ms` (unless the
 *      call already sets one) so providers that respect it (HTTP
 *      clients, Ollama keep-alive, Anthropic `request_timeout`) can
 *      cancel mid-flight.
 *   2. `now_ms()` measures elapsed time; success envelopes that overran
 *      the deadline are converted to
 *      `{ok: false, status: "timeout", error: {timeout_ms, elapsed_ms}}`.
 *   3. Failure envelopes whose elapsed time exceeded the budget are
 *      relabeled `status: "timeout"` so retry / fallback policies treat
 *      them uniformly.
 *
 * This wrapper does not interrupt `next`; it only inspects the clock
 * after `next` returns. Envelopes that overran also carry `elapsed_ms`.
 * A deadline of 0 or less disables all of the above.
 *
 * Honors the unified clock — `now_ms()` is mockable in tests via
 * `mock_time` / `advance_time`.
 *
 * Options:
 *   - ms / seconds / duration_ms: deadline (one of). Bare int/float is
 *     interpreted as milliseconds for ergonomic call sites.
 *   - relabel_failures: bool (default true) — relabel slow failures to
 *     `timeout`. Set false to keep the original status code.
 *
 * Also callable as with_timeout(opts), which returns fn(next) -> caller.
 */
pub fn with_timeout(next, opts = nil) {
  if !__llm_handler_is_callable(next) {
    return __partial(with_timeout, next)
  }
  let timeout_ms = __timeout_resolve_ms(opts)
  let cfg = __opts_dict(opts)
  let relabel = cfg?.relabel_failures ?? true
  return { call ->
    let call_opts = __opts_dict(call?.opts)
    var forwarded_opts = call_opts
    if timeout_ms > 0 && call_opts?.timeout_ms == nil {
      forwarded_opts = call_opts + {timeout_ms: timeout_ms}
    }
    let deadline_call = call + {opts: forwarded_opts}
    let started_at = now_ms()
    let envelope = __safe_invoke(next, deadline_call)
    let elapsed = now_ms() - started_at
    if timeout_ms <= 0 || elapsed <= timeout_ms {
      return envelope
    }
    let succeeded = envelope?.ok ?? false
    if succeeded {
      // A late success is reported as a timeout; the value is dropped.
      return {
        ok: false,
        status: "timeout",
        error: {timeout_ms: timeout_ms, elapsed_ms: elapsed},
        elapsed_ms: elapsed,
      }
    }
    if relabel {
      return envelope + {status: "timeout", elapsed_ms: elapsed}
    }
    return envelope + {elapsed_ms: elapsed}
  }
}
// -------------------------------------------------------------------------------------------------
// with_routing
// -------------------------------------------------------------------------------------------------
/**
 * Decide whether one route entry applies to `call`.
 *
 *   - non-dict entries never match
 *   - a missing (nil) `when` always matches
 *   - a callable `when` is invoked with `call`; a thrown error or a nil
 *     result counts as "no match", anything else is coerced to bool
 *   - a non-callable `when` is coerced to bool directly
 */
fn __routing_route_matches(route, call) {
  if type_of(route) != "dict" {
    return false
  }
  let predicate = route?.when
  if predicate == nil {
    return true
  }
  if !__llm_handler_is_callable(predicate) {
    return !!predicate
  }
  // A predicate that throws must not break routing; treat it as a miss.
  let outcome = try {
    predicate(call)
  }
  if is_err(outcome) {
    return false
  }
  let verdict = unwrap(outcome)
  if verdict == nil {
    return false
  }
  return !!verdict
}

/**
 * Scan `routes` in order and return the first entry that applies to
 * `call` as `{index, caller, name}`. `caller` is the route's `caller`
 * field as-is (it may be nil), and `name` is the route's `name`
 * stringified ("" when absent).
 *
 * Returns `{index: -1, caller: nil, name: ""}` when `routes` is not a
 * list or no entry matches.
 */
fn __routing_resolve(routes, call) {
  if type_of(routes) != "list" {
    return {index: -1, caller: nil, name: ""}
  }
  let count = len(routes)
  var position = 0
  while position < count {
    let candidate = routes[position]
    if __routing_route_matches(candidate, call) {
      return {
        index: position,
        caller: candidate?.caller,
        name: to_string(candidate?.name ?? ""),
      }
    }
    position = position + 1
  }
  return {index: -1, caller: nil, name: ""}
}
/**
 * with_routing(opts) -> caller
 *
 * Pre-call routing: pick the right caller before the request goes out.
 * The canonical use case is **cheap-model-by-default with frontier
 * escalation on ambiguity**:
 *
 *   let cheap = with_retry(default_llm_caller(), {max_attempts: 2})
 *   let strong = with_retry(default_llm_caller(), {max_attempts: 3})
 *   let router = with_routing({
 *     default: cheap,
 *     routes: [
 *       {name: "frontier",
 *        when: { call -> call?.opts?.task_kind == "judge" || (call?.opts?.escalate ?? false) },
 *        caller: strong},
 *     ],
 *   })
 *
 * Options:
 *   - default: callable caller used when no route takes the call
 *     (required; a non-callable value throws at construction time).
 *   - default_name: label reported when no route matches
 *     (defaults to "default").
 *   - routes: list of `{name?, when?, caller?}`. A non-list value is
 *     treated as an empty list.
 *
 * The first matching route wins. If the matched route has no callable
 * `caller`, or nothing matches, `opts.default` runs.
 *
 * Emits an `llm_routing_decision` event carrying `route_index`,
 * `route_name` and `used_default` so receipt sinks can audit cost
 * decisions per call. `used_default` reflects the caller that actually
 * ran: it is true whenever `opts.default` handled the call, including
 * the case where a route matched but supplied no callable `caller`.
 * The returned envelope is the chosen caller's envelope plus
 * `route_index` and `route_name`.
 *
 * Differs from `with_fallback`, which is post-failure: routing chooses
 * ONE caller before any provider work happens.
 */
pub fn with_routing(opts) {
  let cfg = __opts_dict(opts)
  let default_caller = cfg?.default
  let routes = if type_of(cfg?.routes) == "list" {
    cfg.routes
  } else {
    []
  }
  if !__llm_handler_is_callable(default_caller) {
    throw "with_routing: opts.default must be a callable caller"
  }
  return { call ->
    let pick = __routing_resolve(routes, call)
    // A route only takes the call when it carries a callable `caller`;
    // a miss (caller: nil) or a caller-less match falls back to default.
    let routed = __llm_handler_is_callable(pick?.caller)
    let chosen = if routed {
      pick.caller
    } else {
      default_caller
    }
    let route_name = if pick.index >= 0 {
      pick.name
    } else {
      to_string(cfg?.default_name ?? "default")
    }
    __emit_event(
      to_string(call?.turn?.session_id ?? ""),
      "llm_routing_decision",
      // Report the caller that really ran, not merely whether a
      // predicate matched, so the audit trail cannot claim a routed
      // call that was in fact served by the default.
      {route_index: pick.index, route_name: route_name, used_default: !routed},
    )
    let envelope = __safe_invoke(chosen, call)
    return envelope + {route_index: pick.index, route_name: route_name}
  }
}
// -------------------------------------------------------------------------------------------------
// compose
// -------------------------------------------------------------------------------------------------
/**
 * compose(wrappers) -> fn(base) -> caller
 *
 * Builds one wrapper out of a LIST of wrappers, each of shape
 * `fn(next) -> caller`. A list is taken because Harn does not (yet)
 * support user-defined variadic functions.
 *
 * The list is applied from its last element to its first, so the
 * leftmost wrapper ends up outermost:
 *
 *   compose([a, b, c])(base) == a(b(c(base)))
 *
 * A non-list argument is treated as an empty list, in which case the
 * returned function hands `base` back unchanged.
 */
pub fn compose(wrappers) {
  let chain = if type_of(wrappers) == "list" {
    wrappers
  } else {
    []
  }
  return { base ->
    var wrapped = base
    // Count down from the end so the innermost wrapper is applied first.
    var remaining = len(chain)
    while remaining > 0 {
      remaining = remaining - 1
      let layer = chain[remaining]
      wrapped = layer(wrapped)
    }
    return wrapped
  }
}