harn-stdlib 0.7.59

Embedded Harn standard library source catalog
Documentation
import { completion_judge_feedback_prompt, completion_judge_system_prompt } from "std/agent/prompts"

/**
 * Returns the default loop budgets for a named agent profile.
 *
 * All profiles share tool_retries 0 and llm_retries 2; they differ in
 * max_iterations, max_nudges and, for "verifier", schema_retries.
 * Throws when `profile` is not one of the four known profile names.
 */
fn __profile_defaults(profile) {
  // Retry budgets common to every profile; per-profile overrides are merged on top.
  let shared = {tool_retries: 0, llm_retries: 2, schema_retries: 0}
  if profile == "tool_using" {
    return {max_iterations: 50, max_nudges: 8} + shared
  }
  if profile == "researcher" {
    return {max_iterations: 30, max_nudges: 4} + shared
  }
  if profile == "verifier" {
    // The verifier is the only profile that retries on schema failures.
    return {max_iterations: 5, max_nudges: 0} + shared + {schema_retries: 3}
  }
  if profile == "completer" {
    return {max_iterations: 1, max_nudges: 0} + shared
  }
  throw "agent_loop: profile must be one of tool_using, researcher, verifier, completer"
}

/**
 * Expands the shorthand `true` into a judge config dict built from the
 * completion-judge prompt helpers; any other value is returned unchanged.
 */
fn __completion_judge_defaults(value, opts) {
  // Anything that is not literally `true` (dicts, false, nil, ...) passes through.
  if type_of(value) != "bool" || !value {
    return value
  }
  let system_prompt = completion_judge_system_prompt(opts)
  let fallback_prompt = completion_judge_feedback_prompt(opts)
  return {system: system_prompt, feedback_fallback: fallback_prompt}
}

/** Reports whether `key` is among the keys of `opts`, even when its value is nil. */
fn __has_key(opts, key) {
  let present_keys = opts.keys()
  return contains(present_keys, key)
}

/**
 * Decides whether a judge option switches the judge on.
 *
 * nil is off, a bool is taken as-is, a dict is on unless it carries a non-nil
 * `enabled` entry (which is then returned), and any other value counts as on.
 */
fn __judge_enabled(value) {
  if value == nil {
    return false
  }
  let kind = type_of(value)
  if kind == "bool" {
    return value
  }
  if kind != "dict" {
    return true
  }
  // A dict without an `enabled` entry is treated as enabled.
  return value?.enabled ?? true
}

/** True only when `value` is a dict whose `mode` entry equals "client". */
fn __client_tool_search_requested(value) {
  if type_of(value) != "dict" {
    return false
  }
  return value?.mode == "client"
}

/**
 * Looks up the `tool_format` reported by `llm_resolve_model` for `opts.model`.
 *
 * Returns nil when no model is set or when resolving the model fails.
 */
fn __model_tool_format(opts) {
  let model = opts?.model
  if model == nil {
    return nil
  }
  // Resolution errors are deliberately swallowed: an unknown model means "no preference".
  let lookup = try {
    llm_resolve_model(model)
  }
  if is_err(lookup) {
    return nil
  }
  let info = unwrap(lookup)
  return info?.tool_format
}

/** True when `opts` has non-nil `tools` and its `tool_format` is "native". */
fn __native_tools_complete_naturally(opts) {
  if opts?.tools == nil {
    return false
  }
  return opts?.tool_format == "native"
}

/** Converts a deliverable entry to trimmed text; non-strings go through `to_string` first. */
fn __task_ledger_deliverable_text(value) {
  if type_of(value) == "string" {
    return trim(value)
  }
  return trim(to_string(value))
}

/**
 * Builds a task ledger dict from the `root_task` / `deliverables` shorthand options.
 *
 * Each non-blank entry of a `deliverables` list becomes an open deliverable whose
 * id is "deliverable-<n>", where n is the entry's 1-based position in the original
 * list (blank entries are skipped, so ids may have gaps).
 * Returns nil when there is neither a root task nor any deliverable.
 */
fn __task_ledger_from_shorthand(opts) {
  let goal = trim(opts?.root_task ?? "")
  let raw_items = opts?.deliverables
  var entries = []
  // A `deliverables` value that is not a list is ignored.
  if type_of(raw_items) == "list" {
    for (position, raw) in iter(raw_items).enumerate() {
      let label = __task_ledger_deliverable_text(raw)
      if label != "" {
        let entry = {id: "deliverable-" + to_string(position + 1), text: label, status: "open", note: nil}
        entries = entries.push(entry)
      }
    }
  }
  if goal != "" || len(entries) != 0 {
    return {root_task: goal, deliverables: entries, rationale: "", observations: []}
  }
  return nil
}

/**
 * Adds a `task_ledger` derived from the shorthand options, unless the caller
 * already supplied a `task_ledger` key or the shorthand yields nothing.
 */
fn __with_task_ledger_shorthand(opts) {
  // An explicit task_ledger key (even a nil one) always wins over the shorthand.
  if __has_key(opts, "task_ledger") {
    return opts
  }
  let derived = __task_ledger_from_shorthand(opts)
  if derived != nil {
    return opts + {task_ledger: derived}
  }
  return opts
}

/**
 * Checks that `done_sentinel` is nil or a string with non-whitespace content.
 * Throws a descriptive message otherwise.
 */
fn __validate_done_sentinel(value) {
  if value == nil {
    return
  }
  let kind = type_of(value)
  if kind == "string" {
    // Whitespace-only sentinels are rejected the same way as empty ones.
    if trim(value) == "" {
      throw "agent_loop: `done_sentinel` must be a non-empty string or nil; got empty string"
    }
    return
  }
  throw "agent_loop: `done_sentinel` must be a non-empty string or nil; got " + kind
}

/** Checks that `verify_completion` is nil or a closure; throws naming the actual type otherwise. */
fn __validate_verify_completion(value) {
  if value == nil {
    return
  }
  let kind = type_of(value)
  if kind == "closure" {
    return
  }
  throw "agent_loop: `verify_completion` must be a closure or nil; got " + kind
}

/**
 * Checks that a judge option is nil, a dict, or a bool.
 * `label` is the option name used in the thrown message.
 */
fn __validate_judge_dict_or_bool(label, value) {
  if value == nil {
    return
  }
  let kind = type_of(value)
  if kind == "dict" || kind == "bool" {
    return
  }
  throw "agent_loop: `" + label + "` must be a dict, bool, or nil; got " + kind
}

/**
 * Resolves an autonomy tier, defaulting nil to "act_auto".
 * Throws when the tier is not one of the four recognized names.
 */
fn __autonomy_tier(value) {
  let tier = value ?? "act_auto"
  for allowed in ["shadow", "suggest", "act_with_approval", "act_auto"] {
    if tier == allowed {
      return tier
    }
  }
  throw "autonomy_policy: tier must be one of shadow, suggest, act_with_approval, act_auto"
}

/**
 * autonomy_policy returns the VM-enforced autonomy assignment for an agent.
 *
 * `tier` must be one of shadow, suggest, act_with_approval, act_auto (nil means
 * act_auto); an unknown tier throws. `options` may supply `agent_id` (or `agent`
 * as a fallback), `action_tiers`, `agent_tiers`, `agent_action_tiers` and
 * `reviewers`; missing entries default to empty dicts / an empty list.
 */
pub fn autonomy_policy(tier = "act_auto", options = nil) {
  let settings = options ?? {}
  let agent = settings?.agent_id ?? settings?.agent
  let checked_tier = __autonomy_tier(tier)
  return {
    agent_id: agent,
    autonomy_tier: checked_tier,
    action_tiers: settings?.action_tiers ?? {},
    agent_tiers: settings?.agent_tiers ?? {},
    agent_action_tiers: settings?.agent_action_tiers ?? {},
    reviewers: settings?.reviewers ?? [],
  }
}

/**
 * agent_loop_options validates and normalizes the options dict for an agent loop.
 *
 * Steps, in order:
 *   1. Reject the removed `persistent` key and type-check `done_sentinel`,
 *      `verify_completion`, `verify_completion_judge` and `done_judge` when present.
 *   2. Layer the caller's options over the defaults of `profile`
 *      (default "tool_using"); caller-supplied keys win.
 *   3. Fill in `tool_format` when absent: the resolved model's format if tools
 *      are given and the model reports one, otherwise "text" when there are no
 *      tools or client-mode tool search is requested.
 *   4. Fill in `done_sentinel` when absent and either `loop_until_done` is set
 *      or an enabled `done_judge` is present; with native tools the sentinel is
 *      nil / left unset, otherwise it is "##DONE##".
 *   5. Normalize `verify_completion_judge` and `done_judge`: disabled judges
 *      become nil, `true` is expanded to the default prompt dict.
 *   6. Derive `task_ledger` from the `root_task` / `deliverables` shorthand
 *      unless a `task_ledger` key was supplied.
 *
 * Returns the normalized options dict. Throws a string message on invalid input.
 */
pub fn agent_loop_options(options = nil) {
  var opts = options ?? {}
  if __has_key(opts, "persistent") {
    throw "agent_loop: `persistent` was removed; use `loop_until_done` for completion looping and `session_id` for transcript persistence"
  }
  if __has_key(opts, "done_sentinel") {
    __validate_done_sentinel(opts.done_sentinel)
  }
  if __has_key(opts, "verify_completion") {
    __validate_verify_completion(opts.verify_completion)
  }
  if __has_key(opts, "verify_completion_judge") {
    __validate_judge_dict_or_bool("verify_completion_judge", opts.verify_completion_judge)
  }
  if __has_key(opts, "done_judge") {
    __validate_judge_dict_or_bool("done_judge", opts.done_judge)
  }
  let profile = opts?.profile ?? "tool_using"
  let defaults = __profile_defaults(profile)
  // Right-hand side wins, so explicit caller options override profile defaults.
  opts = defaults + opts
  if !__has_key(opts, "tool_format") && opts?.tools != nil {
    let model_tool_format = __model_tool_format(opts)
    if model_tool_format != nil {
      opts = opts + {tool_format: model_tool_format}
    }
  }
  if !__has_key(opts, "tool_format")
    && (opts?.tools == nil || __client_tool_search_requested(opts?.tool_search)) {
    opts = opts + {tool_format: "text"}
  }
  // The coalescing is parenthesized so the `done_sentinel` presence check always
  // applies, regardless of how `??` ranks against `&&`; an explicit caller
  // `done_sentinel` (including an explicit nil) must never be overwritten here.
  if (opts?.loop_until_done ?? false) && !__has_key(opts, "done_sentinel") {
    opts = opts
      + {
      done_sentinel: if __native_tools_complete_naturally(opts) {
        nil
      } else {
        "##DONE##"
      },
    }
  }
  // An enabled done_judge also needs a sentinel unless native tools are in use.
  if __has_key(opts, "done_judge") && __judge_enabled(opts?.done_judge)
    && !__has_key(opts, "done_sentinel")
    && !__native_tools_complete_naturally(opts) {
    opts = opts + {done_sentinel: "##DONE##"}
  }
  if __has_key(opts, "verify_completion_judge") {
    if __judge_enabled(opts?.verify_completion_judge) {
      opts = opts
        + {verify_completion_judge: __completion_judge_defaults(opts?.verify_completion_judge, opts)}
    } else {
      // Disabled judges are normalized to nil.
      opts = opts + {verify_completion_judge: nil}
    }
  }
  if __has_key(opts, "done_judge") {
    if __judge_enabled(opts?.done_judge) {
      opts = opts + {done_judge: __completion_judge_defaults(opts?.done_judge, opts)}
    } else {
      opts = opts + {done_judge: nil}
    }
  }
  opts = __with_task_ledger_shorthand(opts)
  return opts
}