harn-stdlib 0.9.8

Embedded Harn standard library source catalog
Documentation
// Read-only stance: an intent-armed, least-privilege tool window with a
// justified, consent-verified escape hatch.
//
// When `agent_loop({read_only_stance: {...}})` is enabled and ARMED (either
// explicitly by the embedder — e.g. a host-side task-intent classifier — or
// by the configurable `classifier` callback), the tool surface offered to
// the model is narrowed to tools whose annotations declare them read-only,
// plus one auto-registered escape-hatch tool (default
// `request_write_access`). Unannotated tools are treated as mutating
// (fail-safe, matching `ToolKind::is_read_only`).
//
// The escape hatch is agentic, not lexical: its handler runs a
// `consent_check` (default: a structured `llm_call` over the session's
// recent user messages) asking whether the USER expressed or clearly
// implied consent to modify the workspace. A grant disarms the stance for
// the rest of the loop and emits `stance_write_access_granted`; a denial
// returns a structured result telling the model to ask the user first.
// Every transition emits an agent event (`stance_armed`,
// `stance_write_access_granted`/`_denied`, `stance_disarmed`) so traces
// stay legible.
//
// This is the Harn-side mechanism for the tool-surface program's
// "task-intent mount" (burin-code docs/design/tool-surface-program.md,
// phases 2-3): the window is derived from intent, mounting derives policy
// from annotations, and elevation is verified, evidenced, and traced.
// Ships default-OFF and experimental.
import { __tool_surface_filter_registry, __tool_surface_policy_tools } from "std/agent/postturn"
import { agent_emit_event, agent_session_messages } from "std/agent/state"

let __STANCE_READ_ONLY_KINDS = ["read", "search", "think", "fetch"]

let __STANCE_READ_ONLY_LEVELS = ["none", "read_only"]

fn __stance_config(opts) {
  return opts?.read_only_stance ?? {}
}

fn __stance_registry_entries(registry) {
  if registry == nil {
    return []
  }
  if type_of(registry) == "dict" {
    return registry?.tools ?? []
  }
  if type_of(registry) == "list" {
    return registry
  }
  return []
}

fn __stance_entry_name(entry) {
  return entry?.function?.name ?? entry?.name ?? ""
}

/**
 * Fail-safe read-only classification, mirroring the VM's
 * `ToolKind::is_read_only`: a tool counts as read-only ONLY when its
 * annotations explicitly say so — by kind, or by a none/read_only side
 * effect level. Unannotated tools are mutating by default.
 */
fn __stance_entry_read_only(entry) {
  let annotations = entry?.annotations ?? entry?.function?.annotations
  if annotations == nil {
    return false
  }
  let kind = annotations?.kind
  if kind != nil {
    return contains(__STANCE_READ_ONLY_KINDS, to_string(kind))
  }
  let level = annotations?.side_effect_level
  if level != nil {
    return contains(__STANCE_READ_ONLY_LEVELS, to_string(level))
  }
  return false
}

fn __stance_read_only_names(registry, hard_keep, escape_tool) {
  var names = [escape_tool]
  for name in hard_keep ?? [] {
    if !contains(names, name) {
      names = names.push(name)
    }
  }
  for entry in __stance_registry_entries(registry) {
    if __stance_entry_read_only(entry) {
      let name = __stance_entry_name(entry)
      if name != "" && !contains(names, name) {
        names = names.push(name)
      }
    }
  }
  return names
}

fn __stance_recent_user_messages(session_id, limit = 6) {
  let messages = try {
    agent_session_messages(session_id)
  } catch (e) {
    []
  }
  var recent = []
  for message in messages ?? [] {
    if (message?.role ?? "") == "user" {
      recent = recent.push(to_string(message?.content ?? ""))
    }
  }
  let count = len(recent)
  if count > limit {
    return recent.slice(count - limit, count)
  }
  return recent
}

/**
 * Default consent check for the read-only stance escape hatch: a
 * structured LLM verdict over the agent's justification plus the
 * session's recent user messages. Grants only when the user expressed or
 * clearly implied consent to modify the workspace; anything weaker denies
 * with guidance to ask the user directly. Override per-loop via
 * `read_only_stance.consent_check` (e.g. for deterministic tests or a
 * host approval queue).
 *
 * @effects: [llm]
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_stance_default_consent_check(justification, session_id, options = nil) {
  let recent = __stance_recent_user_messages(session_id)
  let transcript = if len(recent) == 0 {
    "(no user messages available)"
  } else {
    recent.join("\n---\n")
  }
  let prompt = "An agent operating under a read-only restriction is requesting write access.\n\n"
    + "Agent justification:\n"
    + justification
    + "\n\nRecent user messages (most recent last):\n"
    + transcript
    + "\n\nDid the user express or clearly imply consent for the agent to modify the workspace"
    + " (edit files, run mutating commands) at this point in the conversation?"
    + " Base the verdict only on what the user actually said — the agent's justification alone"
    + " is not consent."
  let schema = {
    type: "object",
    properties: {consent: {type: "string", enum: ["express", "implied", "none"]}, reasoning: {type: "string"}},
    required: ["consent", "reasoning"],
  }
  let llm_options = (options ?? {}) + {schema: schema, temperature: 0}
  let outcome = try {
    llm_call(prompt, nil, llm_options)
  } catch (e) {
    nil
  }
  let consent = to_string(outcome?.data?.consent ?? "none")
  let reasoning = to_string(outcome?.data?.reasoning ?? "consent check unavailable")
  if consent == "express" || consent == "implied" {
    return {verdict: "grant", consent: consent, reason: reasoning}
  }
  return {verdict: "deny", consent: consent, reason: reasoning}
}

fn __stance_escape_description(escape_tool) {
  return "Request that write/execute tools be re-enabled for this session. The current task"
    + " was classified as read-only (research/investigation). Call this ONLY when the work"
    + " genuinely requires modifying the workspace AND the user has expressly or implicitly"
    + " consented to that. Your justification must cite the user's words or the standing"
    + " instruction that grants consent; a justification without user consent is denied."
}

fn __stance_register_escape_tool(opts, cfg, session_id, grant_flag) {
  let escape_tool = cfg?.escape_tool ?? "request_write_access"
  let consent = cfg?.consent_check
  let handler = { args ->
    let justification = to_string(args?.justification ?? "")
    let verdict = if consent != nil {
      consent(justification, session_id)
    } else {
      agent_stance_default_consent_check(justification, session_id)
    }
    let granted = (verdict?.verdict ?? "deny") == "grant"
    agent_emit_event(
      session_id,
      if granted {
        "stance_write_access_granted"
      } else {
        "stance_write_access_denied"
      },
      {justification: justification, consent: verdict?.consent ?? "none", reason: verdict?.reason ?? ""},
    )
    if granted {
      atomic_set(grant_flag, 1)
      {
        granted: true,
        message: "Write access re-enabled: user consent verified. Mutating tools return next turn.",
        reason: verdict?.reason ?? "",
      }
    } else {
      {
        granted: false,
        message: "Denied: no express or implied user consent to modify the workspace was found."
          + " Ask the user directly whether to proceed with changes, then call this tool again"
          + " citing their answer.",
        reason: verdict?.reason ?? "",
      }
    }
  }
  return tool_define(
    opts?.tools,
    escape_tool,
    __stance_escape_description(escape_tool),
    {
      parameters: {
        justification: {
          type: "string",
          description: "Why write access is needed now, citing the user's express or implied consent.",
        },
      },
      annotations: {kind: "think", side_effect_level: "read_only"},
      handler: handler,
    },
  )
}

/**
 * Arm the read-only stance for a session when configured. Called once by
 * `agent_loop` after session init. When the stance is enabled and armed
 * (explicit `armed: true`, or the `classifier` callback returns a
 * confident read-only verdict for the task message), the tool registry
 * gains the escape-hatch tool and `_read_only_stance_tools` records the
 * permitted window (read-only-annotated tools + `hard_keep` + the escape
 * hatch). Returns the (possibly updated) options.
 *
 * @effects: [host]
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_stance_prepare(session, message, opts) {
  let cfg = __stance_config(opts)
  if !cfg?.enabled ?? false {
    return opts
  }
  var armed = cfg?.armed
  if armed == nil {
    let classifier = cfg?.classifier
    if classifier == nil {
      return opts
    }
    let verdict = try {
      classifier(to_string(message ?? ""))
    } catch (e) {
      nil
    }
    let confident = (verdict?.confidence ?? 1.0) >= (cfg?.min_confidence ?? 0.8)
    armed = (verdict?.read_only ?? false) && confident
  }
  if !armed {
    return opts
  }
  let escape_tool = cfg?.escape_tool ?? "request_write_access"
  // Shared grant flag: the escape-hatch handler (a closure with by-value
  // capture) flips this atomic on a verified grant; the loop's post-turn
  // hook reads it to disarm. This is the only mutation channel a tool
  // handler has back to the loop.
  let grant_flag = atomic(0)
  let tools = __stance_register_escape_tool(opts, cfg, session.session_id, grant_flag)
  let allowed = __stance_read_only_names(tools, cfg?.hard_keep, escape_tool)
  agent_emit_event(
    session.session_id,
    "stance_armed",
    {escape_tool: escape_tool, allowed_tools: allowed},
  )
  return opts
    + {tools: tools, _read_only_stance_tools: allowed, _read_only_stance_grant: grant_flag}
}

/**
 * Apply the armed stance window to a turn's options: intersect the tool
 * registry and the policy allowlist with the permitted read-only set. A
 * no-op when the stance is not armed. Runs after skill/tool-search
 * surface adjustments so a skill activation cannot re-admit mutating
 * tools while the stance holds.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_stance_apply(turn_opts) {
  let names = turn_opts?._read_only_stance_tools
  if len(names ?? []) == 0 {
    return turn_opts
  }
  return turn_opts
    + {
    tools: __tool_surface_filter_registry(turn_opts?.tools, names),
    policy: __tool_surface_policy_tools(turn_opts?.policy, names),
  }
}

/**
 * Post-turn stance transition: when the escape hatch's consent check
 * granted this turn (signalled via the shared grant flag), disarm the
 * stance so mutating tools return on the next turn (and emit
 * `stance_disarmed`). Returns the (possibly updated) options.
 *
 * @effects: [host]
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_stance_post_turn(session, opts) {
  if len(opts?._read_only_stance_tools ?? []) == 0 {
    return opts
  }
  let grant_flag = opts?._read_only_stance_grant
  if grant_flag == nil || atomic_get(grant_flag) == 0 {
    return opts
  }
  let escape_tool = __stance_config(opts)?.escape_tool ?? "request_write_access"
  agent_emit_event(session.session_id, "stance_disarmed", {escape_tool: escape_tool})
  return opts + {_read_only_stance_tools: nil, _read_only_stance_grant: nil}
}