import { agent_session_inject_feedback, agent_session_inject_reminder } from "std/agent/state"
import { read_json } from "std/fs"
import { ext, is_absolute } from "std/path"
/** Coerce any value to a whitespace-trimmed string; `nil` becomes "". */
fn __canon_text(value) -> string {
  let coerced = to_string(value ?? "")
  return trim(coerced)
}
/**
 * Resolve `rel` against `root`.
 *
 * Returns "" for a blank `rel`. Absolute paths, and any path when `root` is
 * nil or blank, are returned trimmed but otherwise unchanged.
 */
fn __canon_path(root, rel) -> string {
  let relative = __canon_text(rel)
  if relative == "" {
    return ""
  }
  if is_absolute(relative) {
    return relative
  }
  // A nil root normalizes to "", so one blank check covers both cases.
  let base = __canon_text(root)
  if base == "" {
    return relative
  }
  return path_join(base, relative)
}
/**
 * Return `items` with the trimmed text of `value` appended, unless that text
 * is blank or already present, in which case `items` is returned as-is.
 */
fn __canon_push_unique(items, value) {
  let candidate = __canon_text(value)
  if candidate == "" {
    return items
  }
  if contains(items, candidate) {
    return items
  }
  return items.push(candidate)
}
/**
 * Built-in fallback routing table: lowercase file extension -> list of
 * harn-canon pack ids. Keys carry no leading dot. An extension may map to
 * more than one pack (`h` routes to both `c` and `cpp`).
 *
 * Consulted by `__canon_pack_for_extension` when the caller's manifest does
 * not provide routing selectors.
 */
let __CANON_EXTENSION_PACKS = {
bash: ["shell"],
c: ["c"],
cc: ["cpp"],
cjs: ["javascript"],
cpp: ["cpp"],
cs: ["csharp"],
css: ["css"],
cts: ["typescript"],
cxx: ["cpp"],
dart: ["dart"],
dockerfile: ["dockerfile"],
ex: ["elixir"],
exs: ["elixir"],
go: ["go"],
gql: ["graphql"],
graphql: ["graphql"],
h: ["c", "cpp"],
harn: ["harn"],
hh: ["cpp"],
hpp: ["cpp"],
hs: ["haskell"],
htm: ["html"],
html: ["html"],
hxx: ["cpp"],
java: ["java"],
js: ["javascript"],
json: ["json"],
jsx: ["javascript"],
ksh: ["shell"],
kt: ["kotlin"],
kts: ["kotlin"],
less: ["css"],
lua: ["lua"],
markdown: ["markdown"],
md: ["markdown"],
mjs: ["javascript"],
mts: ["typescript"],
php: ["php"],
proto: ["protobuf"],
py: ["python"],
pyw: ["python"],
qmd: ["r"],
r: ["r"],
rb: ["ruby"],
rmd: ["r"],
rs: ["rust"],
sass: ["css"],
sc: ["scala"],
scala: ["scala"],
scss: ["css"],
sh: ["shell"],
sql: ["sql"],
swift: ["swift"],
tf: ["terraform"],
tfvars: ["terraform"],
toml: ["toml"],
ts: ["typescript"],
tsx: ["typescript"],
xml: ["xml"],
xsd: ["xml"],
xsl: ["xml"],
xslt: ["xml"],
yaml: ["yaml"],
yml: ["yaml"],
zig: ["zig"],
zsh: ["shell"],
}
/**
 * Look up pack ids for a file using the built-in extension table.
 *
 * Files named `dockerfile` / `containerfile` (any case), or carrying a
 * `dockerfile` extension, always route to the `dockerfile` pack. Unknown
 * extensions yield an empty list.
 */
fn __canon_pack_for_extension(extension, name) -> list<string> {
  let ext_key = lowercase(extension)
  let file_name = lowercase(name)
  let is_container_file = contains(["dockerfile", "containerfile"], file_name)
    || ext_key == "dockerfile"
  if is_container_file {
    return ["dockerfile"]
  }
  let packs = __CANON_EXTENSION_PACKS[ext_key]
  return packs ?? []
}
/**
 * True when at least one manifest pack declares a non-empty `extensions` or
 * `file_names` selector list. A nil manifest, or one without packs, is false.
 */
fn __canon_manifest_has_routing(manifest) -> bool {
  let packs = manifest?.packs ?? []
  for entry in packs {
    let extension_count = len(entry?.extensions ?? [])
    if extension_count > 0 {
      return true
    }
    let name_count = len(entry?.file_names ?? [])
    if name_count > 0 {
      return true
    }
  }
  return false
}
/**
 * Case-insensitive, whitespace-trimmed membership test of `wanted` in
 * `values`. A blank `wanted` never matches; a nil `values` is treated as empty.
 */
fn __canon_selector_matches(values, wanted) -> bool {
  let needle = lowercase(__canon_text(wanted))
  if needle != "" {
    for candidate in values ?? [] {
      let normalized = lowercase(__canon_text(candidate))
      if normalized == needle {
        return true
      }
    }
  }
  return false
}
/**
 * Collect the ids of every manifest pack whose `extensions` or `file_names`
 * selectors match `path`. Packs without an id are skipped. The result is
 * de-duplicated and sorted; a blank path yields an empty list.
 */
fn __canon_pack_ids_for_path_from_manifest(path, manifest) -> list<string> {
  let target = __canon_text(path)
  if target == "" {
    return []
  }
  let file_ext = lowercase(ext(target))
  let file_name = lowercase(basename(target))
  var matched = []
  for entry in manifest?.packs ?? [] {
    let pack_id = __canon_text(entry?.id)
    if pack_id != "" {
      let selected = __canon_selector_matches(entry?.extensions, file_ext)
        || __canon_selector_matches(entry?.file_names, file_name)
      if selected {
        matched = __canon_push_unique(matched, pack_id)
      }
    }
  }
  return matched.sort()
}
/**
 * Infer the harn-canon pack ids that apply to a single file path.
 *
 * If `manifest` declares routing selectors they decide the result; otherwise
 * the built-in extension table is used. A blank path yields an empty list.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn canon_pack_ids_for_path(path, manifest = nil) -> list<string> {
  let target = __canon_text(path)
  if target != "" {
    let routed = __canon_manifest_has_routing(manifest)
    if routed {
      return __canon_pack_ids_for_path_from_manifest(target, manifest)
    }
    return __canon_pack_for_extension(ext(target), basename(target))
  }
  return []
}
/**
 * Infer the harn-canon pack ids covering a list of `{path, text?}` file
 * records (`name` is accepted in place of `path`).
 *
 * Routing selectors from a parsed `canon-packs.json` manifest take
 * precedence when supplied; the built-in extension table is only a legacy
 * fallback for callers that have not loaded harn-canon metadata yet. The
 * returned ids are de-duplicated and sorted.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn canon_pack_ids_for_files(files, manifest = nil) -> list<string> {
  var collected = []
  for record in files ?? [] {
    let record_path = __canon_text(record?.path ?? record?.name)
    let inferred = canon_pack_ids_for_path(record_path, manifest)
    for pack_id in inferred {
      collected = __canon_push_unique(collected, pack_id)
    }
  }
  return collected.sort()
}
/**
 * Build the Flow `slice` dict (`{files, metadata}`) consumed by harn-canon
 * packs.
 *
 * Each record contributes `{path, text}`: the path comes from `path` or
 * `name`, the text from `text` or `content` (defaulting to ""). Records with
 * a blank path are dropped. A nil `metadata` becomes `{}`.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn canon_slice_from_files(files, metadata = nil) -> dict {
  var entries = []
  for record in files ?? [] {
    let record_path = __canon_text(record?.path ?? record?.name)
    if record_path == "" {
      continue
    }
    let body = to_string(record?.text ?? record?.content ?? "")
    entries = entries.push({path: record_path, text: body})
  }
  return {files: entries, metadata: metadata ?? {}}
}
/**
 * Read and parse `<canon_root>/canon-packs.json`.
 *
 * Throws when `canon_root` is blank or when the manifest cannot be read
 * (`read_json` falling back to its nil default).
 *
 * @effects: [fs.read]
 * @errors: [runtime]
 * @api_stability: experimental
 */
pub fn canon_load_manifest(canon_root: string) -> dict {
  let base = __canon_text(canon_root)
  if base == "" {
    throw "canon_load_manifest: canon_root is required"
  }
  let manifest_path = path_join(base, "canon-packs.json")
  // nil is passed as the default so a failed read can be detected below.
  let parsed = read_json(manifest_path, nil)
  if parsed == nil {
    throw "canon_load_manifest: failed to read " + manifest_path
  }
  return parsed
}
/**
 * Return the first manifest pack whose trimmed `id` equals the trimmed `id`
 * argument, or nil when there is no such pack (or no manifest).
 */
fn __canon_manifest_pack(manifest, id) {
  let target_id = __canon_text(id)
  let packs = manifest?.packs ?? []
  for entry in packs {
    let entry_id = __canon_text(entry?.id)
    if entry_id == target_id {
      return entry
    }
  }
  return nil
}
/**
 * True when any requested pack entry is a bare string id, i.e. something
 * that is looked up in the manifest rather than being a self-describing dict.
 */
fn __canon_requested_needs_manifest(items) -> bool {
  let entries = items ?? []
  for entry in entries {
    let kind = type_of(entry)
    if kind == "string" {
      return true
    }
  }
  return false
}
/**
 * Normalize one requested pack entry into `{id, title, invariants, ...}`.
 *
 * String entries are pack ids looked up in `manifest`:
 *   - a blank id is rejected, matching the dict branch;
 *   - `title` falls back to the id when the manifest has no (or a blank) title;
 *   - `invariants` falls back to `<id>/invariants.harn` when the id is not in
 *     the manifest OR the manifest entry has no (or a blank) `invariants`
 *     path, so a resolved pack never carries an empty invariants path.
 *
 * Dict entries describe the pack themselves: `id` (or `pack_id`) and
 * `invariants` (or `path` / `module_path`) are required, `title` defaults to
 * the id, and `predicates` is passed through untouched.
 *
 * Relative invariants paths are resolved against `canon_root`.
 * Throws for entries that are neither strings nor dicts, for blank string
 * ids, and for dict entries missing `id` or `invariants`.
 */
fn __canon_pack_from_item(item, manifest, canon_root) {
  if type_of(item) == "string" {
    let id = __canon_text(item)
    if id == "" {
      throw "canon_resolve_packs: pack ids must be non-empty strings"
    }
    // `found` is nil when the manifest is absent or lacks this id; the
    // optional accesses below then yield "" and trigger the fallbacks.
    let found = __canon_manifest_pack(manifest, id)
    let declared_title = __canon_text(found?.title)
    let declared_invariants = __canon_text(found?.invariants)
    let title = if declared_title == "" {
      id
    } else {
      declared_title
    }
    let invariants = if declared_invariants == "" {
      path_join(id, "invariants.harn")
    } else {
      declared_invariants
    }
    return {id: id, title: title, invariants: __canon_path(canon_root, invariants)}
  }
  if type_of(item) != "dict" {
    throw "canon_resolve_packs: pack entries must be strings or dicts"
  }
  let id = __canon_text(item?.id ?? item?.pack_id)
  let title = __canon_text(item?.title ?? id)
  let invariants = __canon_text(item?.invariants ?? item?.path ?? item?.module_path)
  if id == "" || invariants == "" {
    throw "canon_resolve_packs: dict pack entries need `id` and `invariants`"
  }
  return {
    id: id,
    title: if title == "" {
      id
    } else {
      title
    },
    invariants: __canon_path(canon_root, invariants),
    predicates: item?.predicates,
  }
}
/**
 * Resolve the harn-canon packs to run, from explicit pack ids, explicit dict
 * entries, or inference over file records.
 *
 * Options:
 *   canon_root: directory containing `canon-packs.json`
 *   manifest: parsed manifest override (skips loading from `canon_root`)
 *   packs | pack_ids: strings or `{id, invariants, title?, predicates?}` dicts;
 *     a single string is treated as a one-element list
 *   files: file records used for language inference when packs are omitted
 *     (`slice.files` is used when `files` is absent)
 *
 * The manifest is loaded from `canon_root` only when no override is given and
 * it is needed: for inference, or because a bare string id was requested.
 *
 * @effects: [fs.read]
 * @errors: [runtime]
 * @api_stability: experimental
 */
pub fn canon_resolve_packs(options = nil) -> list<dict> {
  let opts = options ?? {}
  let canon_root = opts?.canon_root
  var requested = opts?.packs ?? opts?.pack_ids ?? []
  if type_of(requested) == "string" {
    requested = [requested]
  }
  let infer_from_files = len(requested) == 0
  let needs_manifest = infer_from_files || __canon_requested_needs_manifest(requested)
  var manifest = opts?.manifest
  // A nil canon_root normalizes to "", so the blank check also covers nil.
  if manifest == nil && needs_manifest && __canon_text(canon_root) != "" {
    manifest = canon_load_manifest(canon_root)
  }
  if infer_from_files {
    requested = canon_pack_ids_for_files(opts?.files ?? opts?.slice?.files ?? [], manifest)
  }
  var resolved = []
  for entry in requested {
    resolved = resolved.push(__canon_pack_from_item(entry, manifest, canon_root))
  }
  return resolved
}
/**
 * Run every selected harn-canon pack against a Flow slice.
 *
 * Packs are resolved with `canon_resolve_packs`, using `options` plus the
 * slice itself. Returns `{ok, status, packs, selected_pack_ids}`:
 *   - `status` is "no_packs" when nothing was selected, otherwise "pass" or
 *     "fail" depending on whether every pack report had a truthy `ok`;
 *   - each `packs` entry carries `pack_id`, `title`, `invariants`, the raw
 *     Flow `report`, and a bounded `feedback` string.
 *
 * @effects: [fs.read]
 * @errors: [runtime]
 * @api_stability: experimental
 */
pub fn canon_evaluate_slice(slice, options = nil) -> dict {
  let opts = options ?? {}
  let resolved = canon_resolve_packs(opts + {slice: slice})
  var pack_reports = []
  var pack_ids = []
  var all_ok = true
  for pack in resolved {
    pack_ids = pack_ids.push(pack.id)
    // Pack-level predicates win over the caller-wide `predicates` option.
    let eval_options = {
      path: pack.invariants,
      predicates: pack?.predicates ?? opts?.predicates,
      include_semantic: opts?.include_semantic ?? false,
      budget_ms: opts?.budget_ms ?? 50,
      ctx: opts?.ctx ?? opts?.predicate_ctx ?? {},
      repo_at_base: opts?.repo_at_base,
    }
    let report = flow_evaluate_invariants("", slice, eval_options)
    all_ok = all_ok && (report?.ok ?? false)
    let entry = {
      pack_id: pack.id,
      title: pack.title,
      invariants: pack.invariants,
      report: report,
      feedback: flow_invariant_feedback(report, opts?.feedback ?? {}),
    }
    pack_reports = pack_reports.push(entry)
  }
  var status = "fail"
  if len(resolved) == 0 {
    status = "no_packs"
  } else if all_ok {
    status = "pass"
  }
  return {ok: all_ok, status: status, packs: pack_reports, selected_pack_ids: pack_ids}
}
/**
 * Render the result of `canon_evaluate_slice` as compact agent-facing text.
 *
 * Every pack with non-blank feedback becomes a `[pack_id] title` heading
 * followed by its feedback; sections are separated by blank lines and
 * preceded by `options.header` (default "harn-canon feedback"). Returns ""
 * when no pack produced feedback.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn canon_feedback_text(result, options = nil) -> string {
  let opts = options ?? {}
  var blocks = []
  for entry in result?.packs ?? [] {
    let text = trim(to_string(entry?.feedback ?? ""))
    if text == "" {
      continue
    }
    let heading = "[" + entry.pack_id + "] " + entry.title
    blocks = blocks.push(heading + "\n" + text)
  }
  if len(blocks) == 0 {
    return ""
  }
  let header = __canon_text(opts?.header ?? "harn-canon feedback")
  return header + "\n\n" + join(blocks, "\n\n")
}
/**
 * Evaluate harn-canon packs and inject bounded feedback into an agent session.
 *
 * Options (in addition to those accepted by `canon_evaluate_slice` and
 * `canon_feedback_text`):
 *   inject: "feedback" | "reminder" | "none" (default "feedback")
 *   feedback_kind: feedback kind when `inject == "feedback"` (default "harn_canon")
 *   reminder: reminder options merged with the generated body when `inject == "reminder"`
 *
 * The `inject` mode is validated before any pack is evaluated, so a
 * misconfigured call fails fast instead of after the evaluation work.
 * Nothing is injected when the rendered feedback is empty.
 *
 * Returns the evaluation result extended with `feedback_text`, `injected`,
 * and `reminder_id` (nil unless a reminder was injected).
 *
 * @effects: [fs.read, host]
 * @errors: [runtime]
 * @api_stability: experimental
 */
pub fn canon_inject_feedback(session_id: string, slice, options = nil) -> dict {
  let opts = options ?? {}
  let mode = lowercase(__canon_text(opts?.inject ?? "feedback"))
  // Reject an unknown mode up front, before manifests and invariants are read.
  if mode != "none" && mode != "feedback" && mode != "reminder" {
    throw "canon_inject_feedback: inject must be feedback|reminder|none"
  }
  let result = canon_evaluate_slice(slice, opts)
  let feedback = canon_feedback_text(result, opts)
  var reminder_id = nil
  if feedback != "" && mode == "feedback" {
    agent_session_inject_feedback(session_id, opts?.feedback_kind ?? "harn_canon", feedback)
  } else if feedback != "" && mode == "reminder" {
    // Caller-supplied reminder fields are kept; the body is always the
    // generated feedback and the remaining fields only receive defaults.
    let reminder = (opts?.reminder ?? {})
      + {
      body: feedback,
      tags: opts?.reminder?.tags ?? ["harn-canon"],
      dedupe_key: opts?.reminder?.dedupe_key ?? ("harn-canon/" + join(result.selected_pack_ids, ",")),
      ttl_turns: opts?.reminder?.ttl_turns ?? 1,
      preserve_on_compact: opts?.reminder?.preserve_on_compact ?? true,
    }
    reminder_id = agent_session_inject_reminder(session_id, reminder)
  }
  return result
    + {feedback_text: feedback, injected: feedback != "" && mode != "none", reminder_id: reminder_id}
}