// Read-only stance: an intent-armed, least-privilege tool window with a
// justified, consent-verified escape hatch.
//
// When `agent_loop({read_only_stance: {...}})` is enabled and ARMED (either
// explicitly by the embedder — e.g. a host-side task-intent classifier — or
// by the configurable `classifier` callback), the tool surface offered to
// the model is narrowed to tools whose annotations declare them read-only,
// plus one auto-registered escape-hatch tool (default
// `request_write_access`). Unannotated tools are treated as mutating
// (fail-safe, matching `ToolKind::is_read_only`).
//
// The escape hatch is agentic, not lexical: its handler runs a
// `consent_check` (default: a structured `llm_call` over the session's
// recent user messages) asking whether the USER expressed or clearly
// implied consent to modify the workspace. A grant disarms the stance for
// the rest of the loop and emits `stance_write_access_granted`; a denial
// returns a structured result telling the model to ask the user first.
// Every transition emits an agent event (`stance_armed`,
// `stance_write_access_granted`/`_denied`, `stance_disarmed`) so traces
// stay legible.
//
// This is the Harn-side mechanism for the tool-surface program's
// "task-intent mount" (burin-code docs/design/tool-surface-program.md,
// phases 2-3): the window is derived from intent, mounting derives policy
// from annotations, and elevation is verified, evidenced, and traced.
// Ships default-OFF and experimental.
import { __tool_surface_filter_registry, __tool_surface_policy_tools } from "std/agent/postturn"
import { agent_emit_event, agent_session_messages } from "std/agent/state"
let __STANCE_READ_ONLY_KINDS = ["read", "search", "think", "fetch"]
let __STANCE_READ_ONLY_LEVELS = ["none", "read_only"]
fn __stance_config(opts) {
return opts?.read_only_stance ?? {}
}
fn __stance_registry_entries(registry) {
if registry == nil {
return []
}
if type_of(registry) == "dict" {
return registry?.tools ?? []
}
if type_of(registry) == "list" {
return registry
}
return []
}
fn __stance_entry_name(entry) {
return entry?.function?.name ?? entry?.name ?? ""
}
/**
* Fail-safe read-only classification, mirroring the VM's
* `ToolKind::is_read_only`: a tool counts as read-only ONLY when its
* annotations explicitly say so — by kind, or by a none/read_only side
* effect level. Unannotated tools are mutating by default.
*/
fn __stance_entry_read_only(entry) {
let annotations = entry?.annotations ?? entry?.function?.annotations
if annotations == nil {
return false
}
let kind = annotations?.kind
if kind != nil {
return contains(__STANCE_READ_ONLY_KINDS, to_string(kind))
}
let level = annotations?.side_effect_level
if level != nil {
return contains(__STANCE_READ_ONLY_LEVELS, to_string(level))
}
return false
}
fn __stance_read_only_names(registry, hard_keep, escape_tool) {
var names = [escape_tool]
for name in hard_keep ?? [] {
if !contains(names, name) {
names = names.push(name)
}
}
for entry in __stance_registry_entries(registry) {
if __stance_entry_read_only(entry) {
let name = __stance_entry_name(entry)
if name != "" && !contains(names, name) {
names = names.push(name)
}
}
}
return names
}
fn __stance_recent_user_messages(session_id, limit = 6) {
let messages = try {
agent_session_messages(session_id)
} catch (e) {
[]
}
var recent = []
for message in messages ?? [] {
if (message?.role ?? "") == "user" {
recent = recent.push(to_string(message?.content ?? ""))
}
}
let count = len(recent)
if count > limit {
return recent.slice(count - limit, count)
}
return recent
}
/**
* Default consent check for the read-only stance escape hatch: a
* structured LLM verdict over the agent's justification plus the
* session's recent user messages. Grants only when the user expressed or
* clearly implied consent to modify the workspace; anything weaker denies
* with guidance to ask the user directly. Override per-loop via
* `read_only_stance.consent_check` (e.g. for deterministic tests or a
* host approval queue).
*
* @effects: [llm]
* @errors: []
* @api_stability: experimental
*/
pub fn agent_stance_default_consent_check(justification, session_id, options = nil) {
let recent = __stance_recent_user_messages(session_id)
let transcript = if len(recent) == 0 {
"(no user messages available)"
} else {
recent.join("\n---\n")
}
let prompt = "An agent operating under a read-only restriction is requesting write access.\n\n"
+ "Agent justification:\n"
+ justification
+ "\n\nRecent user messages (most recent last):\n"
+ transcript
+ "\n\nDid the user express or clearly imply consent for the agent to modify the workspace"
+ " (edit files, run mutating commands) at this point in the conversation?"
+ " Base the verdict only on what the user actually said — the agent's justification alone"
+ " is not consent."
let schema = {
type: "object",
properties: {consent: {type: "string", enum: ["express", "implied", "none"]}, reasoning: {type: "string"}},
required: ["consent", "reasoning"],
}
let llm_options = (options ?? {}) + {schema: schema, temperature: 0}
let outcome = try {
llm_call(prompt, nil, llm_options)
} catch (e) {
nil
}
let consent = to_string(outcome?.data?.consent ?? "none")
let reasoning = to_string(outcome?.data?.reasoning ?? "consent check unavailable")
if consent == "express" || consent == "implied" {
return {verdict: "grant", consent: consent, reason: reasoning}
}
return {verdict: "deny", consent: consent, reason: reasoning}
}
fn __stance_escape_description(escape_tool) {
return "Request that write/execute tools be re-enabled for this session. The current task"
+ " was classified as read-only (research/investigation). Call this ONLY when the work"
+ " genuinely requires modifying the workspace AND the user has expressly or implicitly"
+ " consented to that. Your justification must cite the user's words or the standing"
+ " instruction that grants consent; a justification without user consent is denied."
}
fn __stance_register_escape_tool(opts, cfg, session_id, grant_flag) {
let escape_tool = cfg?.escape_tool ?? "request_write_access"
let consent = cfg?.consent_check
let handler = { args ->
let justification = to_string(args?.justification ?? "")
let verdict = if consent != nil {
consent(justification, session_id)
} else {
agent_stance_default_consent_check(justification, session_id)
}
let granted = (verdict?.verdict ?? "deny") == "grant"
agent_emit_event(
session_id,
if granted {
"stance_write_access_granted"
} else {
"stance_write_access_denied"
},
{justification: justification, consent: verdict?.consent ?? "none", reason: verdict?.reason ?? ""},
)
if granted {
atomic_set(grant_flag, 1)
{
granted: true,
message: "Write access re-enabled: user consent verified. Mutating tools return next turn.",
reason: verdict?.reason ?? "",
}
} else {
{
granted: false,
message: "Denied: no express or implied user consent to modify the workspace was found."
+ " Ask the user directly whether to proceed with changes, then call this tool again"
+ " citing their answer.",
reason: verdict?.reason ?? "",
}
}
}
return tool_define(
opts?.tools,
escape_tool,
__stance_escape_description(escape_tool),
{
parameters: {
justification: {
type: "string",
description: "Why write access is needed now, citing the user's express or implied consent.",
},
},
annotations: {kind: "think", side_effect_level: "read_only"},
handler: handler,
},
)
}
/**
* Arm the read-only stance for a session when configured. Called once by
* `agent_loop` after session init. When the stance is enabled and armed
* (explicit `armed: true`, or the `classifier` callback returns a
* confident read-only verdict for the task message), the tool registry
* gains the escape-hatch tool and `_read_only_stance_tools` records the
* permitted window (read-only-annotated tools + `hard_keep` + the escape
* hatch). Returns the (possibly updated) options.
*
* @effects: [host]
* @errors: []
* @api_stability: experimental
*/
pub fn agent_stance_prepare(session, message, opts) {
let cfg = __stance_config(opts)
if !cfg?.enabled ?? false {
return opts
}
var armed = cfg?.armed
if armed == nil {
let classifier = cfg?.classifier
if classifier == nil {
return opts
}
let verdict = try {
classifier(to_string(message ?? ""))
} catch (e) {
nil
}
let confident = (verdict?.confidence ?? 1.0) >= (cfg?.min_confidence ?? 0.8)
armed = (verdict?.read_only ?? false) && confident
}
if !armed {
return opts
}
let escape_tool = cfg?.escape_tool ?? "request_write_access"
// Shared grant flag: the escape-hatch handler (a closure with by-value
// capture) flips this atomic on a verified grant; the loop's post-turn
// hook reads it to disarm. This is the only mutation channel a tool
// handler has back to the loop.
let grant_flag = atomic(0)
let tools = __stance_register_escape_tool(opts, cfg, session.session_id, grant_flag)
let allowed = __stance_read_only_names(tools, cfg?.hard_keep, escape_tool)
agent_emit_event(
session.session_id,
"stance_armed",
{escape_tool: escape_tool, allowed_tools: allowed},
)
return opts
+ {tools: tools, _read_only_stance_tools: allowed, _read_only_stance_grant: grant_flag}
}
/**
* Apply the armed stance window to a turn's options: intersect the tool
* registry and the policy allowlist with the permitted read-only set. A
* no-op when the stance is not armed. Runs after skill/tool-search
* surface adjustments so a skill activation cannot re-admit mutating
* tools while the stance holds.
*
* @effects: []
* @errors: []
* @api_stability: experimental
*/
pub fn agent_stance_apply(turn_opts) {
let names = turn_opts?._read_only_stance_tools
if len(names ?? []) == 0 {
return turn_opts
}
return turn_opts
+ {
tools: __tool_surface_filter_registry(turn_opts?.tools, names),
policy: __tool_surface_policy_tools(turn_opts?.policy, names),
}
}
/**
* Post-turn stance transition: when the escape hatch's consent check
* granted this turn (signalled via the shared grant flag), disarm the
* stance so mutating tools return on the next turn (and emit
* `stance_disarmed`). Returns the (possibly updated) options.
*
* @effects: [host]
* @errors: []
* @api_stability: experimental
*/
pub fn agent_stance_post_turn(session, opts) {
if len(opts?._read_only_stance_tools ?? []) == 0 {
return opts
}
let grant_flag = opts?._read_only_stance_grant
if grant_flag == nil || atomic_get(grant_flag) == 0 {
return opts
}
let escape_tool = __stance_config(opts)?.escape_tool ?? "request_write_access"
agent_emit_event(session.session_id, "stance_disarmed", {escape_tool: escape_tool})
return opts + {_read_only_stance_tools: nil, _read_only_stance_grant: nil}
}