/**
* std/context/maintenance - portable receipts for background context jobs.
*
* Hosts own the concrete queue and worker process. These helpers keep hook
* handlers, replay fixtures, and host UI code aligned on one receipt shape.
*/
import { filter_nil } from "std/collections"
/**
 * Status of one maintenance job observation. The same five values are listed
 * in CONTEXT_MAINTENANCE_STATUSES, which the runtime validation checks against.
 */
type ContextMaintenanceJobStatus = "queued" | "running" | "succeeded" | "failed" | "skipped"
/**
 * Replay behaviour for a receipt. context_maintenance_replay_decision reports
 * should_run: false for "skip" and should_run: true otherwise.
 */
type ContextMaintenanceReplayMode = "include" | "skip"
/**
 * Retry guidance attached to a receipt.
 *
 * - retryable: the normalizer defaults this to false when the input omits it.
 * - after_ms: the normalizer also accepts `retry_after_ms` as an input alias
 *   (presumably a delay in milliseconds - confirm with the host contract).
 * - max_attempts / reason: optional, copied from the input hint.
 */
type ContextMaintenanceRetryHint = {
retryable: bool,
after_ms?: int,
max_attempts?: int,
reason?: string,
}
/**
 * Replay guidance attached to a receipt.
 *
 * - mode: defaults to "include" when the input does not specify one.
 * - determinism_key: the normalizer falls back to the receipt's dedupe key.
 * - reason: optional free-form text.
 */
type ContextMaintenanceReplayHint = {
mode: ContextMaintenanceReplayMode,
determinism_key?: string,
reason?: string,
}
/**
 * Normalized receipt produced by context_maintenance_receipt.
 *
 * - schema: always the literal "harn.context_maintenance.job_receipt.v1".
 * - id: caller supplied, otherwise "ctx_job_" plus a SHA-256 prefix of the
 *   dedupe key and status.
 * - dedupe_key: caller supplied, otherwise built by
 *   context_maintenance_dedupe_key.
 * - run_id: caller supplied, otherwise "ctx_run_" plus a SHA-256 prefix of the
 *   dedupe key.
 * - affected_paths / artifact_ids: trimmed, de-duplicated, sorted string lists.
 * - duration_ms: defaults to 0; metadata defaults to an empty dict.
 * - queued_at / started_at / completed_at: passed through from the input
 *   unchanged (timestamp format is not enforced here).
 */
type ContextMaintenanceJobReceipt = {
schema: "harn.context_maintenance.job_receipt.v1",
id: string,
dedupe_key: string,
job_id: string,
run_id: string,
status: ContextMaintenanceJobStatus,
lifecycle_event?: string,
queued_at?: string,
started_at?: string,
completed_at?: string,
affected_paths: list<string>,
artifact_ids: list<string>,
duration_ms: int,
retry_hint: ContextMaintenanceRetryHint,
replay: ContextMaintenanceReplayHint,
error?: string,
metadata: dict,
}
/** Schema tag stamped on every receipt built by context_maintenance_receipt. */
let CONTEXT_MAINTENANCE_RECEIPT_SCHEMA = "harn.context_maintenance.job_receipt.v1"
/** Status strings accepted by __cm_status; mirrors ContextMaintenanceJobStatus. */
let CONTEXT_MAINTENANCE_STATUSES = ["queued", "running", "succeeded", "failed", "skipped"]
/** Replay modes accepted by __cm_replay_mode; mirrors ContextMaintenanceReplayMode. */
let CONTEXT_MAINTENANCE_REPLAY_MODES = ["include", "skip"]
/** Render any value as trimmed text; nil becomes the empty string. */
fn __cm_text(value) {
  if value != nil {
    let rendered = to_string(value)
    return trim(rendered)
  }
  return ""
}
/**
 * Normalize `value` with __cm_text and insist that the result is non-empty.
 * `field` is the input name used in the thrown message.
 */
fn __cm_required_text(value, field) {
  let normalized = __cm_text(value)
  if normalized != "" {
    return normalized
  }
  throw "std/context/maintenance: " + field + " is required"
}
/**
 * Accept a list-valued input: nil becomes an empty list, a list is returned
 * unchanged, and anything else throws a message naming `field`.
 */
fn __cm_list(value, field) {
  if value == nil {
    return []
  }
  if type_of(value) == "list" {
    return value
  }
  throw "std/context/maintenance: " + field + " must be a list"
}
/**
 * Turn a list-valued input into a sorted list of distinct, non-empty,
 * trimmed strings. List validation (and its error) comes from __cm_list.
 */
fn __cm_string_list(value, field) {
  let items = __cm_list(value, field)
  var unique = []
  for raw in items {
    let entry = __cm_text(raw)
    if entry != "" {
      if !unique.contains(entry) {
        unique = unique.push(entry)
      }
    }
  }
  return unique.sort()
}
/**
 * Collect affected paths from an input dict. The first non-nil of
 * `affected_paths`, `paths`, `changed_paths` wins; if none is set, a single
 * `path` value is wrapped in a list. The result is normalized by
 * __cm_string_list.
 */
fn __cm_paths(input) {
  let source = input ?? {}
  let listed = source?.affected_paths ?? source?.paths ?? source?.changed_paths
  if listed != nil {
    return __cm_string_list(listed, "affected_paths")
  }
  if source?.path != nil {
    return __cm_string_list([source.path], "affected_paths")
  }
  return __cm_string_list(nil, "affected_paths")
}
/**
 * Collect artifact ids from an input dict, reading `artifact_ids` first and
 * `artifacts` as a fallback, then normalizing through __cm_string_list.
 */
fn __cm_artifact_ids(input) {
  let source = input ?? {}
  let ids = source?.artifact_ids ?? source?.artifacts ?? []
  return __cm_string_list(ids, "artifact_ids")
}
/**
 * Validate a job status. Throws when the value is blank or is not one of
 * CONTEXT_MAINTENANCE_STATUSES; otherwise returns the trimmed status text.
 */
fn __cm_status(value) {
  let candidate = __cm_required_text(value, "status")
  if CONTEXT_MAINTENANCE_STATUSES.contains(candidate) {
    return candidate
  }
  throw "std/context/maintenance: unsupported status `" + candidate + "`"
}
/**
 * Validate a replay mode. A nil value means "include"; any other value is
 * trimmed and must appear in CONTEXT_MAINTENANCE_REPLAY_MODES, else this throws.
 */
fn __cm_replay_mode(value) {
  let requested = value ?? "include"
  let candidate = __cm_text(requested)
  if CONTEXT_MAINTENANCE_REPLAY_MODES.contains(candidate) {
    return candidate
  }
  throw "std/context/maintenance: unsupported replay mode `" + candidate + "`"
}
/**
 * Normalize a retry hint. nil is treated as an empty hint; a non-dict throws.
 * `retryable` defaults to false and `after_ms` also accepts the
 * `retry_after_ms` alias. The assembled dict is passed through filter_nil
 * before it is returned.
 */
fn __cm_retry_hint(value) {
  let source = value ?? {}
  if type_of(source) != "dict" {
    throw "std/context/maintenance: retry_hint must be a dict"
  }
  let retryable = source?.retryable ?? false
  let after_ms = source?.after_ms ?? source?.retry_after_ms
  let max_attempts = source?.max_attempts
  let reason = __cm_text(source?.reason)
  let normalized = {
    retryable: retryable,
    after_ms: after_ms,
    max_attempts: max_attempts,
    reason: reason,
  }
  return filter_nil(normalized)
}
/**
 * Normalize a replay hint. nil is treated as an empty hint; a non-dict throws.
 * The mode is read from `mode`, then `replay_mode`, then defaults to
 * "include"; `determinism_key` falls back to the supplied dedupe key. The
 * assembled dict is passed through filter_nil before it is returned.
 */
fn __cm_replay_hint(value, dedupe_key) {
  let source = value ?? {}
  if type_of(source) != "dict" {
    throw "std/context/maintenance: replay must be a dict"
  }
  let mode = __cm_replay_mode(source?.mode ?? source?.replay_mode ?? "include")
  let determinism_key = __cm_text(source?.determinism_key ?? dedupe_key)
  let reason = __cm_text(source?.reason)
  let normalized = {
    mode: mode,
    determinism_key: determinism_key,
    reason: reason,
  }
  return filter_nil(normalized)
}
/**
 * Read the lifecycle event name from a dict, trying `lifecycle_event`,
 * `event`, then `hook_event`. Returns trimmed text, or "" when none is set.
 */
fn __cm_event_name(event) {
  let name = event?.lifecycle_event ?? event?.event ?? event?.hook_event
  return __cm_text(name)
}
/**
 * Read the source identifier from a dict, trying `source_id`, `session_id`,
 * `session.id`, then `event_id`. Returns trimmed text, or "" when none is set.
 */
fn __cm_source_id(input) {
  let source = input ?? {}
  let identifier = source?.source_id ?? source?.session_id ?? source?.session?.id ?? source?.event_id
  return __cm_text(identifier)
}
/**
 * Build a stable dedupe key for one maintenance job observation.
 *
 * The key is "context_maintenance:" followed by a 32-character prefix of the
 * SHA-256 of the job id, lifecycle event, normalized paths, and source id,
 * joined by newlines. Throws when `job_id` is blank or `affected_paths` is
 * not a list.
 */
pub fn context_maintenance_dedupe_key(
  job_id: string,
  lifecycle_event = nil,
  affected_paths = [],
  source_id = nil,
) -> string {
  let job_part = __cm_required_text(job_id, "job_id")
  let event_part = __cm_text(lifecycle_event)
  let paths_part = join(__cm_string_list(affected_paths, "affected_paths"), "\n")
  let source_part = __cm_text(source_id)
  let seed = join([job_part, event_part, paths_part, source_part], "\n")
  let digest = sha256(seed)
  return "context_maintenance:" + substring(digest, 0, 32)
}
/**
 * Normalize one background maintenance job receipt.
 *
 * Parameters:
 * - job_id: required; trimmed, and blank values throw.
 * - status: must be one of CONTEXT_MAINTENANCE_STATUSES (default "queued").
 * - input: optional dict of receipt fields. Paths, artifact ids, event name,
 *   source id, retry hint and replay hint are read through the module's
 *   normalizers, including their alias keys.
 *
 * Identifiers: `dedupe_key`, `run_id` and `id` from `input` are normalized to
 * trimmed text. When one is missing or blank it is derived instead:
 * `dedupe_key` from context_maintenance_dedupe_key, `run_id` and `id` from
 * SHA-256 prefixes of the dedupe key (and, for `id`, the status).
 *
 * Returns the receipt dict after passing it through filter_nil.
 * Throws when job_id is blank, the status or replay mode is unsupported, a
 * path/artifact input is not a list, or a retry/replay hint is not a dict.
 */
pub fn context_maintenance_receipt(
  job_id: string,
  status: ContextMaintenanceJobStatus = "queued",
  input = nil,
) -> ContextMaintenanceJobReceipt {
  let opts = input ?? {}
  let resolved_job_id = __cm_required_text(job_id, "job_id")
  let resolved_status = __cm_status(status)
  let lifecycle_event = __cm_event_name(opts)
  let affected_paths = __cm_paths(opts)
  let source_id = __cm_source_id(opts)
  // A blank caller-supplied key must not survive: it would leave the receipt
  // with an empty required identifier and seed run_id/id from an empty string.
  var dedupe_key = __cm_text(opts?.dedupe_key)
  if dedupe_key == "" {
    dedupe_key = context_maintenance_dedupe_key(resolved_job_id, lifecycle_event, affected_paths, source_id)
  }
  var run_id = __cm_text(opts?.run_id)
  if run_id == "" {
    run_id = "ctx_run_" + substring(sha256(dedupe_key), 0, 16)
  }
  // The derived id hashes the status too, so each status of one job differs.
  var receipt_id = __cm_text(opts?.id)
  if receipt_id == "" {
    receipt_id = "ctx_job_" + substring(sha256(dedupe_key + "\n" + resolved_status), 0, 24)
  }
  return filter_nil(
    {
      schema: CONTEXT_MAINTENANCE_RECEIPT_SCHEMA,
      id: receipt_id,
      dedupe_key: dedupe_key,
      job_id: resolved_job_id,
      run_id: run_id,
      status: resolved_status,
      lifecycle_event: lifecycle_event,
      queued_at: opts?.queued_at,
      started_at: opts?.started_at,
      completed_at: opts?.completed_at,
      affected_paths: affected_paths,
      artifact_ids: __cm_artifact_ids(opts),
      duration_ms: opts?.duration_ms ?? 0,
      retry_hint: __cm_retry_hint(opts?.retry_hint ?? opts?.retry),
      replay: __cm_replay_hint(opts?.replay, dedupe_key),
      error: __cm_text(opts?.error),
      metadata: opts?.metadata ?? {},
    },
  )
}
/**
 * Build the queued/skipped receipt a lifecycle hook should return after
 * enqueueing.
 *
 * `options` supplies receipt fields and an optional `status` (default
 * "queued"). For `lifecycle_event`, `affected_paths` and `source_id`, an
 * explicit option wins; otherwise the value is read from `event`.
 *
 * NOTE(review): those three keys are always set on the merged dict, so alias
 * keys in `options` (for example `paths`, `event`, `session_id`) appear to be
 * shadowed when context_maintenance_receipt reads it - confirm this is intended.
 */
pub fn context_maintenance_queue_receipt(job_id: string, event, options = nil) -> ContextMaintenanceJobReceipt {
  let overrides = options ?? {}
  let hook_event = event ?? {}
  let requested_status = overrides?.status ?? "queued"
  let derived = {
    lifecycle_event: overrides?.lifecycle_event ?? __cm_event_name(hook_event),
    affected_paths: overrides?.affected_paths ?? __cm_paths(hook_event),
    source_id: overrides?.source_id ?? __cm_source_id(hook_event),
  }
  return context_maintenance_receipt(job_id, requested_status, overrides.merge(derived))
}
/**
 * Transition a receipt to the next worker-observed state.
 *
 * The previous receipt is merged with `input` (input keys win), then rebuilt
 * through context_maintenance_receipt with the new `status`. `dedupe_key` and
 * `run_id` are always taken from the previous receipt, even if `input` sets
 * them. Event, paths, artifact ids and replay fall back to the previous
 * receipt when the merged value is nil.
 *
 * NOTE(review): the merged dict still carries the previous receipt's `id`, so
 * the rebuilt receipt appears to keep that id rather than deriving a new one
 * for the new status, unless `input` supplies `id` - confirm this is intended.
 */
pub fn context_maintenance_transition(
  receipt: ContextMaintenanceJobReceipt,
  status: ContextMaintenanceJobStatus,
  input = nil,
) -> ContextMaintenanceJobReceipt {
  let previous = receipt ?? {}
  let merged = previous.merge(input ?? {})
  let carried = {
    dedupe_key: receipt?.dedupe_key,
    run_id: receipt?.run_id,
    lifecycle_event: merged?.lifecycle_event ?? receipt?.lifecycle_event,
    affected_paths: merged?.affected_paths ?? receipt?.affected_paths,
    artifact_ids: merged?.artifact_ids ?? receipt?.artifact_ids,
    replay: merged?.replay ?? receipt?.replay,
  }
  return context_maintenance_receipt(merged.job_id, status, merged.merge(carried))
}
/**
 * Decide whether replay should run the job again or emit a deterministic skip.
 *
 * The mode comes from `options.mode`, `options.replay_mode`, the receipt's
 * replay hint, or "include", in that order; an unsupported mode throws.
 * Returns `{mode, should_run, receipt}`. For "skip" the receipt is
 * transitioned to "skipped" and the skip reason is recorded both in its
 * replay hint and as `error`; otherwise the receipt is returned unchanged.
 */
pub fn context_maintenance_replay_decision(receipt: ContextMaintenanceJobReceipt, options = nil) {
  let overrides = options ?? {}
  let requested = overrides?.mode ?? overrides?.replay_mode ?? receipt?.replay?.mode ?? "include"
  let mode = __cm_replay_mode(requested)
  if mode != "skip" {
    return {mode: "include", should_run: true, receipt: receipt}
  }
  let reason = __cm_text(overrides?.reason ?? receipt?.replay?.reason ?? "deterministic_replay_skip")
  let skip_hint = {mode: "skip", determinism_key: receipt?.dedupe_key, reason: reason}
  let skipped = context_maintenance_transition(receipt, "skipped", {replay: skip_hint, error: reason})
  return {mode: "skip", should_run: false, receipt: skipped}
}