harn-stdlib 0.8.22

Embedded Harn standard library source catalog
Documentation
/**
 * std/context/maintenance - portable receipts for background context jobs.
 *
 * Hosts own the concrete queue and worker process. These helpers keep hook
 * handlers, replay fixtures, and host UI code aligned on one receipt shape.
 */
import { filter_nil } from "std/collections"

type ContextMaintenanceJobStatus = "queued" | "running" | "succeeded" | "failed" | "skipped"

type ContextMaintenanceReplayMode = "include" | "skip"

type ContextMaintenanceRetryHint = {
  retryable: bool,
  after_ms?: int,
  max_attempts?: int,
  reason?: string,
}

type ContextMaintenanceReplayHint = {
  mode: ContextMaintenanceReplayMode,
  determinism_key?: string,
  reason?: string,
}

type ContextMaintenanceJobReceipt = {
  schema: "harn.context_maintenance.job_receipt.v1",
  id: string,
  dedupe_key: string,
  job_id: string,
  run_id: string,
  status: ContextMaintenanceJobStatus,
  lifecycle_event?: string,
  queued_at?: string,
  started_at?: string,
  completed_at?: string,
  affected_paths: list<string>,
  artifact_ids: list<string>,
  duration_ms: int,
  retry_hint: ContextMaintenanceRetryHint,
  replay: ContextMaintenanceReplayHint,
  error?: string,
  metadata: dict,
}

let CONTEXT_MAINTENANCE_RECEIPT_SCHEMA = "harn.context_maintenance.job_receipt.v1"

let CONTEXT_MAINTENANCE_STATUSES = ["queued", "running", "succeeded", "failed", "skipped"]

let CONTEXT_MAINTENANCE_REPLAY_MODES = ["include", "skip"]

fn __cm_text(value) {
  if value == nil {
    return ""
  }
  return trim(to_string(value))
}

fn __cm_required_text(value, field) {
  let text = __cm_text(value)
  if text == "" {
    throw "std/context/maintenance: " + field + " is required"
  }
  return text
}

fn __cm_list(value, field) {
  if value == nil {
    return []
  }
  if type_of(value) != "list" {
    throw "std/context/maintenance: " + field + " must be a list"
  }
  return value
}

fn __cm_string_list(value, field) {
  var out = []
  for item in __cm_list(value, field) {
    let text = __cm_text(item)
    if text != "" && !out.contains(text) {
      out = out.push(text)
    }
  }
  return out.sort()
}

fn __cm_paths(input) {
  let opts = input ?? {}
  var paths = opts?.affected_paths ?? opts?.paths ?? opts?.changed_paths
  if paths == nil && opts?.path != nil {
    paths = [opts.path]
  }
  return __cm_string_list(paths, "affected_paths")
}

fn __cm_artifact_ids(input) {
  let opts = input ?? {}
  return __cm_string_list(opts?.artifact_ids ?? opts?.artifacts ?? [], "artifact_ids")
}

fn __cm_status(value) {
  let status = __cm_required_text(value, "status")
  if !CONTEXT_MAINTENANCE_STATUSES.contains(status) {
    throw "std/context/maintenance: unsupported status `" + status + "`"
  }
  return status
}

fn __cm_replay_mode(value) {
  let mode = __cm_text(value ?? "include")
  if !CONTEXT_MAINTENANCE_REPLAY_MODES.contains(mode) {
    throw "std/context/maintenance: unsupported replay mode `" + mode + "`"
  }
  return mode
}

fn __cm_retry_hint(value) {
  let hint = value ?? {}
  if type_of(hint) != "dict" {
    throw "std/context/maintenance: retry_hint must be a dict"
  }
  return filter_nil(
    {
      retryable: hint?.retryable ?? false,
      after_ms: hint?.after_ms ?? hint?.retry_after_ms,
      max_attempts: hint?.max_attempts,
      reason: __cm_text(hint?.reason),
    },
  )
}

fn __cm_replay_hint(value, dedupe_key) {
  let hint = value ?? {}
  if type_of(hint) != "dict" {
    throw "std/context/maintenance: replay must be a dict"
  }
  return filter_nil(
    {
      mode: __cm_replay_mode(hint?.mode ?? hint?.replay_mode ?? "include"),
      determinism_key: __cm_text(hint?.determinism_key ?? dedupe_key),
      reason: __cm_text(hint?.reason),
    },
  )
}

fn __cm_event_name(event) {
  return __cm_text(event?.lifecycle_event ?? event?.event ?? event?.hook_event)
}

fn __cm_source_id(input) {
  let opts = input ?? {}
  return __cm_text(opts?.source_id ?? opts?.session_id ?? opts?.session?.id ?? opts?.event_id)
}

/** Build a stable dedupe key for one maintenance job observation. */
pub fn context_maintenance_dedupe_key(
  job_id: string,
  lifecycle_event = nil,
  affected_paths = [],
  source_id = nil,
) -> string {
  let seed = __cm_required_text(job_id, "job_id")
    + "\n"
    + __cm_text(lifecycle_event)
    + "\n"
    + join(__cm_string_list(affected_paths, "affected_paths"), "\n")
    + "\n"
    + __cm_text(source_id)
  return "context_maintenance:" + substring(sha256(seed), 0, 32)
}

/** Normalize one background maintenance job receipt. */
pub fn context_maintenance_receipt(
  job_id: string,
  status: ContextMaintenanceJobStatus = "queued",
  input = nil,
) -> ContextMaintenanceJobReceipt {
  let opts = input ?? {}
  let resolved_job_id = __cm_required_text(job_id, "job_id")
  let resolved_status = __cm_status(status)
  let lifecycle_event = __cm_event_name(opts)
  let affected_paths = __cm_paths(opts)
  let source_id = __cm_source_id(opts)
  let dedupe_key = opts?.dedupe_key
    ?? context_maintenance_dedupe_key(resolved_job_id, lifecycle_event, affected_paths, source_id)
  let run_id = opts?.run_id ?? ("ctx_run_" + substring(sha256(dedupe_key), 0, 16))
  return filter_nil(
    {
      schema: CONTEXT_MAINTENANCE_RECEIPT_SCHEMA,
      id: opts?.id ?? ("ctx_job_" + substring(sha256(dedupe_key + "\n" + resolved_status), 0, 24)),
      dedupe_key: dedupe_key,
      job_id: resolved_job_id,
      run_id: run_id,
      status: resolved_status,
      lifecycle_event: lifecycle_event,
      queued_at: opts?.queued_at,
      started_at: opts?.started_at,
      completed_at: opts?.completed_at,
      affected_paths: affected_paths,
      artifact_ids: __cm_artifact_ids(opts),
      duration_ms: opts?.duration_ms ?? 0,
      retry_hint: __cm_retry_hint(opts?.retry_hint ?? opts?.retry),
      replay: __cm_replay_hint(opts?.replay, dedupe_key),
      error: __cm_text(opts?.error),
      metadata: opts?.metadata ?? {},
    },
  )
}

/** Build the queued/skipped receipt a lifecycle hook should return after enqueueing. */
pub fn context_maintenance_queue_receipt(job_id: string, event, options = nil) -> ContextMaintenanceJobReceipt {
  let opts = options ?? {}
  let source_event = event ?? {}
  let status = opts?.status ?? "queued"
  return context_maintenance_receipt(
    job_id,
    status,
    opts
      .merge(
      {
        lifecycle_event: opts?.lifecycle_event ?? __cm_event_name(source_event),
        affected_paths: opts?.affected_paths ?? __cm_paths(source_event),
        source_id: opts?.source_id ?? __cm_source_id(source_event),
      },
    ),
  )
}

/** Transition a receipt to the next worker-observed state. */
pub fn context_maintenance_transition(
  receipt: ContextMaintenanceJobReceipt,
  status: ContextMaintenanceJobStatus,
  input = nil,
) -> ContextMaintenanceJobReceipt {
  let opts = (receipt ?? {}).merge(input ?? {})
  return context_maintenance_receipt(
    opts.job_id,
    status,
    opts
      .merge(
      {
        dedupe_key: receipt?.dedupe_key,
        run_id: receipt?.run_id,
        lifecycle_event: opts?.lifecycle_event ?? receipt?.lifecycle_event,
        affected_paths: opts?.affected_paths ?? receipt?.affected_paths,
        artifact_ids: opts?.artifact_ids ?? receipt?.artifact_ids,
        replay: opts?.replay ?? receipt?.replay,
      },
    ),
  )
}

/** Decide whether replay should run the job again or emit a deterministic skip. */
pub fn context_maintenance_replay_decision(receipt: ContextMaintenanceJobReceipt, options = nil) {
  let opts = options ?? {}
  let mode = __cm_replay_mode(opts?.mode ?? opts?.replay_mode ?? receipt?.replay?.mode ?? "include")
  if mode == "skip" {
    let reason = __cm_text(opts?.reason ?? receipt?.replay?.reason ?? "deterministic_replay_skip")
    return {
      mode: "skip",
      should_run: false,
      receipt: context_maintenance_transition(
        receipt,
        "skipped",
        {replay: {mode: "skip", determinism_key: receipt?.dedupe_key, reason: reason}, error: reason},
      ),
    }
  }
  return {mode: "include", should_run: true, receipt: receipt}
}