import { filter_nil } from "std/collections"
/**
* std/dashboard/jobs - host-facing job status envelopes for orchestrator
* dashboards.
*
* Harn emits workflow, schedule, approval, receipt, replay, and DLQ progress
* as typed `harn.dashboard_job_event.v1` records. Hosts render queue/job
* cards from normalized fields while raw local or cloud runtime payloads remain
* separate for audit and replay.
*/
/**
 * Event kinds carried by a dashboard job event. The named literals cover
 * scheduled-job, run, approval, receipt, replay-fixture, and DLQ events; the
 * trailing `string` member keeps the union open to other kind strings.
 */
type DashboardJobEventKind = "scheduled_job.created" | "scheduled_job.updated" | "scheduled_job.canceled" | "run.queued" | "run.started" | "run.progress" | "run.succeeded" | "run.failed" | "approval.requested" | "approval.approved" | "approval.denied" | "approval.expired" | "receipt.available" | "replay_fixture.available" | "dlq.created" | "dlq.replayed" | "dlq.dismissed" | string
/**
 * Job status shown on a dashboard card. The named literals are the statuses
 * this module derives from event kinds; the trailing `string` member allows
 * other status strings.
 */
type DashboardJobStatus = "scheduled" | "queued" | "running" | "waiting_approval" | "succeeded" | "failed" | "dlq" | "canceled" | "unknown" | string
/**
 * Reference attached to a job event or card (receipt, replay fixture, or other
 * related resource). `kind` is required; `label`, `id`, and `url` are optional
 * at the type level, but `__job_refs` rejects refs that have neither `id` nor
 * `url`.
 */
type DashboardJobRef = {kind: string, label?: string, id?: string, url?: string}
/**
 * Provenance of an event: which runtime and environment produced it, plus
 * optional source id, URL, and region. `runtime` and `environment` are
 * mandatory and are checked by `__job_validate_required`.
 */
type DashboardJobSource = {
runtime: string,
environment: string,
id?: string,
url?: string,
region?: string,
}
/**
 * Clock that stamped the event: `kind` names the clock and `timestamp` is the
 * source-supplied time text. Both are required by validation.
 */
type DashboardJobSourceClock = {kind: string, timestamp: string}
/**
 * Privacy markers for an event. `raw_payload_retained` controls whether
 * `dashboard_job_event` copies the raw payload into the envelope or stores an
 * empty dict instead; `flags` is a list of free-form text flags.
 */
type DashboardJobPrivacy = {
redacted: bool,
raw_payload_retained: bool,
contains_sensitive: bool,
flags: list<string>,
}
/**
 * An action a host may offer for a job. `target` is a dict describing what the
 * action applies to, and `effect` names what the action does. Validation
 * requires `requires_approval` to be true for every effect other than
 * "navigation" and "read".
 */
type DashboardJobActionIntent = {
kind: string,
label: string,
target: dict,
effect: string,
requires_approval: bool,
}
/**
 * Approval sub-record of an event or card. `request_id` and `status` are
 * required; the request/decision/expiry timestamps, reviewer, and summary are
 * optional.
 */
type DashboardApprovalState = {
request_id: string,
status: string,
requested_at?: string,
decided_at?: string,
reviewer?: string,
expires_at?: string,
summary?: string,
}
/**
 * Dead-letter-queue sub-record of an event or card. `dlq_id` and `state` are
 * required; attempt count, error details, and the id of the event being
 * replayed are optional.
 */
type DashboardDlqState = {
dlq_id: string,
state: string,
attempts?: int,
error_class?: string,
final_error?: string,
replay_of_event_id?: string,
}
/**
 * Normalized dashboard event envelope (`harn.dashboard_job_event.v1`).
 *
 * Required fields: schema, id, dedupe_key, kind, source, source_clock, job_id,
 * status, title, summary, the three ref lists, action_intents, privacy, and
 * raw_payload. Tenant/workspace/run/workflow/trigger/binding/event ids, queue
 * details, priority, attempt counters, result, error, and the approval and dlq
 * sub-records are optional.
 */
type DashboardJobEvent = {
schema: "harn.dashboard_job_event.v1",
id: string,
dedupe_key: string,
kind: DashboardJobEventKind,
source: DashboardJobSource,
source_clock: DashboardJobSourceClock,
tenant_id?: string,
workspace_id?: string,
job_id: string,
run_id?: string,
workflow_id?: string,
trigger_id?: string,
binding_id?: string,
event_id?: string,
queue?: string,
status: DashboardJobStatus,
title: string,
summary: string,
priority?: string,
queue_depth?: int,
attempt?: int,
max_attempts?: int,
result?: dict,
error?: string,
approval?: DashboardApprovalState,
dlq?: DashboardDlqState,
receipt_refs: list<DashboardJobRef>,
replay_refs: list<DashboardJobRef>,
related_refs: list<DashboardJobRef>,
action_intents: list<DashboardJobActionIntent>,
privacy: DashboardJobPrivacy,
raw_payload: dict,
}
/**
 * Receipt returned by `dashboard_job_emit`: the topic the event was emitted
 * to, the fixed record kind, the value returned by the event log for the
 * emit call, and the emitted event itself.
 */
type DashboardJobEmitReceipt = {
schema: "harn.dashboard_job_event_emit_receipt.v1",
topic: string,
kind: "dashboard_job_event",
event_log_id: int,
event: DashboardJobEvent,
}
/**
 * Per-job card (`harn.dashboard_job_card.v1`) produced by reducing a job's
 * events. `latest_event_id` and `updated_at` come from the most recently
 * merged event; `last_result` and `last_error` mirror the event's `result`
 * and `error` fields; the ref lists and action intents mirror the event
 * fields of the same name.
 */
type DashboardJobCard = {
schema: "harn.dashboard_job_card.v1",
job_id: string,
title: string,
status: DashboardJobStatus,
latest_event_id: string,
updated_at: string,
run_id?: string,
workflow_id?: string,
trigger_id?: string,
binding_id?: string,
queue?: string,
queue_depth?: int,
priority?: string,
last_result?: dict,
last_error?: string,
approval?: DashboardApprovalState,
dlq?: DashboardDlqState,
receipt_refs: list<DashboardJobRef>,
replay_refs: list<DashboardJobRef>,
related_refs: list<DashboardJobRef>,
action_intents: list<DashboardJobActionIntent>,
}
/** Schema tag stamped on every event envelope and checked during validation. */
let DASHBOARD_JOB_EVENT_SCHEMA = "harn.dashboard_job_event.v1"
/** Record kind passed to `event_log.emit` and echoed in emit receipts. */
let DASHBOARD_JOB_EVENT_KIND = "dashboard_job_event"
/** Default topic used when the caller does not supply `options.topic`. */
let DASHBOARD_JOB_EVENT_TOPIC = "dashboard.jobs.events"
/** Schema tag stamped on job cards built from events. */
let DASHBOARD_JOB_CARD_SCHEMA = "harn.dashboard_job_card.v1"
/** Schema tag stamped on the view returned by `dashboard_jobs_view`. */
let DASHBOARD_JOBS_VIEW_SCHEMA = "harn.dashboard_jobs_view.v1"
/** Convert any value to trimmed text; nil becomes the empty string. */
fn __job_text(value) {
  if value != nil {
    return trim(to_string(value))
  }
  return ""
}
/**
 * Return the first candidate whose trimmed text is non-empty, as text.
 * Returns nil when every candidate is nil or blank.
 */
fn __job_first(values) {
  for candidate in values {
    let normalized = __job_text(candidate)
    if normalized != "" {
      return normalized
    }
  }
  // No usable candidate was found.
  return nil
}
/**
 * Coerce an optional list: nil becomes an empty list, a list is returned
 * unchanged, and any other value throws.
 */
fn __job_list(value) {
  if value != nil && type_of(value) != "list" {
    throw "std/dashboard/jobs: expected list value"
  }
  // `??` only substitutes for nil, so an empty list is passed through as-is.
  return value ?? []
}
/**
 * Normalize an optional list into a list of non-blank trimmed strings.
 * Throws (via `__job_list`) when the value is neither nil nor a list.
 */
fn __job_string_list(value) {
  var texts = []
  for entry in __job_list(value) {
    // `__job_first` yields nil for nil/blank entries and trimmed text otherwise.
    let normalized = __job_first([entry])
    if normalized != nil {
      texts = texts.push(normalized)
    }
  }
  return texts
}
/**
 * Pick the raw payload for an input record: the first non-nil of
 * `raw_payload`, `payload`, `provider_payload`; otherwise the input itself;
 * otherwise an empty dict.
 */
fn __job_payload(input) {
  let embedded = input?.raw_payload ?? input?.payload ?? input?.provider_payload
  if embedded != nil {
    return embedded
  }
  return input ?? {}
}
/**
 * Resolve the event kind. Priority: explicit `kind` argument, `options.kind`,
 * `input.kind`, then `input.event_kind`. Throws when none is usable.
 */
fn __job_kind(kind, input, options) {
  let candidates = [kind, options?.kind, input?.kind, input?.event_kind]
  let resolved = __job_first(candidates)
  if resolved != nil {
    return resolved
  }
  throw "std/dashboard/jobs: kind is required"
}
/**
 * Map an event kind to the default job status it implies. Kinds without a
 * mapping (including "receipt.available" and "replay_fixture.available")
 * yield "unknown".
 */
fn __job_status_for_kind(kind) {
  let is_running = kind == "run.started" || kind == "run.progress" || kind == "approval.approved"
  if is_running {
    return "running"
  }
  let is_failed = kind == "run.failed" || kind == "approval.denied" || kind == "approval.expired"
  if is_failed {
    return "failed"
  }
  let is_scheduled = kind == "scheduled_job.created" || kind == "scheduled_job.updated"
  if is_scheduled {
    return "scheduled"
  }
  let is_queued = kind == "run.queued" || kind == "dlq.replayed"
  if is_queued {
    return "queued"
  }
  let is_canceled = kind == "scheduled_job.canceled" || kind == "dlq.dismissed"
  if is_canceled {
    return "canceled"
  }
  if kind == "run.succeeded" {
    return "succeeded"
  }
  if kind == "approval.requested" {
    return "waiting_approval"
  }
  if kind == "dlq.created" {
    return "dlq"
  }
  // Receipt/replay availability and any unrecognized kind carry no status.
  return "unknown"
}
/**
 * Resolve the source timestamp for an event as trimmed text, or nil when no
 * candidate is usable. Options win over input, and input wins over payload.
 *
 * NOTE(review): within input/payload the lifecycle fields are tried in a fixed
 * order (queued_at before started_at before completed_at, ...) regardless of
 * the event kind - confirm this priority is intended for completion events.
 */
fn __job_timestamp(input, payload, options) {
  let candidates = [
    // Explicit overrides supplied by the caller.
    options?.source_timestamp,
    options?.timestamp,
    options?.source_clock?.timestamp,
    // Fields on the input record.
    input?.source_timestamp,
    input?.timestamp,
    input?.source_clock?.timestamp,
    input?.queued_at,
    input?.started_at,
    input?.completed_at,
    input?.created_at,
    input?.updated_at,
    input?.requested_at,
    input?.decided_at,
    // Fields on the raw payload.
    payload?.source_timestamp,
    payload?.timestamp,
    payload?.source_clock?.timestamp,
    payload?.queued_at,
    payload?.started_at,
    payload?.completed_at,
    payload?.created_at,
    payload?.updated_at,
    payload?.requested_at,
    payload?.decided_at,
  ]
  return __job_first(candidates)
}
/**
 * Resolve an explicitly supplied source-clock kind, or nil when none is given.
 * The first non-nil `source_clock` among options, input, and payload is used:
 * a dict contributes its `kind` field, any other value is used as text.
 */
fn __job_source_clock_kind(input, payload, options) {
  let clock = options?.source_clock ?? input?.source_clock ?? payload?.source_clock
  let candidate = type_of(clock) == "dict" ? clock?.kind : clock
  return __job_first([candidate])
}
/**
 * Build the event source record. `runtime` defaults to "harn" and
 * `environment` to "local"; `id`, `url`, and `region` are omitted when
 * unresolved. Options win over input, and input wins over payload.
 */
fn __job_source(input, payload, options) {
  let runtime = __job_first([options?.runtime, input?.runtime, payload?.runtime])
  let environment = __job_first([options?.environment, input?.environment, payload?.environment])
  let source_id = __job_first([options?.source_id, input?.source_id, payload?.source_id])
  // `input.status_url` is accepted as a fallback for the source URL.
  let source_url = __job_first([options?.source_url, input?.source_url, input?.status_url, payload?.source_url])
  let region = __job_first([options?.region, input?.region, payload?.region])
  let source = {
    runtime: runtime ?? "harn",
    environment: environment ?? "local",
    id: source_id,
    url: source_url,
    region: region,
  }
  return filter_nil(source)
}
/**
 * Build the source-clock record for an event. An explicitly supplied clock
 * kind wins; otherwise the kind is "harn-cloud" when the resolved source
 * environment is "cloud" and "harn-local" for every other environment.
 */
fn __job_source_clock(timestamp, input, payload, options) {
  let environment = __job_source(input, payload, options).environment
  var clock_kind = __job_source_clock_kind(input, payload, options)
  if clock_kind == nil {
    if environment == "cloud" {
      clock_kind = "harn-cloud"
    } else {
      clock_kind = "harn-local"
    }
  }
  return {kind: clock_kind, timestamp: timestamp}
}
/**
 * Resolve the job id. `options.job_id` wins; otherwise the first usable of
 * job_id, schedule_id, workflow_id, trigger_id, binding_id is taken from the
 * input and then from the payload. Throws when nothing resolves.
 */
fn __job_id(input, payload, options) {
  let candidates = [
    options?.job_id,
    input?.job_id,
    input?.schedule_id,
    input?.workflow_id,
    input?.trigger_id,
    input?.binding_id,
    payload?.job_id,
    payload?.schedule_id,
    payload?.workflow_id,
    payload?.trigger_id,
    payload?.binding_id,
  ]
  let resolved = __job_first(candidates)
  if resolved != nil {
    return resolved
  }
  throw "std/dashboard/jobs: job_id is required"
}
/**
 * Resolve the run id, or nil when none is available. `options.run_id` wins;
 * otherwise run_id, managed_run_id, dispatch_id are tried on the input and
 * then on the payload.
 */
fn __job_run_id(input, payload, options) {
  let candidates = [
    options?.run_id,
    input?.run_id,
    input?.managed_run_id,
    input?.dispatch_id,
    payload?.run_id,
    payload?.managed_run_id,
    payload?.dispatch_id,
  ]
  return __job_first(candidates)
}
/**
 * Validate and normalize a list of refs. Each ref must be a dict with a
 * non-blank `kind` and at least one of `id` / `url`. The returned refs contain
 * only kind, label, id, and url, with unresolved fields dropped.
 * Throws on a non-list value or on any malformed ref.
 */
fn __job_refs(value) {
  var normalized = []
  for entry in __job_list(value) {
    if type_of(entry) != "dict" {
      throw "std/dashboard/jobs: refs must be dicts"
    }
    let ref_kind = __job_text(entry?.kind)
    if ref_kind == "" {
      throw "std/dashboard/jobs: ref.kind is required"
    }
    let ref_id = __job_first([entry?.id])
    let ref_url = __job_first([entry?.url])
    // A ref must be addressable by id, by URL, or by both.
    if !(ref_id != nil || ref_url != nil) {
      throw "std/dashboard/jobs: ref.id or ref.url is required"
    }
    let ref_label = __job_first([entry?.label])
    let cleaned = filter_nil({kind: ref_kind, label: ref_label, id: ref_id, url: ref_url})
    normalized = normalized.push(cleaned)
  }
  return normalized
}
/**
 * Resolve related refs. An explicit `related_refs` list (options, then input,
 * then payload) is validated and returned. Otherwise a single "source" ref is
 * synthesized from the source URL when one is available; otherwise the result
 * is an empty list.
 */
fn __job_related_refs(input, payload, options) {
  let explicit = options?.related_refs ?? input?.related_refs ?? payload?.related_refs
  if explicit != nil {
    return __job_refs(explicit)
  }
  let source_url = __job_first([options?.source_url, input?.source_url, input?.status_url, payload?.source_url])
  if source_url == nil {
    return []
  }
  return [{kind: "source", label: "Source", url: source_url}]
}
/**
 * Resolve receipt refs. An explicit `receipt_refs` list (options, then input,
 * then payload) is validated and returned. Otherwise a single "receipt" ref is
 * synthesized from `receipt_id` / `receipt_url` when either is present;
 * otherwise the result is an empty list.
 */
fn __job_receipt_refs(input, payload, options) {
  let explicit = options?.receipt_refs ?? input?.receipt_refs ?? payload?.receipt_refs
  if explicit != nil {
    return __job_refs(explicit)
  }
  let receipt_id = __job_first([options?.receipt_id, input?.receipt_id, payload?.receipt_id])
  let receipt_url = __job_first([options?.receipt_url, input?.receipt_url, payload?.receipt_url])
  if receipt_id == nil && receipt_url == nil {
    return []
  }
  let synthesized = filter_nil({kind: "receipt", label: "Receipt", id: receipt_id, url: receipt_url})
  return [synthesized]
}
/**
 * Resolve replay-fixture refs. An explicit `replay_refs` list (options, then
 * input, then payload) is validated and returned. Otherwise a single
 * "replay_fixture" ref is synthesized from `replay_id` / `replay_url` when
 * either is present; otherwise the result is an empty list.
 */
fn __job_replay_refs(input, payload, options) {
  let explicit = options?.replay_refs ?? input?.replay_refs ?? payload?.replay_refs
  if explicit != nil {
    return __job_refs(explicit)
  }
  let replay_id = __job_first([options?.replay_id, input?.replay_id, payload?.replay_id])
  let replay_url = __job_first([options?.replay_url, input?.replay_url, payload?.replay_url])
  if replay_id == nil && replay_url == nil {
    return []
  }
  let synthesized = filter_nil({kind: "replay_fixture", label: "Replay fixture", id: replay_id, url: replay_url})
  return [synthesized]
}
/**
 * Build the privacy record for an event. Each field is taken from options
 * first and then from `input.privacy`. Defaults: not redacted, raw payload
 * retained, not sensitive, no flags.
 */
fn __job_privacy(input, options) {
  let declared = input?.privacy ?? {}
  let redacted = options?.redacted ?? declared?.redacted ?? false
  let retained = options?.raw_payload_retained ?? declared?.raw_payload_retained ?? true
  let sensitive = options?.contains_sensitive ?? declared?.contains_sensitive ?? false
  // Note the option is named `privacy_flags` while the record field is `flags`.
  let flags = __job_string_list(options?.privacy_flags ?? declared?.flags ?? [])
  return {
    redacted: redacted,
    raw_payload_retained: retained,
    contains_sensitive: sensitive,
    flags: flags,
  }
}
/**
 * Build the approval sub-record, or return nil when there is neither an
 * `approval` record (options, then input, then payload) nor a resolvable
 * request id. `status` defaults to "requested"; unresolved optional fields are
 * dropped.
 *
 * NOTE(review): unlike the other fields, `status` does not consult
 * `payload.approval_status` - confirm whether that is intentional.
 */
fn __job_approval(input, payload, options) {
  let record = options?.approval ?? input?.approval ?? payload?.approval
  let request_id = __job_first(
    [
      record?.request_id,
      options?.approval_request_id,
      input?.approval_request_id,
      payload?.approval_request_id,
      input?.request_id,
      payload?.request_id,
    ],
  )
  if record == nil && request_id == nil {
    return nil
  }
  let status = __job_first([record?.status, options?.approval_status, input?.approval_status])
  let requested_at = __job_first([record?.requested_at, input?.requested_at, payload?.requested_at])
  let decided_at = __job_first([record?.decided_at, input?.decided_at, payload?.decided_at])
  let reviewer = __job_first([record?.reviewer, input?.reviewer, payload?.reviewer])
  let expires_at = __job_first([record?.expires_at, input?.expires_at, payload?.expires_at])
  let summary = __job_first([record?.summary, input?.approval_summary, payload?.approval_summary])
  let approval = {
    request_id: request_id,
    status: status ?? "requested",
    requested_at: requested_at,
    decided_at: decided_at,
    reviewer: reviewer,
    expires_at: expires_at,
    summary: summary,
  }
  return filter_nil(approval)
}
/**
 * Build the DLQ sub-record, or return nil when there is neither a `dlq` record
 * (options, then input, then payload) nor a resolvable DLQ id. `state`
 * defaults to "pending"; unresolved optional fields are dropped.
 */
fn __job_dlq(input, payload, options) {
  let record = options?.dlq ?? input?.dlq ?? payload?.dlq
  let dlq_id = __job_first([record?.dlq_id, options?.dlq_id, input?.dlq_id, payload?.dlq_id])
  if record == nil && dlq_id == nil {
    return nil
  }
  let state = __job_first([record?.state, input?.dlq_state, payload?.dlq_state])
  // `attempts` is passed through as-is rather than normalized to text.
  let attempts = record?.attempts ?? input?.attempts ?? payload?.attempts
  let error_class = __job_first([record?.error_class, input?.error_class, payload?.error_class])
  // `input.error` is accepted as a fallback for the final error text.
  let final_error = __job_first([record?.final_error, input?.final_error, input?.error, payload?.final_error])
  let replay_of = __job_first([record?.replay_of_event_id, input?.replay_of_event_id, payload?.replay_of_event_id])
  let dlq = {
    dlq_id: dlq_id,
    state: state ?? "pending",
    attempts: attempts,
    error_class: error_class,
    final_error: final_error,
    replay_of_event_id: replay_of,
  }
  return filter_nil(dlq)
}
/**
 * Build the default action intents for an event.
 *
 * Always offers "open_job". Adds "open_receipt" / "open_replay_fixture" when
 * the matching ref list is non-empty, "review_approval" for an
 * "approval.requested" event that carries an approval record, and
 * "replay_dlq" followed by "dismiss_dlq" for a "dlq.created" event that
 * carries a DLQ record. The navigation intents do not require approval; the
 * approval and DLQ intents do.
 */
fn __job_default_actions(kind, job_id, run_id, approval, dlq, receipt_refs, replay_refs) {
  // Shared target; run_id is omitted when nil.
  let base_target = filter_nil({job_id: job_id, run_id: run_id})
  let open_job = {
    kind: "open_job",
    label: "Open job",
    target: base_target,
    effect: "navigation",
    requires_approval: false,
  }
  var intents = [open_job]
  if len(receipt_refs) > 0 {
    let open_receipt = {
      kind: "open_receipt",
      label: "Open receipt",
      target: base_target.merge({receipt_refs: receipt_refs}),
      effect: "navigation",
      requires_approval: false,
    }
    intents = intents.push(open_receipt)
  }
  if len(replay_refs) > 0 {
    let open_replay = {
      kind: "open_replay_fixture",
      label: "Open replay fixture",
      target: base_target.merge({replay_refs: replay_refs}),
      effect: "navigation",
      requires_approval: false,
    }
    intents = intents.push(open_replay)
  }
  if approval != nil && kind == "approval.requested" {
    let review = {
      kind: "review_approval",
      label: "Review approval",
      target: base_target.merge({approval_request_id: approval.request_id}),
      effect: "host_approval_decision",
      requires_approval: true,
    }
    intents = intents.push(review)
  }
  if dlq != nil && kind == "dlq.created" {
    let replay = {
      kind: "replay_dlq",
      label: "Replay DLQ item",
      target: base_target.merge({dlq_id: dlq.dlq_id}),
      effect: "orchestrator_replay",
      requires_approval: true,
    }
    let dismiss = {
      kind: "dismiss_dlq",
      label: "Dismiss DLQ item",
      target: base_target.merge({dlq_id: dlq.dlq_id}),
      effect: "orchestrator_dlq_state",
      requires_approval: true,
    }
    intents = intents.push(replay)
    intents = intents.push(dismiss)
  }
  return intents
}
/**
 * Build a stable dashboard event dedupe key from job provenance.
 *
 * The kind, job id, run id, timestamp, and source id are each normalized to
 * trimmed text (nil becomes empty), joined with newlines, and hashed with
 * sha256. The key is "dashboard_job:" followed by `substring(digest, 0, 32)`.
 */
pub fn dashboard_job_dedupe_key(kind, job_id, run_id = nil, timestamp = nil, source_id = nil) -> string {
  let identity = __job_text(kind) + "\n" + __job_text(job_id)
  let occurrence = __job_text(run_id) + "\n" + __job_text(timestamp) + "\n" + __job_text(source_id)
  let digest = sha256(identity + "\n" + occurrence)
  return "dashboard_job:" + substring(digest, 0, 32)
}
/**
 * Check the fields every dashboard job event must carry.
 *
 * Throws when the schema tag is not the supported one, when any of id,
 * dedupe_key, kind, job_id, status, title, summary is nil or blank, or when
 * the source clock (kind, timestamp) or source (runtime, environment) is
 * missing or incomplete.
 */
fn __job_validate_required(event) {
  if event.schema != DASHBOARD_JOB_EVENT_SCHEMA {
    throw "std/dashboard/jobs: unsupported schema " + __job_text(event.schema)
  }
  for key in ["id", "dedupe_key", "kind", "job_id", "status", "title", "summary"] {
    if __job_text(event[key]) == "" {
      throw "std/dashboard/jobs: " + key + " is required"
    }
  }
  // Nil-safe access: an event without `source_clock` or `source` must fail
  // with the descriptive error below, not while dereferencing the missing
  // sub-record.
  let clock = event.source_clock
  if __job_text(clock?.kind) == "" || __job_text(clock?.timestamp) == "" {
    throw "std/dashboard/jobs: source_clock.kind and source_clock.timestamp are required"
  }
  let source = event.source
  if __job_text(source?.runtime) == "" || __job_text(source?.environment) == "" {
    throw "std/dashboard/jobs: source.runtime and source.environment are required"
  }
}
/**
 * Check the extra fields specific event kinds must carry.
 *
 * - "run.*" events need a non-blank run_id.
 * - "approval.*" events need approval.request_id.
 * - "dlq.*" events need dlq.dlq_id.
 * - "receipt.available" needs at least one receipt ref.
 * - "replay_fixture.available" needs at least one replay ref.
 *
 * Throws a descriptive error for the first unmet requirement.
 */
fn __job_validate_kind_requirements(event) {
  if event.kind.starts_with("run.") && __job_text(event.run_id) == "" {
    throw "std/dashboard/jobs: run_id is required for run events"
  }
  // `?.` covers the missing sub-record case: nil normalizes to "" below.
  if event.kind.starts_with("approval.") && __job_text(event.approval?.request_id) == "" {
    throw "std/dashboard/jobs: approval.request_id is required for approval events"
  }
  if event.kind.starts_with("dlq.") && __job_text(event.dlq?.dlq_id) == "" {
    throw "std/dashboard/jobs: dlq.dlq_id is required for DLQ events"
  }
  // A missing ref list counts as empty, so the intended error is raised
  // instead of taking the length of nil. The sibling validators treat absent
  // lists the same way.
  if event.kind == "receipt.available" && len(event.receipt_refs ?? []) == 0 {
    throw "std/dashboard/jobs: receipt.available requires receipt_refs"
  }
  if event.kind == "replay_fixture.available" && len(event.replay_refs ?? []) == 0 {
    throw "std/dashboard/jobs: replay_fixture.available requires replay_refs"
  }
}
/**
 * Validate the receipt, replay, and related ref lists of an event, in that
 * order. The normalized results are discarded; only the throw-on-malformed
 * behavior of `__job_refs` is used.
 */
fn __job_validate_refs(event) {
  for refs in [event.receipt_refs, event.replay_refs, event.related_refs] {
    let _ = __job_refs(refs)
  }
}
/**
 * Validate the action intents of an event. A missing list is treated as empty.
 *
 * Each intent needs a non-blank kind, label, and effect and a dict target.
 * Any effect other than "navigation" or "read" must have
 * `requires_approval` set to true.
 */
fn __job_validate_action_intents(event) {
  let intents = event.action_intents ?? []
  for intent in intents {
    let effect = __job_text(intent.effect)
    let kind = __job_text(intent.kind)
    let label = __job_text(intent.label)
    if kind == "" || label == "" || effect == "" {
      throw "std/dashboard/jobs: action intent kind, label, and effect are required"
    }
    if type_of(intent.target) != "dict" {
      throw "std/dashboard/jobs: action intent target must be a dict"
    }
    let read_only = effect == "navigation" || effect == "read"
    let gated = intent.requires_approval ?? false
    // Any effect that is not read-only must be gated by approval.
    if !read_only && !gated {
      throw "std/dashboard/jobs: action intent `" + kind + "` must require approval"
    }
  }
}
/**
 * Validate mandatory dashboard provenance and host action boundaries.
 *
 * Runs four checks in order: required fields, kind-specific requirements, ref
 * lists, and action intents. The first failing check throws. On success the
 * event is returned unchanged so calls can be chained.
 */
pub fn dashboard_job_validate(event: DashboardJobEvent) -> DashboardJobEvent {
__job_validate_required(event)
__job_validate_kind_requirements(event)
__job_validate_refs(event)
__job_validate_action_intents(event)
return event
}
/**
 * Normalize local or cloud orchestrator status payloads into one dashboard
 * event envelope.
 *
 * Arguments:
 * - kind: event kind; falls back to options.kind, input.kind, input.event_kind.
 * - input: orchestrator status record, or nil.
 * - options: explicit overrides; for each field options win over input, and
 *   input wins over the raw payload.
 *
 * Returns the validated event with nil-valued fields dropped.
 *
 * Throws when the kind, source timestamp, or job id cannot be resolved, or
 * when the assembled event fails `dashboard_job_validate`.
 */
pub fn dashboard_job_event(kind, input = nil, options = nil) -> DashboardJobEvent {
  let opts = options ?? {}
  let payload = __job_payload(input)
  let resolved_kind = __job_kind(kind, input, opts)
  let timestamp = __job_timestamp(input, payload, opts)
  if timestamp == nil {
    throw "std/dashboard/jobs: source timestamp is required"
  }
  let job_id = __job_id(input, payload, opts)
  let run_id = __job_run_id(input, payload, opts)
  let source = __job_source(input, payload, opts)
  let receipt_refs = __job_receipt_refs(input, payload, opts)
  let replay_refs = __job_replay_refs(input, payload, opts)
  let approval = __job_approval(input, payload, opts)
  let dlq = __job_dlq(input, payload, opts)
  let privacy = __job_privacy(input, opts)
  // dedupe_key, id, and status go through `__job_first` like every other text
  // field, so a blank override falls back to the derived value instead of
  // producing an event that fails validation.
  let dedupe_key = __job_first([opts.dedupe_key, input?.dedupe_key])
    ?? dashboard_job_dedupe_key(resolved_kind, job_id, run_id, timestamp, source.id)
  let default_status = __job_status_for_kind(resolved_kind)
  let event = {
    schema: DASHBOARD_JOB_EVENT_SCHEMA,
    // Without an explicit id, derive one from the dedupe key.
    id: __job_first([opts.id, input?.dashboard_event_id])
      ?? ("dash_job_evt_" + substring(sha256(dedupe_key), 0, 24)),
    dedupe_key: dedupe_key,
    kind: resolved_kind,
    source: source,
    source_clock: __job_source_clock(timestamp, input, payload, opts),
    tenant_id: __job_first([opts.tenant_id, input?.tenant_id, payload?.tenant_id]),
    workspace_id: __job_first([opts.workspace_id, input?.workspace_id, payload?.workspace_id]),
    job_id: job_id,
    run_id: run_id,
    workflow_id: __job_first([opts.workflow_id, input?.workflow_id, payload?.workflow_id]),
    trigger_id: __job_first([opts.trigger_id, input?.trigger_id, payload?.trigger_id]),
    binding_id: __job_first([opts.binding_id, input?.binding_id, payload?.binding_id]),
    event_id: __job_first([opts.event_id, input?.event_id, payload?.event_id]),
    queue: __job_first([opts.queue, input?.queue, payload?.queue]),
    status: __job_first([opts.status, input?.status]) ?? default_status,
    // The title falls back to the job id.
    title: __job_first(
      [opts.title, input?.title, input?.workflow_name, payload?.title, payload?.workflow_name],
    )
      ?? job_id,
    // The default summary uses the kind-derived status, not a status override.
    summary: __job_first([opts.summary, input?.summary, payload?.summary])
      ?? (default_status + ": " + job_id),
    priority: __job_first([opts.priority, input?.priority, payload?.priority]),
    queue_depth: opts.queue_depth ?? input?.queue_depth ?? payload?.queue_depth,
    attempt: opts.attempt ?? input?.attempt ?? payload?.attempt,
    max_attempts: opts.max_attempts ?? input?.max_attempts ?? payload?.max_attempts,
    result: opts.result ?? input?.result ?? payload?.result,
    error: __job_first([opts.error, input?.error, payload?.error]),
    approval: approval,
    dlq: dlq,
    receipt_refs: receipt_refs,
    replay_refs: replay_refs,
    related_refs: __job_related_refs(input, payload, opts),
    // Caller-supplied intents replace the defaults entirely.
    action_intents: opts.action_intents ?? input?.action_intents
      ?? __job_default_actions(resolved_kind, job_id, run_id, approval, dlq, receipt_refs, replay_refs),
    privacy: privacy,
    // When the payload is not retained, store an empty dict in its place.
    raw_payload: privacy.raw_payload_retained ? payload : {},
  }
  return dashboard_job_validate(filter_nil(event))
}
/**
 * Emit a dashboard job event to the EventLog and return an audit receipt.
 *
 * The event is validated first. It is then emitted to `options.topic`, or to
 * the default jobs topic when none is given, together with a small dict of
 * schema, job_id, run_id, and status (run_id is omitted when nil).
 * NOTE(review): the meaning of that fourth `event_log.emit` argument is not
 * visible here - presumably headers/metadata; confirm against the EventLog API.
 */
pub fn dashboard_job_emit(input: DashboardJobEvent, options = nil) -> DashboardJobEmitReceipt {
  let opts = options ?? {}
  let event = dashboard_job_validate(input)
  let topic = opts.topic ?? DASHBOARD_JOB_EVENT_TOPIC
  let summary_fields = filter_nil(
    {
      schema: DASHBOARD_JOB_EVENT_SCHEMA,
      job_id: event.job_id,
      run_id: event.run_id,
      status: event.status,
    },
  )
  let event_log_id = event_log.emit(topic, DASHBOARD_JOB_EVENT_KIND, event, summary_fields)
  let receipt = {
    schema: "harn.dashboard_job_event_emit_receipt.v1",
    topic: topic,
    kind: DASHBOARD_JOB_EVENT_KIND,
    event_log_id: event_log_id,
    event: event,
  }
  return receipt
}
/**
 * Drop duplicate dashboard events by `dedupe_key`, preserving first-seen order.
 *
 * Every event is validated as it is visited, so a malformed event throws even
 * if it would have been dropped as a duplicate.
 */
pub fn dashboard_job_dedupe_events(events: list) -> list<DashboardJobEvent> {
  var unique = []
  var seen_keys = {}
  for candidate in events {
    let event = dashboard_job_validate(candidate)
    let key = event.dedupe_key
    let first_sighting = seen_keys[key] == nil
    if first_sighting {
      seen_keys[key] = true
      unique = unique.push(event)
    }
  }
  return unique
}
/**
 * Append refs to an existing list, skipping any ref whose kind, id, and url
 * together match an entry already present. Either argument may be nil and is
 * then treated as an empty list.
 */
fn __job_append_refs(existing, refs) {
  var merged = existing ?? []
  let incoming = refs ?? []
  for candidate in incoming {
    let candidate_key = __job_text(candidate.kind) + ":" + __job_text(candidate.id) + ":" + __job_text(candidate.url)
    var already_present = false
    for known in merged {
      let known_key = __job_text(known.kind) + ":" + __job_text(known.id) + ":" + __job_text(known.url)
      already_present = already_present || known_key == candidate_key
    }
    if !already_present {
      merged = merged.push(candidate)
    }
  }
  return merged
}
/**
 * Append action intents to an existing list, skipping any intent whose kind
 * and effect together match an entry already present. Either argument may be
 * nil and is then treated as an empty list.
 */
fn __job_append_actions(existing, intents) {
  var merged = existing ?? []
  let incoming = intents ?? []
  for candidate in incoming {
    let candidate_key = __job_text(candidate.kind) + ":" + __job_text(candidate.effect)
    var already_present = false
    for known in merged {
      let known_key = __job_text(known.kind) + ":" + __job_text(known.effect)
      already_present = already_present || known_key == candidate_key
    }
    if !already_present {
      merged = merged.push(candidate)
    }
  }
  return merged
}
/**
 * Build the initial card for a job from the first event seen for it.
 * The event's `result` / `error` become `last_result` / `last_error`, the
 * source-clock timestamp becomes `updated_at`, and nil-valued fields are
 * dropped.
 */
fn __job_card_from_event(event) {
  let card = {
    schema: DASHBOARD_JOB_CARD_SCHEMA,
    job_id: event.job_id,
    title: event.title,
    status: event.status,
    latest_event_id: event.id,
    updated_at: event.source_clock.timestamp,
    run_id: event.run_id,
    workflow_id: event.workflow_id,
    trigger_id: event.trigger_id,
    binding_id: event.binding_id,
    queue: event.queue,
    queue_depth: event.queue_depth,
    priority: event.priority,
    last_result: event.result,
    last_error: event.error,
    approval: event.approval,
    dlq: event.dlq,
    receipt_refs: event.receipt_refs,
    replay_refs: event.replay_refs,
    related_refs: event.related_refs,
    action_intents: event.action_intents,
  }
  return filter_nil(card)
}
/**
 * Fold a later event into an existing card.
 *
 * "receipt.available" and "replay_fixture.available" events are
 * status-neutral: they keep the card's title and status and only add action
 * intents that are not already present. Every other event replaces title,
 * status, and action intents. Optional scalar fields keep the card's value
 * when the event does not carry one, and ref lists are merged without
 * duplicates.
 */
fn __job_merge_card(card, event) {
  let status_neutral = event.kind == "receipt.available" || event.kind == "replay_fixture.available"
  var title = event.title
  var status = event.status
  var intents = event.action_intents
  if status_neutral {
    title = card.title
    status = card.status
    intents = __job_append_actions(card.action_intents, event.action_intents)
  }
  let overrides = {
    title: title,
    status: status,
    latest_event_id: event.id,
    updated_at: event.source_clock.timestamp,
    run_id: event.run_id ?? card.run_id,
    workflow_id: event.workflow_id ?? card.workflow_id,
    trigger_id: event.trigger_id ?? card.trigger_id,
    binding_id: event.binding_id ?? card.binding_id,
    queue: event.queue ?? card.queue,
    queue_depth: event.queue_depth ?? card.queue_depth,
    priority: event.priority ?? card.priority,
    last_result: event.result ?? card.last_result,
    last_error: event.error ?? card.last_error,
    approval: event.approval ?? card.approval,
    dlq: event.dlq ?? card.dlq,
    receipt_refs: __job_append_refs(card.receipt_refs, event.receipt_refs),
    replay_refs: __job_append_refs(card.replay_refs, event.replay_refs),
    related_refs: __job_append_refs(card.related_refs, event.related_refs),
    action_intents: intents,
  }
  return filter_nil(card.merge(overrides))
}
/**
 * Reduce an ordered dashboard event stream into static Jobs view cards.
 *
 * Events are validated and de-duplicated, then folded into one card per
 * job_id. Cards are returned in the order each job was first seen.
 * `generated_at` is copied from `options.generated_at` and may be nil.
 */
pub fn dashboard_jobs_snapshot(events: list, options = nil) {
  let opts = options ?? {}
  let unique = dashboard_job_dedupe_events(events)
  var first_seen = []
  var cards_by_job = {}
  for event in unique {
    let job_id = event.job_id
    let existing = cards_by_job[job_id]
    if existing != nil {
      cards_by_job[job_id] = __job_merge_card(existing, event)
    } else {
      first_seen = first_seen.push(job_id)
      cards_by_job[job_id] = __job_card_from_event(event)
    }
  }
  var jobs = []
  for job_id in first_seen {
    jobs = jobs.push(cards_by_job[job_id])
  }
  return {schema: "harn.dashboard_jobs_snapshot.v1", generated_at: opts.generated_at, jobs: jobs}
}
/**
 * Build a portable dashboard view from normalized events. Pass `{emit: true}`
 * to also append each event to the EventLog.
 *
 * Events are validated and de-duplicated first. When emitting, each unique
 * event is sent to `options.topic` (or the default jobs topic) and the
 * resulting receipts are returned in `receipts`; otherwise `receipts` is
 * empty. The view title defaults to "Jobs".
 */
pub fn dashboard_jobs_view(events: list, options = nil) {
  let opts = options ?? {}
  let unique = dashboard_job_dedupe_events(events)
  let should_emit = opts.emit ?? false
  let topic = opts.topic ?? DASHBOARD_JOB_EVENT_TOPIC
  var receipts = []
  if should_emit {
    for event in unique {
      let receipt = dashboard_job_emit(event, {topic: topic})
      receipts = receipts.push(receipt)
    }
  }
  // Emission happens before the cards are built, as in the event flow above.
  let snapshot = dashboard_jobs_snapshot(unique, opts)
  let view = {
    schema: DASHBOARD_JOBS_VIEW_SCHEMA,
    generated_at: opts.generated_at,
    title: opts.title ?? "Jobs",
    events: unique,
    jobs: snapshot.jobs,
    receipts: receipts,
  }
  return view
}