harn-stdlib 0.9.2

Embedded Harn standard library source catalog
Documentation
import { agent_session_inject_feedback, agent_session_inject_reminder } from "std/agent/state"
import { read_json } from "std/fs"
import { ext, is_absolute } from "std/path"

fn __canon_text(value) -> string {
  return trim(to_string(value ?? ""))
}

fn __canon_path(root, rel) -> string {
  let raw = __canon_text(rel)
  if raw == "" {
    return ""
  }
  if is_absolute(raw) || root == nil || __canon_text(root) == "" {
    return raw
  }
  return path_join(__canon_text(root), raw)
}

fn __canon_push_unique(items, value) {
  let text = __canon_text(value)
  if text == "" || contains(items, text) {
    return items
  }
  return items.push(text)
}

let __CANON_EXTENSION_PACKS = {
  bash: ["shell"],
  c: ["c"],
  cc: ["cpp"],
  cjs: ["javascript"],
  cpp: ["cpp"],
  cs: ["csharp"],
  css: ["css"],
  cts: ["typescript"],
  cxx: ["cpp"],
  dart: ["dart"],
  dockerfile: ["dockerfile"],
  ex: ["elixir"],
  exs: ["elixir"],
  go: ["go"],
  gql: ["graphql"],
  graphql: ["graphql"],
  h: ["c", "cpp"],
  harn: ["harn"],
  hh: ["cpp"],
  hpp: ["cpp"],
  hs: ["haskell"],
  htm: ["html"],
  html: ["html"],
  hxx: ["cpp"],
  java: ["java"],
  js: ["javascript"],
  json: ["json"],
  jsx: ["javascript"],
  ksh: ["shell"],
  kt: ["kotlin"],
  kts: ["kotlin"],
  less: ["css"],
  lua: ["lua"],
  markdown: ["markdown"],
  md: ["markdown"],
  mjs: ["javascript"],
  mts: ["typescript"],
  php: ["php"],
  proto: ["protobuf"],
  py: ["python"],
  pyw: ["python"],
  qmd: ["r"],
  r: ["r"],
  rb: ["ruby"],
  rmd: ["r"],
  rs: ["rust"],
  sass: ["css"],
  sc: ["scala"],
  scala: ["scala"],
  scss: ["css"],
  sh: ["shell"],
  sql: ["sql"],
  swift: ["swift"],
  tf: ["terraform"],
  tfvars: ["terraform"],
  toml: ["toml"],
  ts: ["typescript"],
  tsx: ["typescript"],
  xml: ["xml"],
  xsd: ["xml"],
  xsl: ["xml"],
  xslt: ["xml"],
  yaml: ["yaml"],
  yml: ["yaml"],
  zig: ["zig"],
  zsh: ["shell"],
}

fn __canon_pack_for_extension(extension, name) -> list<string> {
  let e = lowercase(extension)
  let base = lowercase(name)
  if base == "dockerfile" || base == "containerfile" || e == "dockerfile" {
    return ["dockerfile"]
  }
  return __CANON_EXTENSION_PACKS[e] ?? []
}

/**
 * Infer harn-canon pack ids for one file path.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn canon_pack_ids_for_path(path) -> list<string> {
  let raw = __canon_text(path)
  if raw == "" {
    return []
  }
  return __canon_pack_for_extension(ext(raw), basename(raw))
}

/**
 * Infer harn-canon pack ids for a list of `{path, text?}` file records.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn canon_pack_ids_for_files(files) -> list<string> {
  var ids = []
  for file in files ?? [] {
    let path = __canon_text(file?.path ?? file?.name)
    for id in canon_pack_ids_for_path(path) {
      ids = __canon_push_unique(ids, id)
    }
  }
  return ids.sort()
}

/**
 * Build the Flow `slice` shape used by harn-canon packs from file records.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn canon_slice_from_files(files, metadata = nil) -> dict {
  var normalized = []
  for file in files ?? [] {
    let path = __canon_text(file?.path ?? file?.name)
    if path != "" {
      normalized = normalized
        .push({path: path, text: to_string(file?.text ?? file?.content ?? "")})
    }
  }
  return {files: normalized, metadata: metadata ?? {}}
}

/**
 * Load `canon-packs.json` from a configured harn-canon root.
 *
 * @effects: [fs.read]
 * @errors: [runtime]
 * @api_stability: experimental
 */
pub fn canon_load_manifest(canon_root: string) -> dict {
  let root = __canon_text(canon_root)
  if root == "" {
    throw "canon_load_manifest: canon_root is required"
  }
  let path = path_join(root, "canon-packs.json")
  let manifest = read_json(path, nil)
  if manifest == nil {
    throw "canon_load_manifest: failed to read " + path
  }
  return manifest
}

fn __canon_manifest_pack(manifest, id) {
  let wanted = __canon_text(id)
  for pack in manifest?.packs ?? [] {
    if __canon_text(pack?.id) == wanted {
      return pack
    }
  }
  return nil
}

fn __canon_requested_needs_manifest(items) -> bool {
  for item in items ?? [] {
    if type_of(item) == "string" {
      return true
    }
  }
  return false
}

fn __canon_pack_from_item(item, manifest, canon_root) {
  if type_of(item) == "string" {
    let id = __canon_text(item)
    let found = __canon_manifest_pack(manifest, id)
    if found == nil {
      return {id: id, title: id, invariants: __canon_path(canon_root, path_join(id, "invariants.harn"))}
    }
    return {
      id: id,
      title: __canon_text(found?.title ?? id),
      invariants: __canon_path(canon_root, found?.invariants),
    }
  }
  if type_of(item) != "dict" {
    throw "canon_resolve_packs: pack entries must be strings or dicts"
  }
  let id = __canon_text(item?.id ?? item?.pack_id)
  let title = __canon_text(item?.title ?? id)
  let invariants = __canon_text(item?.invariants ?? item?.path ?? item?.module_path)
  if id == "" || invariants == "" {
    throw "canon_resolve_packs: dict pack entries need `id` and `invariants`"
  }
  return {
    id: id,
    title: if title == "" {
      id
    } else {
      title
    },
    invariants: __canon_path(canon_root, invariants),
    predicates: item?.predicates,
  }
}

/**
 * Resolve harn-canon pack config from explicit pack ids, dict entries, or files.
 *
 * Options:
 *   canon_root: directory containing `canon-packs.json`
 *   manifest: parsed manifest override
 *   packs | pack_ids: strings or `{id, invariants, title?, predicates?}` dicts
 *   files: file records used for language inference when packs are omitted
 *
 * @effects: [fs.read]
 * @errors: [runtime]
 * @api_stability: experimental
 */
pub fn canon_resolve_packs(options = nil) -> list<dict> {
  let opts = options ?? {}
  let canon_root = opts?.canon_root
  var requested = opts?.packs ?? opts?.pack_ids ?? []
  if type_of(requested) == "string" {
    requested = [requested]
  }
  let needs_manifest = len(requested) == 0 || __canon_requested_needs_manifest(requested)
  let manifest = opts?.manifest
    ?? if needs_manifest && canon_root != nil && __canon_text(canon_root) != "" {
    canon_load_manifest(canon_root)
  } else {
    nil
  }
  if len(requested) == 0 {
    requested = canon_pack_ids_for_files(opts?.files ?? opts?.slice?.files ?? [])
  }
  var resolved = []
  for item in requested {
    resolved = resolved.push(__canon_pack_from_item(item, manifest, canon_root))
  }
  return resolved
}

/**
 * Evaluate a Flow slice against selected harn-canon packs.
 *
 * Returns `{ok, status, packs, selected_pack_ids}` where each pack result carries
 * the raw Flow report and a bounded feedback string.
 *
 * @effects: [fs.read]
 * @errors: [runtime]
 * @api_stability: experimental
 */
pub fn canon_evaluate_slice(slice, options = nil) -> dict {
  let opts = options ?? {}
  let packs = canon_resolve_packs(opts + {slice: slice})
  var reports = []
  var selected = []
  var ok = true
  for pack in packs {
    selected = selected.push(pack.id)
    let report = flow_evaluate_invariants(
      "",
      slice,
      {
        path: pack.invariants,
        predicates: pack?.predicates ?? opts?.predicates,
        include_semantic: opts?.include_semantic ?? false,
        budget_ms: opts?.budget_ms ?? 50,
        ctx: opts?.ctx ?? opts?.predicate_ctx ?? {},
        repo_at_base: opts?.repo_at_base,
      },
    )
    ok = ok && (report?.ok ?? false)
    reports = reports
      .push(
      {
        pack_id: pack.id,
        title: pack.title,
        invariants: pack.invariants,
        report: report,
        feedback: flow_invariant_feedback(report, opts?.feedback ?? {}),
      },
    )
  }
  let status = if len(packs) == 0 {
    "no_packs"
  } else if ok {
    "pass"
  } else {
    "fail"
  }
  return {ok: ok, status: status, packs: reports, selected_pack_ids: selected}
}

/**
 * Render compact agent-facing feedback from `canon_evaluate_slice`.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn canon_feedback_text(result, options = nil) -> string {
  let opts = options ?? {}
  var sections = []
  for pack in result?.packs ?? [] {
    let body = trim(to_string(pack?.feedback ?? ""))
    if body != "" {
      sections = sections.push("[" + pack.pack_id + "] " + pack.title + "\n" + body)
    }
  }
  if len(sections) == 0 {
    return ""
  }
  return __canon_text(opts?.header ?? "harn-canon feedback") + "\n\n" + join(sections, "\n\n")
}

/**
 * Evaluate harn-canon packs and inject bounded feedback into an agent session.
 *
 * Options:
 *   inject: "feedback" | "reminder" | "none" (default "feedback")
 *   feedback_kind: feedback kind when `inject == "feedback"` (default "harn_canon")
 *   reminder: reminder options merged with the generated body when `inject == "reminder"`
 *
 * @effects: [fs.read, host]
 * @errors: [runtime]
 * @api_stability: experimental
 */
pub fn canon_inject_feedback(session_id: string, slice, options = nil) -> dict {
  let opts = options ?? {}
  let result = canon_evaluate_slice(slice, opts)
  let feedback = canon_feedback_text(result, opts)
  let mode = lowercase(__canon_text(opts?.inject ?? "feedback"))
  var reminder_id = nil
  if feedback != "" && mode == "feedback" {
    agent_session_inject_feedback(session_id, opts?.feedback_kind ?? "harn_canon", feedback)
  } else if feedback != "" && mode == "reminder" {
    let reminder = (opts?.reminder ?? {})
      + {
      body: feedback,
      tags: opts?.reminder?.tags ?? ["harn-canon"],
      dedupe_key: opts?.reminder?.dedupe_key ?? ("harn-canon/" + join(result.selected_pack_ids, ",")),
      ttl_turns: opts?.reminder?.ttl_turns ?? 1,
      preserve_on_compact: opts?.reminder?.preserve_on_compact ?? true,
    }
    reminder_id = agent_session_inject_reminder(session_id, reminder)
  } else if mode != "none" && mode != "feedback" && mode != "reminder" {
    throw "canon_inject_feedback: inject must be feedback|reminder|none"
  }
  return result
    + {feedback_text: feedback, injected: feedback != "" && mode != "none", reminder_id: reminder_id}
}