// @harn-entrypoint-category agent.stdlib
import { agent_autocompact_if_needed } from "std/agent/autocompact"
import { agent_budget_post_call_blocked, agent_budget_pre_call_blocked } from "std/agent/budget"
import {
agent_loop_apply_command,
agent_loop_control_invoke,
agent_loop_snapshot_state,
} from "std/agent/control"
import { agent_daemon_snapshot, agent_daemon_step } from "std/agent/daemon"
import { agent_verify_or_continue } from "std/agent/judge"
import { agent_mcp_bootstrap_if_needed } from "std/agent/mcp"
import { agent_loop_options } from "std/agent/options"
import {
agent_apply_tool_surface_narrowing,
agent_compute_post_turn,
agent_reset_tool_surface_narrowing,
} from "std/agent/postturn"
import { agent_build_turn_messages, agent_build_turn_system_fragments } from "std/agent/preflight"
import { agent_dispatch_tool_batch, agent_dispatch_tool_call } from "std/agent/primitives"
import { agent_progress_apply_options } from "std/agent/progress"
import { native_tool_contract_feedback_prompt } from "std/agent/prompts"
import {
agent_required_tools_enforce,
agent_required_tools_inject_feedback,
agent_required_tools_missing_from_session,
} from "std/agent/required_tools"
import { agent_skills_match } from "std/agent/skills"
import {
agent_stall_apply_result,
agent_stall_done_judge_due,
agent_stall_initial_state,
agent_stall_inject_feedback,
agent_stall_observe_tool_calls,
} from "std/agent/stall"
import {
agent_emit_event,
agent_record_native_tool_fallback,
agent_reminder_providers_fire,
agent_session_drain_bridge_injections,
agent_session_drain_feedback,
agent_session_finalize,
agent_session_init,
agent_session_inject_feedback,
agent_session_messages,
agent_session_pop_last_assistant,
agent_session_record_assistant,
agent_session_record_tool_results,
agent_session_record_usage,
agent_session_totals,
} from "std/agent/state"
import { agent_step_judge } from "std/agent/step_judge"
import {
agent_tool_search_emit_queries,
agent_tool_search_inject_if_needed,
agent_tool_search_record_results,
} from "std/agent/tool_search"
import {
agent_await_resumption,
agent_lifecycle_tools,
list_agents,
suspend_agent,
} from "std/agent/workers"
import { omit } from "std/json"
import { llm_call_options } from "std/llm/options"
fn __default_invoke_llm(message, turn_system, llm_opts) {
let result = try {
llm_call(message, turn_system, llm_call_options(llm_opts))
}
if !is_err(result) {
return {ok: true, value: unwrap(result)}
}
let err = unwrap_err(result)
let normalized = __llm_provider_error(err, llm_opts)
let reason = if type_of(err) == "dict" {
err?.reason ?? ""
} else {
""
}
if reason == "budget_exceeded" {
return {ok: false, status: "budget_exhausted", stop_reason: "budget_exhausted", error: normalized}
}
return {ok: false, status: "provider_error", stop_reason: "provider_error", error: normalized}
}
fn __messages_include_tool_result(messages) {
if type_of(messages) != "list" {
return false
}
for msg in messages {
let role = to_string(msg?.role ?? "")
if role == "tool" || role == "tool_result" {
return true
}
}
return false
}
fn __llm_error_field(err, key, fallback) {
if type_of(err) == "dict" {
let value = err[key]
if value != nil {
return to_string(value)
}
}
return fallback
}
fn __llm_provider_error(err, llm_opts) {
let message = __llm_error_field(err, "message", to_string(err))
let category = __llm_error_field(err, "category", error_category(err))
let reason = __llm_error_field(err, "reason", "")
let kind = __llm_error_field(err, "kind", "")
let provider = __llm_error_field(err, "provider", to_string(llm_opts?.provider ?? ""))
let model = __llm_error_field(err, "model", to_string(llm_opts?.model ?? ""))
return {
category: category,
reason: reason,
kind: kind,
provider: provider,
model: model,
message: message,
phase: "llm_call",
tool_format: to_string(llm_opts?.tool_format ?? ""),
after_tool_result: __messages_include_tool_result(llm_opts?.messages),
}
}
fn __validate_caller_result(r) {
if type_of(r) != "dict" {
throw "agent_loop: llm_caller must return a dict; got " + type_of(r)
}
if r?.ok == nil {
throw "agent_loop: llm_caller result missing `ok`"
}
if r.ok && type_of(r?.value) != "dict" {
throw "agent_loop: llm_caller returned ok=true but `value` is not a dict"
}
if !r.ok && type_of(r?.status) != "string" {
throw "agent_loop: llm_caller returned ok=false but `status` is not a string"
}
}
fn __invoke_llm(message, turn_system, llm_opts) {
let caller = llm_opts?._llm_caller
if caller == nil {
return __default_invoke_llm(message, turn_system, llm_opts)
}
let call = {
prompt: message,
system: turn_system,
opts: llm_opts,
turn: {iteration: llm_opts?._iteration ?? 0, session_id: llm_opts?.session_id ?? "", attempt: 1},
}
let result = try {
caller(call)
}
if is_err(result) {
throw unwrap_err(result)
}
let r = unwrap(result)
__validate_caller_result(r)
return r
}
// -------------------------------------------------------------------------------------------------
// Tool middleware seam — composable tool_caller (mirrors __invoke_llm).
//
// Each tool dispatch is funneled through `tool_caller(envelope, next)` when
// the agent_loop options carry one. The envelope normalizes the call shape
// so middleware doesn't have to peek at the underlying registry/schema:
//
// envelope = {
// tool_name, tool_args, call_id,
// declared_executor?, schema?, description?,
// turn: {iteration, session_id},
// }
//
// The middleware returns a dispatch-shape dict. Calling `next(envelope)`
// runs the default dispatch (with any envelope mutations the middleware
// applied — typically `tool_args` rewrites or argument stripping). Callers
// can short-circuit by returning their own dict without invoking `next`.
//
// See std/llm/tool_middleware for the userspace primitives + the bundled
// middleware library (with_required_reason, with_audit_log, …).
// -------------------------------------------------------------------------------------------------
fn __tool_registry_entry(tools, tool_name) {
if tools == nil {
return nil
}
let entries = tools?.tools
if type_of(entries) != "list" {
return nil
}
for entry in entries {
if type_of(entry) != "dict" {
continue
}
let entry_name = if entry?.name != nil {
to_string(entry.name)
} else {
let func = entry?.function
if type_of(func) == "dict" {
to_string(func?.name ?? "")
} else {
""
}
}
if entry_name == tool_name {
return entry
}
}
return nil
}
fn __tool_envelope(call, tools, options) {
let tool_name = to_string(call?.name ?? call?.tool_name ?? "")
let tool_args_raw = call?.arguments ?? call?.tool_args
let tool_args = if type_of(tool_args_raw) == "dict" {
tool_args_raw
} else {
{}
}
let entry = __tool_registry_entry(tools, tool_name)
let declared_executor = if entry == nil {
nil
} else {
let direct = entry?.executor
if direct != nil {
to_string(direct)
} else {
let func = entry?.function
if type_of(func) == "dict" && func?.executor != nil {
to_string(func.executor)
} else {
nil
}
}
}
let schema = if entry == nil {
nil
} else {
entry?.parameters ?? entry?.input_schema ?? entry?.inputSchema
}
let annotations = if entry == nil {
nil
} else {
entry?.annotations
}
let description = if entry == nil {
""
} else {
let direct = entry?.description
if direct != nil {
to_string(direct)
} else {
let func = entry?.function
if type_of(func) == "dict" && func?.description != nil {
to_string(func.description)
} else {
""
}
}
}
return {
tool_name: tool_name,
tool_args: tool_args,
call_id: to_string(call?.id ?? call?.tool_call_id ?? ""),
declared_executor: declared_executor,
schema: schema,
annotations: annotations,
description: description,
turn: {
iteration: options?._iteration ?? 0,
session_id: to_string(options?.session_id ?? ""),
run_id: options?.run_id ?? options?._run_id,
model: options?.model,
provider: options?.provider,
tool_call_index: options?._tool_call_index ?? 0,
max_concurrent_tools: options?._max_concurrent_tools ?? 1,
prefetch_next_turn: options?._prefetch_next_turn ?? false,
},
}
}
fn __default_invoke_tool(envelope, original_call, tools, options) {
let next_call = original_call
+ {name: envelope.tool_name, tool_name: envelope.tool_name, arguments: envelope.tool_args}
return agent_dispatch_tool_call(next_call, tools, options)
}
fn __validate_tool_caller_result(r) {
if type_of(r) != "dict" {
throw "agent_loop: tool_caller must return a dict; got " + type_of(r)
}
let name = r?.tool_name ?? r?.name
if name == nil || to_string(name) == "" {
throw "agent_loop: tool_caller result missing `tool_name`"
}
let ok = r?.ok
if ok == nil {
let success = r?.success
if success == nil {
let status = r?.status
if status == nil {
throw "agent_loop: tool_caller result missing `ok`/`success`/`status`"
}
}
} else if type_of(ok) != "bool" {
throw "agent_loop: tool_caller result `ok` must be a bool; got " + type_of(ok)
}
}
fn __middleware_exception_result(envelope, err) {
let err_text = to_string(err)
let observation = "[error from " + envelope.tool_name + "]\n" + err_text
+ "\n[end of "
+ envelope.tool_name
+ " error]\n"
return {
ok: false,
status: "error",
tool_name: envelope.tool_name,
tool_call_id: envelope.call_id,
arguments: envelope.tool_args,
result: nil,
rendered_result: err_text,
observation: observation,
error: err_text,
error_category: "tool_middleware_exception",
executor: nil,
}
}
fn __structural_validator_tool_name() -> string {
return "__structural_validator_turn__"
}
fn __structural_validator_pass_result(envelope) {
return {
ok: true,
status: "ok",
tool_name: envelope.tool_name,
tool_call_id: envelope.call_id,
arguments: envelope.tool_args,
result: {configured: false, vetoed: false, skipped: true, reason: "not_configured"},
rendered_result: "",
observation: "",
error: nil,
error_category: nil,
executor: "harn",
}
}
fn __run_structural_validator(
caller,
session_id,
llm_result,
tool_calls,
parsed,
llm_opts,
turn_opts,
prior_successful_tools,
prior_rejected_tools,
attempts,
) {
if caller == nil {
return {configured: false, vetoed: false, skipped: true, reason: "not_configured"}
}
let envelope = {
tool_name: __structural_validator_tool_name(),
tool_args: {
session_id: session_id,
iteration: turn_opts?._iteration ?? 0,
attempts: attempts,
tool_calls: tool_calls,
tools: turn_opts?.tools,
policy: turn_opts?.policy,
assistant_text: llm_result?.visible_text ?? llm_result?.text ?? "",
raw_text: llm_result?.raw_text ?? llm_result?.text ?? "",
parsed_done_marker: parsed?.done_marker ?? "",
tool_parse_errors: parsed?.tool_parse_errors ?? [],
protocol_violations: parsed?.protocol_violations ?? [],
tool_format: turn_opts?.tool_format ?? llm_opts?.tool_format ?? "",
output_tokens: llm_result?.output_tokens ?? 0,
max_output_tokens: llm_opts?.max_tokens ?? turn_opts?.max_tokens ?? 0,
provider: llm_result?.provider ?? "",
model: llm_result?.model ?? "",
prior_successful_tools: prior_successful_tools,
prior_rejected_tools: prior_rejected_tools,
},
call_id: "structural-validator-turn-" + to_string(turn_opts?._iteration ?? 0),
declared_executor: "harn",
schema: nil,
annotations: nil,
description: "Internal structural validator probe",
turn: {
iteration: turn_opts?._iteration ?? 0,
session_id: session_id,
run_id: turn_opts?.run_id ?? turn_opts?._run_id,
model: turn_opts?.model,
provider: turn_opts?.provider,
tool_call_index: 0,
max_concurrent_tools: 1,
prefetch_next_turn: false,
},
}
let next = { env_in -> __structural_validator_pass_result(env_in) }
let outcome = try {
caller(envelope, next)
}
if is_err(outcome) {
let err = unwrap_err(outcome)
if error_category(err) == "cancelled" {
throw err
}
throw "agent_loop: structural validator failed: " + to_string(err)
}
let result = unwrap(outcome)
if type_of(result) != "dict" {
throw "agent_loop: structural validator must return a dict; got " + type_of(result)
}
return if type_of(result?.result) == "dict" {
result.result
} else {
{}
}
}
fn __invoke_tool(call, tools, options) {
let caller = options?._tool_caller
if caller == nil {
return agent_dispatch_tool_call(call, tools, options)
}
let envelope = __tool_envelope(call, tools, options)
let next = { env_in -> __default_invoke_tool(env_in, call, tools, options) }
let outcome = try {
caller(envelope, next)
}
if is_err(outcome) {
let err = unwrap_err(outcome)
if error_category(err) == "cancelled" {
throw err
}
__maybe_emit_tool_audit(
envelope.turn.session_id,
envelope,
{layer: "tool_caller", status: "exception", error: to_string(err)},
)
return __middleware_exception_result(envelope, err)
}
let r = unwrap(outcome)
__validate_tool_caller_result(r)
__maybe_emit_tool_audit(envelope.turn.session_id, envelope, r?.audit, r?.receipt)
return r
}
fn __maybe_emit_tool_audit(session_id, envelope, audit, receipt = nil) {
if audit == nil && receipt == nil {
return
}
if session_id == "" {
return
}
let payload = if receipt == nil {
{tool_call_id: envelope.call_id, tool_name: envelope.tool_name, audit: audit}
} else {
{
tool_call_id: envelope.call_id,
tool_name: envelope.tool_name,
audit: audit ?? {},
receipt: receipt,
}
}
let _ = try {
agent_emit_event(session_id, "tool_call_audit", payload)
}
}
fn __visible_text(parsed, raw_text) {
if parsed?.user_response != nil && parsed.user_response != "" {
return parsed.user_response
}
if parsed?.prose != nil && parsed.prose != "" {
return parsed.prose
}
return raw_text
}
fn __resolve_tool_calls(llm_result, parsed) {
let native_calls = llm_result?.native_tool_calls ?? llm_result?.tool_calls ?? []
if len(native_calls) > 0 {
return native_calls
}
return parsed?.calls ?? []
}
fn __agent_await_resumption_call(tool_calls) {
for call in tool_calls {
if __tool_call_name(call) == "agent_await_resumption" {
return call
}
}
return nil
}
fn __agent_await_resumption_args(call) {
let args = __tool_call_args(call)
return agent_await_resumption(args?.reason ?? "", args?.conditions ?? nil, args?.resume_by ?? nil)
}
fn __agent_loop_await_resumption(session, iteration, call, opts) {
let parsed = __agent_await_resumption_args(call)
let worker = __agent_loop_current_worker()
if worker == nil {
return __agent_loop_await_resumption_top_level(session, iteration, call, parsed, opts)
}
agent_emit_event(
session.session_id,
"tool_call_audit",
{
tool_call_id: to_string(call?.id ?? call?.tool_call_id ?? ""),
tool_name: "agent_await_resumption",
audit: {
layer: "agent_lifecycle",
status: "suspended",
initiator: "self",
reason: parsed.reason,
worker_id: worker?.id,
conditions: parsed.conditions,
},
},
)
suspend_agent(worker, parsed.reason, {initiator: "self", conditions: parsed.conditions})
let checkpoint = __agent_loop_suspend_checkpoint(session, iteration)
if checkpoint == nil {
throw "agent_await_resumption: suspend checkpoint did not yield"
}
return checkpoint
}
fn __agent_loop_await_resumption_top_level(session, iteration, call, parsed, opts) {
agent_session_inject(
session.session_id,
transcript_reminder_event(
{
body: __agent_loop_suspend_reminder_body(parsed.reason),
source: "in_pipeline",
tags: ["agent_loop", "top_level_suspend"],
dedupe_key: "top_level_suspend:" + session.session_id,
ttl_turns: 1,
fired_at_turn: iteration + 1,
},
),
)
let handle = __host_top_level_agent_suspend(
session.session_id,
session.task,
session.system,
opts,
parsed.reason,
parsed.conditions,
iteration,
)
agent_emit_event(
session.session_id,
"tool_call_audit",
{
tool_call_id: to_string(call?.id ?? call?.tool_call_id ?? ""),
tool_name: "agent_await_resumption",
audit: {
layer: "agent_lifecycle",
status: "suspended",
initiator: "self",
reason: parsed.reason,
worker_id: handle?.id,
conditions: parsed.conditions,
},
},
)
return {
status: "suspended",
handle: handle,
worker: handle,
reason: parsed.reason,
initiator: "self",
conditions: parsed.conditions,
resume_by: parsed?.resume_by,
iterations_completed: iteration,
session_id: session.session_id,
}
}
fn __tool_call_name(call) {
return to_string(call?.name ?? call?.tool_name ?? "")
}
fn __tool_call_args(call) {
let raw = call?.arguments ?? call?.tool_args
if type_of(raw) == "dict" {
return raw
}
return {}
}
fn __native_fallback_feedback(policy, fallback_index) {
return native_tool_contract_feedback_prompt({policy: policy, fallback_index: fallback_index})
}
fn __detect_native_fallback(
llm_result,
parsed,
turn_opts,
fallback_index,
session_id,
iteration_index,
) {
let native_calls = llm_result?.native_tool_calls ?? []
let parsed_calls = parsed?.calls ?? []
let format = turn_opts?.tool_format ?? ""
if format != "native" || len(native_calls) > 0 || len(parsed_calls) == 0 {
return {triggered: false, accepted: false, fallback_index: fallback_index, calls: nil}
}
let new_index = fallback_index + 1
let policy = turn_opts?.native_tool_fallback ?? "reject"
let accepted = if policy == "allow" {
true
} else if policy == "allow_once" {
new_index == 1
} else {
false
}
agent_record_native_tool_fallback(
session_id,
{
iteration: iteration_index + 1,
accepted: accepted,
policy: policy,
fallback_index: new_index,
tool_call_count: len(parsed_calls),
},
)
if !accepted {
agent_session_inject_feedback(
session_id,
"native_tool_contract",
__native_fallback_feedback(policy, new_index),
)
}
let resolved_calls = if accepted {
parsed_calls
} else {
[]
}
return {triggered: true, accepted: accepted, fallback_index: new_index, calls: resolved_calls}
}
fn __resolve_max_concurrent_tools(turn_opts) {
let raw = turn_opts?.max_concurrent_tools
if type_of(raw) == "int" && raw > 1 {
return raw
}
return 1
}
fn __callable(value) {
let kind = type_of(value)
return kind == "closure" || kind == "function" || kind == "fn"
}
fn __audit_flushes_from_result(result) {
var flushes = []
let single = result?._audit_flush
if __callable(single) {
flushes = flushes.push(single)
}
let many = result?._audit_flushes
if type_of(many) == "list" {
for flush in many {
if __callable(flush) {
flushes = flushes.push(flush)
}
}
}
return flushes
}
fn __collect_audit_flushes(dispatch) {
var flushes = []
for result in __dispatch_results_list(dispatch) {
for flush in __audit_flushes_from_result(result) {
flushes = flushes.push(flush)
}
}
return flushes
}
fn __strip_internal_tool_result(result) {
if type_of(result) != "dict" {
return result
}
var clean = {}
for key in result.keys() {
if !starts_with(key, "_") {
clean = clean + {[key]: result[key]}
}
}
return clean
}
fn __strip_internal_dispatch(dispatch) {
if type_of(dispatch) == "list" {
var clean_results = []
for result in dispatch {
clean_results = clean_results.push(__strip_internal_tool_result(result))
}
return clean_results
}
if type_of(dispatch) != "dict" {
return dispatch
}
var clean_dispatch = {}
for key in dispatch.keys() {
if !starts_with(key, "_") {
clean_dispatch = clean_dispatch + {[key]: dispatch[key]}
}
}
if type_of(dispatch?.results) == "list" {
var clean_results = []
for result in dispatch.results {
clean_results = clean_results.push(__strip_internal_tool_result(result))
}
clean_dispatch = clean_dispatch + {results: clean_results}
}
return clean_dispatch
}
fn __spawn_audit_flushes(tasks, flushes) {
var out = tasks
for flush in flushes {
let task = spawn {
let _ = try {
flush()
}
}
out = out.push(task)
}
return out
}
fn __drain_audit_flushes(tasks) {
for task in tasks {
let _ = try {
await(task)
}
}
}
fn __dispatch_tool_calls(session_id, tool_calls, turn_opts) {
if len(tool_calls) == 0 {
return {dispatch: nil, turn_opts: turn_opts, audit_flushes: []}
}
agent_tool_search_emit_queries(session_id, tool_calls, turn_opts)
let tools = turn_opts?.tools
let cap = __resolve_max_concurrent_tools(turn_opts)
let dispatch_options = {
session_id: session_id,
tool_format: turn_opts.tool_format,
policy: turn_opts?.policy,
approval_policy: turn_opts?.approval_policy,
command_policy: turn_opts?.command_policy,
permissions: turn_opts?.permissions,
reminders: turn_opts?.reminders,
_iteration: turn_opts?._iteration ?? 0,
_tool_caller: turn_opts?._tool_caller,
_max_concurrent_tools: cap,
_prefetch_next_turn: turn_opts?.prefetch_next_turn ?? false,
}
let caller = turn_opts?._tool_caller
let raw_dispatch = if caller == nil {
agent_dispatch_tool_batch(tool_calls, tools, dispatch_options)
} else {
__dispatch_tool_calls_with_middleware(tool_calls, tools, dispatch_options, cap)
}
let audit_flushes = __collect_audit_flushes(raw_dispatch)
let dispatch = __strip_internal_dispatch(raw_dispatch)
agent_session_record_tool_results(session_id, dispatch)
return {
dispatch: dispatch,
turn_opts: agent_tool_search_record_results(session_id, tool_calls, dispatch, turn_opts),
audit_flushes: audit_flushes,
}
}
fn __invoke_tool_with_index(call, index, tools, options) {
return __invoke_tool(call, tools, options + {_tool_call_index: index})
}
fn __dispatch_tool_calls_with_middleware(tool_calls, tools, options, cap) {
// Middleware-enabled path. Each call invokes its own caller chain
// inside a fresh closure scope, so `audit.layers` histories stay
// independent across siblings. When `max_concurrent_tools > 1`,
// dispatch siblings concurrently via `parallel settle` with the
// requested cap; results come back in source order regardless of
// completion order so text tool-call parsers that key on
// declaration order still match.
if cap <= 1 || len(tool_calls) <= 1 {
var results = []
for (index, call) in iter(tool_calls).enumerate() {
results = results.push(__invoke_tool_with_index(call, index, tools, options))
}
return results
}
var indexed = []
for (index, call) in iter(tool_calls).enumerate() {
indexed = indexed.push({index: index, call: call})
}
let settled = parallel settle indexed with { max_concurrent: cap } { entry ->
__invoke_tool_with_index(entry.call, entry.index, tools, options)
}
var results = []
for r in settled.results {
if is_ok(r) {
results = results.push(unwrap(r))
} else {
// `__invoke_tool` traps its own middleware exceptions, so a thrown
// value here is a VM-level bug (e.g. parallel-task plumbing). Surface
// it as a synthetic error result rather than tear down the loop.
let err = unwrap_err(r)
if error_category(err) == "cancelled" {
throw err
}
results = results
.push(
{
ok: false,
status: "error",
tool_name: "",
tool_call_id: "",
arguments: {},
result: nil,
rendered_result: to_string(err),
observation: to_string(err),
error: to_string(err),
error_category: "tool_parallel_dispatch_exception",
executor: nil,
},
)
}
}
return results
}
fn __sync_tool_search_state(opts, turn_opts) {
if turn_opts?._tool_search_client == nil {
return opts
}
return opts + {_tool_search_client: turn_opts._tool_search_client}
}
fn __dispatch_results_list(dispatch) {
if dispatch == nil {
return []
}
if type_of(dispatch) == "list" {
return dispatch
}
return dispatch?.results ?? []
}
fn __tool_result_ok(result) {
if result?.ok != nil {
return result.ok ? true : false
}
if result?.success != nil {
return result.success ? true : false
}
let status = result?.status ?? ""
return status == "ok" || status == "success"
}
fn __tool_result_name(result) {
return result?.tool_name ?? result?.name ?? ""
}
fn __tool_names_by_status(dispatch, want_ok) {
let results = __dispatch_results_list(dispatch)
var names = []
for result in results {
let name = __tool_result_name(result)
if name != "" && __tool_result_ok(result) == want_ok {
names = names.push(name)
}
}
return names
}
fn __merge_tool_names(existing, additions) {
var merged = existing ?? []
let values = additions ?? []
for name in values {
if name != "" && !contains(merged, name) {
merged = merged.push(name)
}
}
return merged
}
fn __merge_hook_dict(base, patch, label) {
if patch == nil {
return base
}
if type_of(patch) != "dict" {
throw "agent_loop: post_turn_callback `" + label + "` must be a dict"
}
return base + patch
}
fn __strip_internal_keys(patch) {
if patch == nil {
return patch
}
if type_of(patch) != "dict" {
return patch
}
var clean = {}
for key in patch.keys() {
if !starts_with(key, "_") {
clean = clean + {[key]: patch[key]}
}
}
return clean
}
fn __apply_post_turn_options(opts, outcome) {
var updated = opts
let next_patch = __strip_internal_keys(outcome?.next_options)
updated = __merge_hook_dict(updated, next_patch, "next_options")
let narrowing = outcome?.narrowing
if narrowing != nil {
updated = updated
+ {
_tool_surface_narrowing_history: narrowing?.history ?? [],
_tool_surface_narrowed_tools: narrowing?.narrowed_tools,
}
}
let llm_patch = outcome?.llm_options
if llm_patch != nil {
if type_of(llm_patch) != "dict" {
throw "agent_loop: post_turn_callback `llm_options` must be a dict"
}
let base_llm_options = updated?.llm_options ?? {}
updated = updated + {llm_options: base_llm_options + llm_patch}
}
return updated
}
fn __next_text_only_count(tool_count, consecutive_text_only) {
if tool_count == 0 {
return consecutive_text_only + 1
}
return 0
}
/**
* Build the `iteration_info` payload for the `iteration_end` event from
* the LLM result plus the loop's per-iteration aggregates. Carries `provider`,
* `model`, `response_ms`, token counts, and `thinking_chars` so live
* pulse-check consumers (fleet hooks, ACP clients) can attribute
* latency and surface "still working" indicators without re-parsing
* the transcript JSONL. Empty/missing fields are dropped so the event
* stays small for providers that don't report telemetry.
*/
fn __iteration_info_payload(llm_result, tool_count, visible_text, aggregates = nil) {
let telemetry = llm_result?.provider_telemetry ?? {}
let thinking = llm_result?.thinking ?? ""
let thinking_chars = if type_of(thinking) == "string" {
len(thinking)
} else {
0
}
let extra = aggregates ?? {}
return {
tool_count: tool_count,
text: visible_text,
provider: llm_result?.provider ?? "",
model: llm_result?.model ?? "",
response_ms: telemetry?.client_wall_ms ?? 0,
input_tokens: llm_result?.input_tokens ?? 0,
output_tokens: llm_result?.output_tokens ?? 0,
thinking_chars: thinking_chars,
}
+ extra
}
fn __agent_loop_elapsed_ms(start_ms) {
let elapsed = now_ms() - start_ms
if elapsed < 0 {
return 0
}
return elapsed
}
fn __agent_loop_cost_usd(totals) {
let value = to_float(totals?.cost_usd ?? 0.0)
if value == nil {
return 0.0
}
return value
}
fn __agent_loop_budget_aggregates(session_id, totals, loop_start_ms) {
let resolved_totals = totals ?? agent_session_totals(session_id)
return {
cost_usd: __agent_loop_cost_usd(resolved_totals),
wall_clock_ms: __agent_loop_elapsed_ms(loop_start_ms),
}
}
fn __agent_loop_iteration_info(
session_id,
llm_result,
tool_count,
visible_text,
totals,
loop_start_ms,
) {
return __iteration_info_payload(
llm_result,
tool_count,
visible_text,
__agent_loop_budget_aggregates(session_id, totals, loop_start_ms),
)
}
fn __agent_loop_budget_exhaustion(
session_id,
budget,
iteration,
totals,
loop_start_ms,
max_iterations,
) {
let aggregates = __agent_loop_budget_aggregates(session_id, totals, loop_start_ms)
if budget?.wall_clock_ms != nil && aggregates.wall_clock_ms >= budget.wall_clock_ms {
return aggregates
+ {exhausted: true, kind: "wall_clock", iteration: iteration, max_iterations: max_iterations}
}
if budget?.total_cost_usd != nil && aggregates.cost_usd >= budget.total_cost_usd {
return aggregates
+ {exhausted: true, kind: "total_cost", iteration: iteration, max_iterations: max_iterations}
}
return aggregates + {exhausted: false, kind: "", iteration: iteration, max_iterations: max_iterations}
}
fn __agent_loop_emit_budget_exhausted(session_id, exhaustion) {
agent_emit_event(
session_id,
"budget_exhausted",
{
kind: exhaustion?.kind ?? "budget_exhausted",
max_iterations: exhaustion?.max_iterations ?? 0,
iteration: exhaustion?.iteration ?? 0,
cost_usd: exhaustion?.cost_usd ?? 0.0,
wall_clock_ms: exhaustion?.wall_clock_ms ?? 0,
},
)
}
fn __agent_loop_record_budget_stop(decisions, iteration, current_max, reason) {
return decisions
.push(
{
iteration: iteration,
action: "stop",
old_limit: current_max,
new_limit: current_max,
reason: reason,
status: "budget_exhausted",
},
)
}
fn __agent_loop_state_budget_fields(
session_id,
totals,
loop_start_ms,
budget,
consecutive_failure_count,
) {
let aggregates = __agent_loop_budget_aggregates(session_id, totals, loop_start_ms)
let failure_config = __agent_loop_consecutive_failure_config(budget)
return {
wall_clock_ms: aggregates.wall_clock_ms,
wall_clock_limit_ms: budget?.wall_clock_ms,
cost_usd: aggregates.cost_usd,
total_cost_limit_usd: budget?.total_cost_usd,
consecutive_failures: consecutive_failure_count,
consecutive_failure_limit: failure_config?.max,
}
}
fn __agent_loop_failure_matches_kind(error, wanted) {
let category = to_string(error?.category ?? "")
let reason = to_string(error?.reason ?? "")
let kind = to_string(error?.kind ?? "")
let status = to_int(error?.status ?? error?.status_code ?? 0) ?? 0
if wanted == "transient" {
return kind == "transient" || category == "transient_network" || category == "timeout"
|| reason == "timeout"
}
if wanted == "rate_limit" {
return kind == "rate_limit" || category == "rate_limit" || category == "rate_limited"
|| reason == "rate_limit"
|| status == 429
}
if wanted == "provider_5xx" {
return kind == "provider_5xx" || category == "server_error" || category == "overloaded"
|| reason == "server_error"
|| (status >= 500 && status < 600)
}
return category == wanted || reason == wanted || kind == wanted
}
fn __agent_loop_consecutive_failure_config(budget) {
let config = budget?.consecutive_failures
if type_of(config) != "dict" {
return nil
}
return config
}
fn __agent_loop_tracks_failure(error, config) {
if config == nil {
return false
}
for kind in config?.kinds ?? ["transient", "rate_limit", "provider_5xx"] {
if __agent_loop_failure_matches_kind(error, kind) {
return true
}
}
return false
}
fn __agent_loop_finalize_failed(session, iteration) {
try {
agent_session_finalize(
session.session_id,
{final_status: "failed", stop_reason: "error", iterations: iteration},
)
} catch (e) {
}
}
fn __agent_loop_current_worker() {
let ctx = runtime_context()
let worker_id = ctx?.worker_id
if type_of(worker_id) != "string" || worker_id == "" {
return nil
}
for worker in list_agents() {
if worker?.id == worker_id {
return worker
}
}
return nil
}
fn __agent_loop_suspend_reminder_body(reason) {
if type_of(reason) == "string" && reason != "" {
return "Worker suspended before the next turn: " + reason
}
return "Worker suspended before the next turn."
}
fn __agent_loop_suspend_initiator(value) {
let text = to_string(value ?? "operator")
if text == "self_initiated" {
return "self"
}
return text
}
fn __agent_loop_suspend_checkpoint(session, iteration) {
let worker = __agent_loop_current_worker()
if worker == nil || worker?.status != "suspended" {
return nil
}
let suspension = worker?.suspension ?? {}
let reason = to_string(suspension?.reason ?? "")
let initiator = __agent_loop_suspend_initiator(suspension?.initiator)
let conditions = suspension?.conditions
let payload = {
status: "suspended",
handle: worker,
worker: worker,
reason: reason,
initiator: initiator,
conditions: conditions,
iterations_completed: iteration,
session_id: session.session_id,
}
agent_session_inject(
session.session_id,
transcript_reminder_event(
{
body: __agent_loop_suspend_reminder_body(reason),
source: "in_pipeline",
tags: ["agent_loop", "worker_suspend"],
dedupe_key: "worker_suspend:" + worker.id,
ttl_turns: 1,
fired_at_turn: iteration + 1,
},
),
)
return payload
}
/**
* Mode filter table for `__agent_loop_checkpoint`. Each row encodes the
* invariant for one seam: which bridge modes are eligible to drain, and
* whether the host should pull the agent_inbox feedback queue.
*
* iteration_start, post_tool_dispatch, iteration_end → drain `interrupt_immediate`
* and `finish_step`; the model will see whatever lands on its next
* prompt build, so both modes get the same opportunity here.
* pre_tool_dispatch → drain `interrupt_immediate` only. This is the
* "stop before the tool fires" seam — `finish_step` semantics
* would defeat the point (it means "after the current tool batch").
* If anything arrives, the caller skips the pending tool batch.
* pre_compact, post_compact → bracket the compactor with an
* agent_inbox drain so async producers (tool completions, MCP
* notifications, command-policy feedback) land in the transcript
* before the summarizer sees it and the next prompt is built.
* daemon_idle_pre, daemon_idle_post → drain `interrupt_immediate`
* only; the daemon path doesn't queue `finish_step`-mode injections.
* loop_exit → drain `audit_only`. The other two modes were already
* drained earlier in the loop body. `audit_only` reminders land in
* the transcript at this seam but are NEVER rendered into a model
* prompt — no further LLM call runs after `loop_exit` (harn#2212).
* Hosts that need the model to see a reminder before the agent
* terminates must use `finish_step` (drained at every iteration
* boundary, including the last `iteration_end` before the loop breaks).
*/
fn __agent_loop_checkpoint_modes(kind) {
if kind == "pre_tool_dispatch" || kind == "daemon_idle_pre" || kind == "daemon_idle_post" {
return {immediate: true, finish_step: false, audit_only: false, inbox: false}
}
if kind == "iteration_start" || kind == "post_tool_dispatch" || kind == "iteration_end" {
return {immediate: true, finish_step: true, audit_only: false, inbox: false}
}
if kind == "pre_compact" || kind == "post_compact" {
return {immediate: false, finish_step: false, audit_only: false, inbox: true}
}
if kind == "loop_exit" {
return {immediate: false, finish_step: false, audit_only: true, inbox: false}
}
return {immediate: false, finish_step: false, audit_only: false, inbox: false}
}
/**
* Single source of truth for "the loop is at a safe injection seam."
* Every drain site in the agent loop body and the daemon idle path
* routes through here so plugin authors and replayers see one canonical
* event (`LoopCheckpoint`) instead of having to enumerate inline calls.
*
* Returns a result dict carrying `delivered` (bridge injections drained
* at this seam), `inbox_delivered` (inbox feedback notes drained), and
* `dispatch_skipped` (`pre_tool_dispatch` short-circuit: an
* `interrupt_immediate` arrival here means the pending tool batch is
* cancelled and the loop iterates once more so the LLM sees the
* injection before the tool would have fired). Callers branch on
* `delivered` for the "continue if a steer arrived" behavior the
* stalled-done-judge and post-turn paths rely on.
*/
fn __agent_loop_checkpoint(session_id, kind, opts = nil) {
let modes = __agent_loop_checkpoint_modes(kind)
var delivered = 0
if modes.immediate {
let result = agent_session_drain_bridge_injections(session_id, "interrupt_immediate")
delivered = delivered + result?.delivered ?? 0
}
let immediate_count = delivered
if modes.finish_step {
let result = agent_session_drain_bridge_injections(session_id, "finish_step")
delivered = delivered + result?.delivered ?? 0
}
if modes.audit_only {
let result = agent_session_drain_bridge_injections(session_id, "audit_only")
delivered = delivered + result?.delivered ?? 0
}
var inbox_delivered = 0
if modes.inbox {
let pending = agent_session_drain_feedback(session_id)
for note in pending {
agent_session_inject_feedback(session_id, note.kind, note.content)
inbox_delivered = inbox_delivered + 1
}
}
let dispatch_skipped = kind == "pre_tool_dispatch" && immediate_count > 0
let iteration = to_int(opts?.iteration ?? 0)
agent_emit_event(
session_id,
"loop_checkpoint",
{
iteration: iteration,
kind: kind,
delivered: delivered,
inbox_delivered: inbox_delivered,
dispatch_skipped: dispatch_skipped,
},
)
__host_fire_session_hook(
"loop_checkpoint",
{
session_id: session_id,
iteration: iteration,
kind: kind,
delivered: delivered,
inbox_delivered: inbox_delivered,
dispatch_skipped: dispatch_skipped,
},
)
return {
delivered: delivered,
inbox_delivered: inbox_delivered,
dispatch_skipped: dispatch_skipped,
kind: kind,
}
}
fn __agent_loop_fire_resume_continuity(session, opts) {
let payload = opts?._resume_continuity
if type_of(payload) != "dict" {
return opts
}
let _ = agent_reminder_providers_fire(
session.session_id,
"worker_resumed",
payload
+ {
session_id: session.session_id,
session: {id: session.session_id},
turn: payload?.turn ?? 0,
iteration: payload?.iteration ?? 0,
},
opts,
)
return omit(opts, ["_resume_continuity"])
}
fn __scope_classifier_recent_context(messages, limit) {
if type_of(messages) != "list" {
return []
}
let cap = if type_of(limit) == "int" && limit > 0 {
limit
} else {
3
}
let total = len(messages)
var start = total - cap
if start < 0 {
start = 0
}
var out = []
var i = start
while i < total {
let msg = messages[i]
out = out
.push({role: to_string(msg?.role ?? ""), content: msg?.content ?? msg?.text ?? ""})
i = i + 1
}
return out
}
fn __scope_classifier_confidence(value, fallback) {
let parsed = to_float(value ?? fallback)
if parsed == nil {
return fallback
}
if parsed < 0.0 {
return 0.0
}
if parsed > 1.0 {
return 1.0
}
return parsed
}
fn __scope_classifier_label(value) {
let label = lowercase(trim(to_string(value ?? "")))
if label == "in_scope" || label == "inscope" || label == "in-scope" {
return "in_scope"
}
if label == "out_of_scope" || label == "outscope" || label == "out-of-scope" {
return "out_of_scope"
}
if label == "escalate" || label == "ambiguous" || label == "uncertain" {
return "escalate"
}
return "escalate"
}
fn __scope_classifier_normalize(raw, session_id, iteration) {
if raw == nil {
return nil
}
if type_of(raw) != "dict" {
return {
label: "escalate",
original_label: "invalid",
confidence: 0.0,
confidence_threshold: 0.65,
evidence: "scope classifier returned " + type_of(raw) + ", not a dict",
session_id: session_id,
iteration: iteration,
skip_main_turn: false,
}
}
let threshold = __scope_classifier_confidence(raw?.confidence_threshold ?? raw?.threshold, 0.65)
let original_label = __scope_classifier_label(raw?.original_label ?? raw?.label)
let confidence = __scope_classifier_confidence(raw?.confidence, 0.0)
let label = if original_label != "escalate" && confidence < threshold {
"escalate"
} else {
original_label
}
let evidence = trim(to_string(raw?.evidence ?? raw?.reason ?? raw?.reasoning ?? ""))
return raw
+ {
label: label,
original_label: original_label,
confidence: confidence,
confidence_threshold: threshold,
evidence: if evidence == "" {
"no evidence provided"
} else {
evidence
},
session_id: session_id,
iteration: iteration,
skip_main_turn: raw?.skip_main_turn ?? true,
}
}
fn __scope_classifier_fail_open(session_id, iteration, err) {
return {
label: "escalate",
original_label: "error",
confidence: 0.0,
confidence_threshold: 0.65,
evidence: "scope classifier failed: " + to_string(err),
error: to_string(err),
session_id: session_id,
iteration: iteration,
skip_main_turn: false,
}
}
fn __run_pre_turn_scope_classifier(classifier, session, message, turn_opts, iteration_index) {
if classifier == nil {
return nil
}
let iteration = iteration_index + 1
let messages = agent_session_messages(session.session_id)
let anchor = agent_session_workspace_anchor(session.session_id)
let payload = {
session_id: session.session_id,
session: {id: session.session_id},
iteration: iteration,
user_message: message,
task: session?.task ?? message,
messages: messages,
recent_context: __scope_classifier_recent_context(messages, 3),
workspace_anchor: anchor,
provider: turn_opts?.provider ?? "",
model: turn_opts?.model ?? "",
}
let outcome = try {
classifier(payload)
}
let verdict = if is_err(outcome) {
let err = unwrap_err(outcome)
if error_category(err) == "cancelled" {
throw err
}
__scope_classifier_fail_open(session.session_id, iteration, err)
} else {
__scope_classifier_normalize(unwrap(outcome), session.session_id, iteration)
}
if verdict != nil {
agent_emit_event(session.session_id, "scope_classifier_verdict", verdict)
}
return verdict
}
fn __scope_classifier_mounted_roots(anchor) {
let roots = anchor?.additional_roots ?? []
if type_of(roots) != "list" || len(roots) == 0 {
return " (none)"
}
var lines = []
for root in roots {
lines = lines
.push(
" - " + to_string(root?.path ?? root?.root ?? "")
+ " (mount_mode: "
+ to_string(root?.mount_mode ?? "")
+ ")",
)
}
return join(lines, "\n")
}
fn __scope_classifier_alert_body(verdict, session) {
let anchor = verdict?.workspace_anchor ?? agent_session_workspace_anchor(session.session_id)
let primary = to_string(anchor?.primary ?? "(none)")
return "<scope-alert>\nThe latest user turn appears outside the current workspace anchor. "
+ to_string(verdict?.evidence ?? "")
+ "\n\nCurrent anchor: "
+ primary
+ "\nMounted roots:\n"
+ __scope_classifier_mounted_roots(anchor)
+ "\n\nThree options:\n"
+ " - add_root: mount the target repo into this session with `agent_session_add_root(session_id, root, {mount_mode})`\n"
+ " - reanchor: switch the session primary anchor with `agent_session_reanchor(session_id, new_anchor)`\n"
+ " - fork: spawn a sub-agent against the target repo with `spawn_agent({anchor: new_anchor, ...})`\n\n"
+ "Ask the user which handoff they prefer before doing workspace-mutating work.\n</scope-alert>"
}
fn __scope_classifier_assistant_text(verdict) {
let evidence = trim(to_string(verdict?.evidence ?? ""))
let suffix = if evidence == "" {
""
} else {
" " + evidence
}
return "This task appears to be outside the current workspace anchor."
+ suffix
+ " Options: 1) Add Root, 2) Re-anchor, 3) Fork to a new session. Which would you prefer?"
}
fn __scope_classifier_skip_main(verdict) {
return verdict != nil && verdict?.label == "out_of_scope" && verdict?.skip_main_turn ?? true
}
fn __scope_classifier_record_skip_turn(session, verdict, iteration_index) {
let text = __scope_classifier_assistant_text(verdict)
agent_session_record_assistant(
session.session_id,
{
text: text,
visible_text: text,
provider: "harn",
model: "scope_classifier",
input_tokens: 0,
output_tokens: 0,
scope_classifier_verdict: verdict,
},
)
agent_session_inject(
session.session_id,
transcript_reminder_event(
{
body: __scope_classifier_alert_body(verdict, session),
source: "in_pipeline",
tags: ["scope_alert", "pre_turn_scope_classifier"],
dedupe_key: "scope_alert:pre_turn:" + substring(sha256(session.session_id + text), 0, 16),
ttl_turns: 3,
fired_at_turn: iteration_index + 1,
},
),
)
agent_emit_event(
session.session_id,
"iteration_end",
{
iteration: iteration_index + 1,
iteration_info: __iteration_info_payload(
{
text: text,
visible_text: text,
provider: "harn",
model: "scope_classifier",
input_tokens: 0,
output_tokens: 0,
},
0,
text,
)
+ {
dispatch_skipped: true,
skip_reason: "scope_classifier_out_of_scope",
scope_classifier_verdict: verdict,
},
},
)
let _ = __agent_loop_checkpoint(session.session_id, "iteration_end", {iteration: iteration_index + 1})
return text
}
fn __agent_loop_pop_structural_veto_turn(session_id) {
let messages = agent_session_messages(session_id)
if len(messages) == 0 {
return false
}
let last = messages[len(messages) - 1]
if last?.role != "assistant" {
return false
}
return agent_session_pop_last_assistant(session_id)
}
@complexity(allow)
fn __agent_loop_run(message, session, initial_opts) {
var opts = initial_opts
var iteration = 0
var session_finalized = false
// Publish the resolved provider/model so any `.harn.prompt` rendered
// during turn-system construction (loop contract, tool contract,
// skills, …) can branch on `llm.capabilities.*` without manual
// option threading. Skipped when provider is empty — the render
// sees `llm = nil` and the existing branch falls through.
let __llm_ctx_provider = to_string(initial_opts?.provider ?? "")
let __llm_ctx_model = to_string(initial_opts?.model ?? "")
let __llm_ctx_pushed = __push_llm_render_context(__llm_ctx_provider, __llm_ctx_model)
var audit_background_tasks = []
defer {
if __llm_ctx_pushed {
__pop_llm_render_context()
}
}
let run = try {
opts = agent_mcp_bootstrap_if_needed(session, opts)
var stop_reason = nil
var final_status = ""
var terminal_error = nil
var verify_attempts = 0
var done_judge_invocations = 0
var step_judge_attempts = 0
var structural_validator_attempts = 0
var consecutive_text_only = 0
var fallback_index = 0
var successful_tools_seen = []
var rejected_tools_seen = []
var suspend_result = nil
var stall_state = agent_stall_initial_state()
var stall_enabled_seen = false
let max_verify_attempts = opts?.max_verify_attempts ?? 20
let budget = opts?.iteration_budget
?? {
mode: "fixed",
initial: opts?.max_iterations ?? 50,
max: opts?.max_iterations ?? 50,
extend_by: 0,
expose_decisions: false,
}
var current_max = budget.initial
var extensions_used = 0
var budget_decisions = []
var consecutive_failure_count = 0
var budget_exhausted_emitted = false
var last_tool_count = 0
let loop_start_ms = now_ms()
while iteration < current_max {
let boundary_exhaustion = __agent_loop_budget_exhaustion(
session.session_id,
budget,
iteration,
nil,
loop_start_ms,
current_max,
)
if boundary_exhaustion.exhausted {
__agent_loop_emit_budget_exhausted(session.session_id, boundary_exhaustion)
budget_exhausted_emitted = true
budget_decisions = __agent_loop_record_budget_stop(budget_decisions, iteration, current_max, boundary_exhaustion.kind)
final_status = "budget_exhausted"
stop_reason = boundary_exhaustion.kind
break
}
let checkpoint = __agent_loop_suspend_checkpoint(session, iteration)
if checkpoint != nil {
__drain_audit_flushes(audit_background_tasks)
audit_background_tasks = []
suspend_result = checkpoint
final_status = "suspended"
stop_reason = "suspended"
break
}
if agent_budget_pre_call_blocked(session, opts) {
final_status = "budget_exhausted"
break
}
let iteration_index = iteration
agent_emit_event(
session.session_id,
"iteration_start",
{iteration: iteration_index + 1, provider: opts?.provider ?? "", model: opts?.model ?? ""},
)
try {
__host_drain_file_edits(session.session_id)
} catch (e) {
nil
}
__agent_loop_checkpoint(session.session_id, "iteration_start", {iteration: iteration_index + 1})
var turn_opts = agent_skills_match(session, opts, iteration_index)
if turn_opts?._skill_activated_this_turn ?? false {
opts = agent_reset_tool_surface_narrowing(opts)
turn_opts = agent_reset_tool_surface_narrowing(turn_opts)
}
turn_opts = agent_tool_search_inject_if_needed(turn_opts)
turn_opts = agent_apply_tool_surface_narrowing(turn_opts)
__agent_loop_checkpoint(session.session_id, "pre_compact", {iteration: iteration_index + 1})
agent_autocompact_if_needed(session, turn_opts)
__agent_loop_checkpoint(session.session_id, "post_compact", {iteration: iteration_index + 1})
let scope_verdict = __run_pre_turn_scope_classifier(
turn_opts?._pre_turn_scope_classifier ?? opts?._pre_turn_scope_classifier,
session,
message,
turn_opts,
iteration_index,
)
if __scope_classifier_skip_main(scope_verdict) {
__scope_classifier_record_skip_turn(session, scope_verdict, iteration_index)
iteration = iteration + 1
final_status = "scope_alert"
stop_reason = "out_of_scope"
break
}
let turn_system_fragments = agent_build_turn_system_fragments(session, turn_opts, iteration_index)
var turn_system_parts = []
for fragment in turn_system_fragments {
turn_system_parts = turn_system_parts.push(fragment.body)
}
let turn_system = join(turn_system_parts, "\n\n")
let turn_messages = agent_build_turn_messages(session, turn_opts, iteration_index)
let llm_overrides = turn_opts?.llm_options ?? {}
let base_opts = turn_opts + llm_overrides
let llm_opts = base_opts
+ {
messages: turn_messages,
session_id: session.session_id,
tool_format: turn_opts.tool_format,
_iteration: iteration_index + 1,
_system_fragments: turn_system_fragments,
}
let call = __invoke_llm(message, turn_system, llm_opts)
if !call.ok {
let failure_config = __agent_loop_consecutive_failure_config(budget)
if __agent_loop_tracks_failure(call?.error, failure_config) {
iteration = iteration + 1
consecutive_failure_count = consecutive_failure_count + 1
let failure_aggregates = __agent_loop_budget_aggregates(session.session_id, nil, loop_start_ms)
agent_emit_event(
session.session_id,
"iteration_end",
{
iteration: iteration_index + 1,
iteration_info: failure_aggregates
+ {
dispatch_skipped: true,
skip_reason: "provider_failure",
provider_error: call?.error ?? {},
consecutive_failures: consecutive_failure_count,
},
},
)
if consecutive_failure_count >= failure_config.max {
let paused_for_ms = failure_config?.paused_for_ms ?? 0
agent_emit_event(
session.session_id,
"budget_circuit_breaker",
{
kind: "consecutive_failures",
consecutive_count: consecutive_failure_count,
paused_for_ms: paused_for_ms,
},
)
if paused_for_ms > 0 {
sleep_ms(paused_for_ms)
}
let exhaustion = __agent_loop_budget_aggregates(session.session_id, nil, loop_start_ms)
+ {exhausted: true, kind: "consecutive_failures", iteration: iteration, max_iterations: current_max}
__agent_loop_emit_budget_exhausted(session.session_id, exhaustion)
budget_exhausted_emitted = true
budget_decisions = __agent_loop_record_budget_stop(budget_decisions, iteration, current_max, exhaustion.kind)
final_status = "budget_exhausted"
stop_reason = "circuit_breaker"
terminal_error = call?.error
break
}
continue
}
final_status = call.status
stop_reason = call?.stop_reason ?? call.status
terminal_error = call?.error
break
}
let llm_result = call.value
consecutive_failure_count = 0
iteration = iteration + 1
let raw_text = llm_result?.text ?? ""
let parsed = agent_parse_tool_calls(raw_text, turn_opts?.tools)
let visible_text = __visible_text(parsed, raw_text)
let normalized = llm_result + {text: visible_text, visible_text: visible_text}
let fallback_outcome = __detect_native_fallback(
llm_result,
parsed,
turn_opts,
fallback_index,
session.session_id,
iteration_index,
)
fallback_index = fallback_outcome.fallback_index
let tool_calls = if fallback_outcome.triggered {
fallback_outcome.calls
} else {
__resolve_tool_calls(llm_result, parsed)
}
let recorded_assistant = if fallback_outcome.triggered && fallback_outcome.accepted {
normalized + {tool_calls: tool_calls, native_tool_calls: tool_calls}
} else {
normalized
}
agent_session_record_assistant(session.session_id, recorded_assistant)
let await_call = __agent_await_resumption_call(tool_calls)
if await_call != nil {
suspend_result = __agent_loop_await_resumption(session, iteration, await_call, opts)
let _totals = agent_session_record_usage(session.session_id, llm_result, turn_opts, iteration_index + 1)
agent_emit_event(
session.session_id,
"iteration_end",
{
iteration: iteration_index + 1,
iteration_info: __agent_loop_iteration_info(
session.session_id,
llm_result,
len(tool_calls),
visible_text,
_totals,
loop_start_ms,
),
},
)
final_status = "suspended"
stop_reason = "suspended"
break
}
let stall_judge_due = agent_stall_done_judge_due(turn_opts, done_judge_invocations, iteration_index + 1)
let stall_observation = agent_stall_observe_tool_calls(
session.session_id,
tool_calls,
iteration_index + 1,
turn_opts?.stall_diagnostics,
stall_state,
stall_judge_due,
)
stall_state = stall_observation.state
stall_enabled_seen = stall_enabled_seen || stall_observation.enabled
let stall_warning = stall_observation.warning
let structural_verdict = __run_structural_validator(
opts?._structural_validator,
session.session_id,
normalized + {raw_text: raw_text},
tool_calls,
parsed,
llm_opts,
turn_opts,
successful_tools_seen,
rejected_tools_seen,
structural_validator_attempts,
)
if structural_verdict.vetoed {
let on_failure = structural_verdict?.on_failure ?? "regenerate_with_feedback"
if on_failure == "raise" {
throw structural_verdict?.diagnostic
?? "structural validator rejected assistant turn"
}
structural_validator_attempts = structural_validator_attempts + 1
__agent_loop_pop_structural_veto_turn(session.session_id)
let feedback = to_string(structural_verdict?.feedback ?? "")
if feedback != "" {
agent_session_inject_feedback(session.session_id, "structural_validator", feedback)
}
let structural_totals = agent_session_record_usage(session.session_id, llm_result, turn_opts, iteration_index + 1)
agent_emit_event(
session.session_id,
"iteration_end",
{
iteration: iteration_index + 1,
iteration_info: __agent_loop_iteration_info(
session.session_id,
llm_result,
0,
visible_text,
structural_totals,
loop_start_ms,
)
+ {
dispatch_skipped: true,
skip_reason: "structural_validator_revise",
structural_validator_attempts: structural_validator_attempts,
structural_validator_rule: structural_verdict?.rule ?? "",
},
},
)
let structural_exhaustion = __agent_loop_budget_exhaustion(
session.session_id,
budget,
iteration,
structural_totals,
loop_start_ms,
current_max,
)
if structural_exhaustion.exhausted {
__agent_loop_emit_budget_exhausted(session.session_id, structural_exhaustion)
budget_exhausted_emitted = true
budget_decisions = __agent_loop_record_budget_stop(
budget_decisions,
iteration,
current_max,
structural_exhaustion.kind,
)
final_status = "budget_exhausted"
stop_reason = structural_exhaustion.kind
break
}
continue
} else if !(structural_verdict?.skipped ?? false) {
structural_validator_attempts = 0
}
if stall_judge_due && stall_warning != nil {
let stall_verify_opts = turn_opts
+ {_done_judge_due: true, _done_judge_trigger: "stalled"}
let stall_verdict = agent_verify_or_continue(session, stall_verify_opts, "stalled", visible_text, iteration_index + 1)
if stall_verdict?.done_judge_invoked ?? false {
done_judge_invocations = done_judge_invocations + 1
}
if stall_verdict.vetoed {
if stall_observation.feedback_deferred {
stall_state = agent_stall_inject_feedback(
session.session_id,
stall_warning,
stall_observation.config,
stall_state,
)
}
} else {
let stall_totals = agent_session_record_usage(session.session_id, llm_result, turn_opts, iteration_index + 1)
let tool_count = len(tool_calls)
agent_emit_event(
session.session_id,
"iteration_end",
{
iteration: iteration_index + 1,
iteration_info: __agent_loop_iteration_info(
session.session_id,
llm_result,
tool_count,
visible_text,
stall_totals,
loop_start_ms,
),
},
)
let stall_exhaustion = __agent_loop_budget_exhaustion(
session.session_id,
budget,
iteration,
stall_totals,
loop_start_ms,
current_max,
)
if stall_exhaustion.exhausted {
__agent_loop_emit_budget_exhausted(session.session_id, stall_exhaustion)
budget_exhausted_emitted = true
budget_decisions = __agent_loop_record_budget_stop(budget_decisions, iteration, current_max, stall_exhaustion.kind)
final_status = "budget_exhausted"
stop_reason = stall_exhaustion.kind
break
}
let stalled_done_checkpoint = __agent_loop_checkpoint(session.session_id, "iteration_end", {iteration: iteration_index + 1})
if stalled_done_checkpoint.delivered > 0 {
continue
}
final_status = "done"
stop_reason = "stalled_done_judge"
break
}
}
if turn_opts?.step_judge != nil {
let remaining_iterations = current_max - iteration_index
let step_verdict = agent_step_judge(
session,
llm_result,
turn_opts,
iteration_index + 1,
stall_warning,
step_judge_attempts,
remaining_iterations,
)
if step_verdict.vetoed {
step_judge_attempts = step_judge_attempts + 1
let on_veto = step_verdict?.on_veto ?? "replace"
if on_veto == "replace" {
agent_session_pop_last_assistant(session.session_id)
}
let critique = step_verdict?.feedback ?? step_verdict?.critique ?? ""
if critique != "" {
agent_session_inject_feedback(session.session_id, "step_judge", critique)
}
let step_totals = agent_session_record_usage(session.session_id, llm_result, turn_opts, iteration_index + 1)
agent_emit_event(
session.session_id,
"iteration_end",
{
iteration: iteration_index + 1,
iteration_info: __agent_loop_iteration_info(
session.session_id,
llm_result,
0,
visible_text,
step_totals,
loop_start_ms,
)
+ {
dispatch_skipped: true,
skip_reason: "step_judge_revise",
on_veto: on_veto,
step_judge_attempts: step_judge_attempts,
},
},
)
let step_exhaustion = __agent_loop_budget_exhaustion(
session.session_id,
budget,
iteration,
step_totals,
loop_start_ms,
current_max,
)
if step_exhaustion.exhausted {
__agent_loop_emit_budget_exhausted(session.session_id, step_exhaustion)
budget_exhausted_emitted = true
budget_decisions = __agent_loop_record_budget_stop(budget_decisions, iteration, current_max, step_exhaustion.kind)
final_status = "budget_exhausted"
stop_reason = step_exhaustion.kind
break
}
continue
} else if !(step_verdict?.skipped ?? false) {
step_judge_attempts = 0
}
}
let pre_dispatch_checkpoint = __agent_loop_checkpoint(session.session_id, "pre_tool_dispatch", {iteration: iteration_index + 1})
if pre_dispatch_checkpoint.dispatch_skipped {
let tool_count_skipped = len(tool_calls)
let totals_skipped = agent_session_record_usage(session.session_id, llm_result, turn_opts, iteration_index + 1)
agent_emit_event(
session.session_id,
"iteration_end",
{
iteration: iteration_index + 1,
iteration_info: __agent_loop_iteration_info(
session.session_id,
llm_result,
tool_count_skipped,
visible_text,
totals_skipped,
loop_start_ms,
)
+ {dispatch_skipped: true, skip_reason: "interrupt_immediate"},
},
)
let skipped_exhaustion = __agent_loop_budget_exhaustion(
session.session_id,
budget,
iteration,
totals_skipped,
loop_start_ms,
current_max,
)
if skipped_exhaustion.exhausted {
__agent_loop_emit_budget_exhausted(session.session_id, skipped_exhaustion)
budget_exhausted_emitted = true
budget_decisions = __agent_loop_record_budget_stop(budget_decisions, iteration, current_max, skipped_exhaustion.kind)
final_status = "budget_exhausted"
stop_reason = skipped_exhaustion.kind
break
}
if agent_budget_post_call_blocked(totals_skipped, turn_opts) {
final_status = "budget_exhausted"
break
}
consecutive_text_only = __next_text_only_count(0, consecutive_text_only)
last_tool_count = 0
continue
}
let dispatched = __dispatch_tool_calls(
session.session_id,
tool_calls,
turn_opts + {_iteration: iteration_index + 1, _tool_caller: opts?._tool_caller},
)
let dispatch = dispatched.dispatch
audit_background_tasks = __spawn_audit_flushes(audit_background_tasks, dispatched.audit_flushes)
opts = __sync_tool_search_state(opts, dispatched.turn_opts)
successful_tools_seen = __merge_tool_names(successful_tools_seen, __tool_names_by_status(dispatch, true))
rejected_tools_seen = __merge_tool_names(rejected_tools_seen, __tool_names_by_status(dispatch, false))
let totals = agent_session_record_usage(session.session_id, llm_result, turn_opts, iteration_index + 1)
let tool_count = len(tool_calls)
agent_emit_event(
session.session_id,
"iteration_end",
{
iteration: iteration_index + 1,
iteration_info: __agent_loop_iteration_info(
session.session_id,
llm_result,
tool_count,
visible_text,
totals,
loop_start_ms,
),
},
)
let post_dispatch_checkpoint = __agent_loop_checkpoint(session.session_id, "post_tool_dispatch", {iteration: iteration_index + 1})
let bridge_step_delivered = post_dispatch_checkpoint.delivered
let exhaustion = __agent_loop_budget_exhaustion(
session.session_id,
budget,
iteration,
totals,
loop_start_ms,
current_max,
)
if exhaustion.exhausted {
__agent_loop_emit_budget_exhausted(session.session_id, exhaustion)
budget_exhausted_emitted = true
budget_decisions = __agent_loop_record_budget_stop(budget_decisions, iteration, current_max, exhaustion.kind)
final_status = "budget_exhausted"
stop_reason = exhaustion.kind
break
}
if agent_budget_post_call_blocked(totals, turn_opts) {
final_status = "budget_exhausted"
break
}
consecutive_text_only = __next_text_only_count(tool_count, consecutive_text_only)
last_tool_count = tool_count
let turn_successful = __tool_names_by_status(dispatch, true)
let turn_rejected = __tool_names_by_status(dispatch, false)
let turn_max_nudges = turn_opts?.max_nudges ?? 8
let turn_loop_until_done = turn_opts?.loop_until_done ?? false
if turn_loop_until_done && tool_count == 0 && consecutive_text_only > turn_max_nudges {
final_status = "stuck"
stop_reason = "max_nudges"
break
}
let missing_required_for_loop = agent_required_tools_missing_from_session(opts, successful_tools_seen)
let cadence_loop_state = agent_loop_snapshot_state(
{
iteration: iteration,
current_limit: current_max,
budget_max: budget.max,
extensions_used: extensions_used,
tool_count: tool_count,
turn_successful: turn_successful,
turn_rejected: turn_rejected,
visible_text: visible_text,
turn_native_fallback_used: fallback_outcome.triggered && fallback_outcome.accepted,
session_successful: successful_tools_seen,
session_rejected: rejected_tools_seen,
missing_required_tools: missing_required_for_loop,
completion_proposed: false,
verdict: nil,
}
+ __agent_loop_state_budget_fields(
session.session_id,
totals,
loop_start_ms,
budget,
consecutive_failure_count,
),
)
let post_turn_opts = turn_opts
+ {
_session_successful_tools: successful_tools_seen,
_session_rejected_tools: rejected_tools_seen,
_consecutive_text_only: consecutive_text_only,
_done_judge_invocations: done_judge_invocations,
_done_judge_loop_state: cadence_loop_state,
}
let outcome = agent_compute_post_turn(
session,
normalized + {raw_text: raw_text, parsed_done_marker: parsed?.done_marker ?? ""},
dispatch,
post_turn_opts,
iteration_index,
)
opts = __apply_post_turn_options(opts, outcome)
var should_continue = outcome.kind == "continue" || bridge_step_delivered > 0
var verdict_record = nil
if should_continue {
if opts?.daemon && len(tool_calls) == 0 {
agent_daemon_step(session, opts, iteration)
}
} else {
if outcome.needs_verify {
if verify_attempts >= max_verify_attempts {
final_status = "verify_exhausted"
stop_reason = outcome.stop_reason
break
}
let verify_opts = turn_opts + {_done_judge_due: outcome?.done_judge_due ?? true}
let verdict = agent_verify_or_continue(
session,
verify_opts,
outcome.stop_reason,
llm_result.text,
iteration_index,
)
verdict_record = verdict
if verdict?.done_judge_invoked ?? false {
done_judge_invocations = done_judge_invocations + 1
}
if verdict.vetoed {
verify_attempts = verify_attempts + 1
should_continue = true
}
}
let missing_now = agent_required_tools_missing_from_session(opts, successful_tools_seen)
if !should_continue && len(missing_now) > 0 {
agent_required_tools_inject_feedback(session.session_id, missing_now)
should_continue = true
}
if !should_continue {
stop_reason = outcome.stop_reason
break
}
}
let loop_state = agent_loop_snapshot_state(
{
iteration: iteration,
current_limit: current_max,
budget_max: budget.max,
extensions_used: extensions_used,
tool_count: tool_count,
turn_successful: turn_successful,
turn_rejected: turn_rejected,
visible_text: visible_text,
turn_native_fallback_used: fallback_outcome.triggered && fallback_outcome.accepted,
session_successful: successful_tools_seen,
session_rejected: rejected_tools_seen,
missing_required_tools: missing_required_for_loop,
completion_proposed: outcome.kind == "break",
verdict: verdict_record,
}
+ __agent_loop_state_budget_fields(
session.session_id,
totals,
loop_start_ms,
budget,
consecutive_failure_count,
),
)
let command = agent_loop_control_invoke(opts, budget, loop_state)
let applied = agent_loop_apply_command(
{
command: command,
session_id: session.session_id,
iteration: iteration,
current_max: current_max,
extensions_used: extensions_used,
decisions: budget_decisions,
budget: budget,
},
)
current_max = applied.current_max
extensions_used = applied.extensions_used
budget_decisions = applied.decisions
if applied.stop {
final_status = applied.final_status
stop_reason = applied.stop_reason
break
}
}
if final_status == "" && iteration >= current_max && stop_reason == nil {
final_status = "budget_exhausted"
}
if final_status == "budget_exhausted" && !budget_exhausted_emitted {
let terminal_exhaustion = __agent_loop_budget_aggregates(session.session_id, nil, loop_start_ms)
+ {
exhausted: true,
kind: stop_reason ?? "max_iterations",
iteration: iteration,
max_iterations: current_max,
}
__agent_loop_emit_budget_exhausted(session.session_id, terminal_exhaustion)
budget_exhausted_emitted = true
budget_decisions = __agent_loop_record_budget_stop(budget_decisions, iteration, current_max, terminal_exhaustion.kind)
}
if opts?.daemon && final_status != "" {
agent_daemon_snapshot(session, opts, final_status, iteration)
}
try {
__host_drain_file_edits(session.session_id)
} catch (e) {
nil
}
__agent_loop_checkpoint(session.session_id, "loop_exit", {iteration: iteration})
__drain_audit_flushes(audit_background_tasks)
audit_background_tasks = []
let result = agent_session_finalize(
session.session_id,
{
final_status: final_status,
stop_reason: stop_reason ?? "",
iterations: iteration,
error: terminal_error,
},
)
session_finalized = true
let result_with_stalls = agent_stall_apply_result(result, stall_enabled_seen, stall_state)
let enforced = if suspend_result != nil {
result_with_stalls
} else {
agent_required_tools_enforce(result_with_stalls, opts)
}
var final_result = enforced
if budget.expose_decisions {
final_result = final_result
+ {
adaptive_budget: {
mode: budget.mode,
initial: budget.initial,
max: budget.max,
final_limit: current_max,
extensions_used: extensions_used,
decisions: budget_decisions,
},
}
}
if suspend_result != nil {
final_result = final_result + suspend_result
}
final_result
}
if is_err(run) {
__drain_audit_flushes(audit_background_tasks)
if !session_finalized {
__agent_loop_finalize_failed(session, iteration)
}
throw unwrap_err(run)
}
return unwrap(run)
}
// Host pushed an `interrupt_immediate` injection between the
// model emitting tool calls and the dispatcher starting. Honor
// the host's "stop, do this instead" by skipping the pending
// tool batch — the injection is already in the transcript, so
// the next iteration's LLM call will see it and the model can
// decide whether to re-issue, modify, or abandon the call. The
// post-turn machinery (judge, required-tools, done sentinel) is
// intentionally bypassed because nothing dispatched; budget is
// still honored because the LLM call's tokens were spent.
// Drain async events (long-running tool completions, MCP server
// progress, command-policy feedback, file-edited nudges, trigger
// events) BEFORE compaction so the summarizer sees the freshest
// transcript. Without this, anything that landed between turns
// would be elided from the summary.
// Drain again AFTER compaction: when Tier-2 LLM summarization
// runs it can take 5-30s, and any async producer (tool worker,
// MCP `notifications/progress`, GitHub PR merge trigger,
// `command_policy` post-hook) firing in that window must be
// visible in *this* turn's prompt, not the one after.
/**
* agent_loop.
*
* @effects: [host, agent]
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: agent_loop(message, system_prompt, options)
*/
pub fn agent_loop(message, system_prompt = nil, options = nil) {
var opts = agent_loop_options(agent_progress_apply_options(options))
opts = opts + {tools: agent_lifecycle_tools(opts?.tools, opts)}
if system_prompt != nil && system_prompt != "" {
opts = opts + {system: system_prompt}
}
let session = agent_session_init(message, system_prompt, opts)
if session?.done {
return session.result
}
if opts?._tool_format_override != nil {
agent_emit_event(session.session_id, "tool_format_override", opts._tool_format_override)
}
if opts?._tool_format_capability_gap != nil {
agent_emit_event(session.session_id, "capability_gap", opts._tool_format_capability_gap)
}
defer {
try {
__host_mcp_disconnect(session.session_id)
} catch (e) {
}
}
opts = __agent_loop_fire_resume_continuity(session, opts)
return __agent_loop_run(message, session, opts)
}