// std/coordination — durable multi-agent coordination ledger helpers.
//
// The storage substrate is Harn's channel/event-log system. This module only
// normalizes coordination envelopes so agents can coordinate without inventing
// host-local mailboxes or user-visible prose protocols.
import { filter_nil } from "std/collections"
import { memory_store } from "std/memory"
let __COORD_SCHEMA = "harn.coordination.message.v1"
let __COORD_RECEIPT_SCHEMA = "harn.coordination.receipt.v1"
let __COORD_CHANNEL_PREFIX = "coord"
let __COORD_KINDS = ["status", "claim", "handoff", "blocker", "decision", "request", "fact"]
fn __coord_string(value) {
if value == nil {
return ""
}
if type_of(value) == "string" {
return value
}
return to_string(value)
}
fn __coord_text(value) {
if value == nil {
return ""
}
if type_of(value) == "string" {
return value
}
if type_of(value) == "dict" || type_of(value) == "list" {
return json_stringify(value)
}
return to_string(value)
}
fn __coord_dict(value) {
if type_of(value) == "dict" {
return value
}
return {}
}
fn __coord_ctx() {
let ctx = runtime_context()
if type_of(ctx) == "dict" {
return ctx
}
return {}
}
fn __coord_normalized_scope(scope) {
let value = lowercase(trim(__coord_string(scope)))
if value == "" {
return "session"
}
if contains(["session", "pipeline", "tenant", "workspace", "task"], value) {
return value
}
throw "std/coordination: unsupported scope `" + value + "`"
}
fn __coord_validate_component(label, value) {
let text = trim(__coord_string(value))
if text == "" {
throw "std/coordination: " + label + " is required"
}
if contains(text, ":") || contains(text, " ") || contains(text, "\n") || contains(text, "\t") {
throw "std/coordination: " + label + " must not contain whitespace or ':'"
}
return text
}
fn __coord_topic_component(value) {
return regex_replace("[^A-Za-z0-9._-]", "_", __coord_validate_component("topic component", value))
}
fn __coord_channel_name(parts) {
var normalized = [__COORD_CHANNEL_PREFIX]
for part in parts {
normalized = normalized + [__coord_topic_component(part)]
}
return join(normalized, ".")
}
fn __coord_scope_id(scope, opts, ctx) {
if scope == "session" {
return opts?.session_id
?? opts?.scope_id
?? ctx?.agent_session_id
?? ctx?.root_agent_session_id
?? ctx?.scope_id
?? ctx?.root_task_id
?? "session"
}
if scope == "pipeline" {
let id = opts?.pipeline_id ?? opts?.scope_id ?? ctx?.workflow_id ?? ctx?.run_id
if id == nil || trim(__coord_string(id)) == "" {
throw "std/coordination: pipeline scope requires pipeline_id, workflow_id, or run_id"
}
return id
}
if scope == "workspace" {
return opts?.workspace_id ?? opts?.scope_id ?? ctx?.workspace_id ?? "workspace"
}
if scope == "task" {
return opts?.task_id ?? opts?.scope_id ?? ctx?.task_id ?? ctx?.root_task_id ?? "task"
}
return opts?.tenant_id ?? opts?.scope_id ?? ctx?.tenant_id ?? "default"
}
fn __coord_resolution(scope, room, options) {
let opts = __coord_dict(options)
let ctx = __coord_ctx()
let logical_scope = __coord_normalized_scope(scope)
let room_name = __coord_validate_component("room", room)
let logical_scope_id = __coord_validate_component("scope id", __coord_scope_id(logical_scope, opts, ctx))
if logical_scope == "workspace" {
let tenant_id = __coord_validate_component("tenant id", opts?.tenant_id ?? ctx?.tenant_id ?? "default")
return {
logical_scope: logical_scope,
logical_scope_id: logical_scope_id,
channel_scope: "tenant",
channel_scope_id: tenant_id,
channel_name: __coord_channel_name(["workspace", logical_scope_id, room_name]),
room: room_name,
}
}
if logical_scope == "task" {
let session_id = __coord_validate_component("session id", __coord_scope_id("session", opts, ctx))
return {
logical_scope: logical_scope,
logical_scope_id: logical_scope_id,
channel_scope: "session",
channel_scope_id: session_id,
channel_name: __coord_channel_name(["task", logical_scope_id, room_name]),
room: room_name,
}
}
return {
logical_scope: logical_scope,
logical_scope_id: logical_scope_id,
channel_scope: logical_scope,
channel_scope_id: logical_scope_id,
channel_name: __coord_channel_name([logical_scope, room_name]),
room: room_name,
}
}
fn __coord_channel_options(resolution, options, id = nil) {
let opts = __coord_dict(options)
var out = {scope: resolution.channel_scope}
if id != nil {
out = out + {id: id}
}
if resolution.channel_scope == "session" {
out = out + {session_id: resolution.channel_scope_id}
} else if resolution.channel_scope == "pipeline" {
out = out + {pipeline_id: resolution.channel_scope_id}
} else if resolution.channel_scope == "tenant" {
out = out + {tenant_id: resolution.channel_scope_id}
}
let ttl = opts?.ttl
if ttl != nil {
out = out + {ttl: ttl}
}
let limit = opts?.limit
if limit != nil {
out = out + {limit: limit}
}
let cursor = opts?.from_cursor ?? opts?.cursor
if cursor != nil {
out = out + {from_cursor: cursor}
}
return out
}
fn __coord_list(value) {
if value == nil {
return []
}
if type_of(value) == "list" {
return value
}
return [value]
}
fn __coord_address_identifier(address) {
if address == nil {
return nil
}
if type_of(address) == "string" {
return address
}
if type_of(address) == "dict" {
return address?.session_id
?? address?.root_session_id
?? address?.agent
?? address?.worker_id
?? address?.run_id
?? address?.task_id
?? address?.root_task_id
}
return nil
}
fn __coord_consumer_id(options, fallback = nil) {
let opts = __coord_dict(options)
let ctx = __coord_ctx()
let address_id = __coord_address_identifier(opts?.to ?? opts?.recipient)
return __coord_validate_component(
"consumer id",
opts?.consumer_id
?? opts?.consumer
?? fallback
?? address_id
?? opts?.session_id
?? ctx?.agent_session_id
?? ctx?.worker_id
?? ctx?.run_id
?? "default",
)
}
fn __coord_text_matches(value, expected) {
let needle = trim(__coord_string(expected))
if needle == "" {
return true
}
if type_of(value) == "list" {
for item in value {
if __coord_text_matches(item, needle) {
return true
}
}
return false
}
return trim(__coord_string(value)) == needle
}
fn __coord_address_matches(address, expected) {
let needle = trim(__coord_string(__coord_address_identifier(expected) ?? expected))
if needle == "" {
return true
}
if type_of(address) == "string" {
return trim(address) == needle
}
if type_of(address) == "dict" {
return __coord_text_matches(
[
address?.session_id,
address?.root_session_id,
address?.agent,
address?.worker_id,
address?.run_id,
address?.task_id,
address?.root_task_id,
],
needle,
)
}
return false
}
fn __coord_kind_matches(kind, expected) {
if expected == nil || trim(__coord_string(expected)) == "" {
return true
}
if type_of(expected) == "list" {
for item in expected {
if kind == lowercase(trim(__coord_string(item))) {
return true
}
}
return false
}
return kind == lowercase(trim(__coord_string(expected)))
}
fn __coord_message_matches(message, options, default_to) {
let opts = __coord_dict(options)
if !__coord_kind_matches(message?.kind, opts?.kind) {
return false
}
if !__coord_address_matches(message?.to, opts?.to ?? opts?.recipient ?? default_to) {
return false
}
if opts?.from != nil && !__coord_address_matches(message?.from, opts?.from) {
return false
}
if opts?.reply_to != nil && !__coord_text_matches(message?.reply_to, opts?.reply_to) {
return false
}
if opts?.thread_id != nil && !__coord_text_matches(message?.thread_id, opts?.thread_id) {
return false
}
if opts?.subject != nil && !__coord_text_matches(message?.subject, opts?.subject) {
return false
}
return true
}
fn __coord_kind(message, options) {
let kind = lowercase(trim(__coord_string(options?.kind ?? message?.kind ?? "status")))
if !contains(__COORD_KINDS, kind) {
throw "std/coordination: unsupported message kind `" + kind + "`"
}
return kind
}
fn __coord_actor(message, options, ctx) {
let raw = __coord_dict(options?.from ?? message?.from)
return filter_nil(
raw
+ {
agent: raw?.agent ?? options?.agent ?? ctx?.agent ?? ctx?.persona,
session_id: raw?.session_id ?? ctx?.agent_session_id,
root_session_id: raw?.root_session_id ?? ctx?.root_agent_session_id,
worker_id: raw?.worker_id ?? ctx?.worker_id,
run_id: raw?.run_id ?? ctx?.run_id,
task_id: raw?.task_id ?? ctx?.task_id,
root_task_id: raw?.root_task_id ?? ctx?.root_task_id,
},
)
}
fn __coord_body(message, options) {
if type_of(message) == "string" {
return message
}
return __coord_text(
options?.body ?? message?.body ?? message?.text ?? message?.message ?? message?.subject,
)
}
fn __coord_subject(message, options, body) {
let explicit = options?.subject ?? message?.subject ?? message?.title
if explicit != nil {
return trim(__coord_string(explicit))
}
let first_line = split(body ?? "", "\n")[0] ?? ""
if len(first_line) > 96 {
return substring(first_line, 0, 96)
}
return first_line
}
fn __coord_envelope(scope, room, message, options) {
let msg = __coord_dict(message)
let opts = __coord_dict(options)
let ctx = __coord_ctx()
let resolution = __coord_resolution(scope, room, opts)
let id = __coord_validate_component(
"message id",
opts?.id ?? opts?.dedupe_key ?? msg?.id ?? ("coord-" + uuid_v7()),
)
let body = __coord_body(message, opts)
let subject = __coord_subject(msg, opts, body)
let kind = __coord_kind(msg, opts)
return filter_nil(
{
schema: __COORD_SCHEMA,
id: id,
scope: resolution.logical_scope,
scope_id: resolution.logical_scope_id,
room: resolution.room,
kind: kind,
from: __coord_actor(msg, opts, ctx),
to: opts?.to ?? msg?.to,
subject: subject,
body: body,
data: opts?.data ?? msg?.data ?? msg?.payload,
refs: __coord_list(opts?.refs ?? msg?.refs),
related_ids: __coord_list(opts?.related_ids ?? msg?.related_ids),
reply_to: opts?.reply_to ?? msg?.reply_to,
thread_id: opts?.thread_id ?? msg?.thread_id,
dedupe_key: opts?.dedupe_key,
created_at: opts?.created_at ?? msg?.created_at ?? date_iso(),
ttl: opts?.ttl ?? msg?.ttl,
privacy: opts?.privacy ?? msg?.privacy,
},
)
}
/**
* Append an addressed coordination message to a scoped room.
*
* This is a typed wrapper over `coord_post`: the message remains append-only
* channel data, while the `to` field lets consumers build non-destructive
* inboxes with explicit cursor/ack semantics.
*
* @effects: [transcript.write]
* @errors: [runtime]
*/
pub fn coord_send(scope, room, recipient, message, options = nil) {
let opts = __coord_dict(options)
return coord_post(scope, room, message, opts + {to: recipient})
}
/**
* Reply to a coordination message or receipt.
*
* `reply_to` is first-class envelope metadata; `related_ids` also includes the
* parent id so consumers that group by relationship do not need to inspect
* message bodies.
*
* @effects: [transcript.write]
* @errors: [runtime]
*/
pub fn coord_reply(scope, room, original, message, options = nil) {
let opts = __coord_dict(options)
let parent = original?.message ?? original
if type_of(parent) != "dict" || trim(__coord_string(parent?.id)) == "" {
throw "std/coordination: coord_reply expects a coordination message or receipt"
}
let related = __coord_list(opts?.related_ids ?? message?.related_ids) + [parent.id]
return coord_post(
scope,
room,
message,
opts
+ {
to: opts?.to ?? parent?.from,
reply_to: parent.id,
thread_id: opts?.thread_id ?? parent?.thread_id ?? parent.id,
related_ids: related,
},
)
}
/**
* Append a durable coordination message to a scoped room.
*
* `scope` is one of `session`, `pipeline`, `tenant`, `workspace`, or `task`.
* `workspace` maps to tenant-scoped channels; `task` maps to the current
* session with the task id embedded in the channel name.
*
* @effects: [transcript.write]
* @errors: [runtime]
*/
pub fn coord_post(scope, room, message, options = nil) {
let opts = __coord_dict(options)
let resolution = __coord_resolution(scope, room, opts)
let envelope = __coord_envelope(scope, room, message, opts)
let channel = emit_channel(
resolution.channel_name,
envelope,
__coord_channel_options(resolution, opts, envelope.id),
)
return {
schema: __COORD_RECEIPT_SCHEMA,
id: envelope.id,
event_id: channel.event_id,
duplicate: channel.duplicate,
scope: envelope.scope,
scope_id: envelope.scope_id,
room: envelope.room,
kind: envelope.kind,
subject: envelope.subject,
channel: channel,
message: envelope,
}
}
/**
* Read the last acknowledged cursor for a coordination room consumer.
*
* Returns `nil` when the consumer has not acknowledged this room yet. Cursor
* reads are explicit so inbox consumption can be durable without deleting
* messages for other agents.
*
* @effects: []
* @errors: [runtime]
*/
pub fn coord_cursor(scope, room, consumer_id, options = nil) {
let opts = __coord_dict(options)
let resolution = __coord_resolution(scope, room, opts)
return channel_consumer_cursor(
resolution.channel_name,
__coord_consumer_id(opts, consumer_id),
__coord_channel_options(resolution, opts),
)
}
/**
* Acknowledge a coordination room through `cursor` for one consumer.
*
* This never removes messages. It only advances the caller's durable consumer
* cursor on the underlying channel/event log.
*
* @effects: [transcript.write]
* @errors: [runtime]
*/
pub fn coord_ack(scope, room, consumer_id, cursor, options = nil) {
let opts = __coord_dict(options)
let resolution = __coord_resolution(scope, room, opts)
return channel_ack(
resolution.channel_name,
__coord_consumer_id(opts, consumer_id),
cursor,
__coord_channel_options(resolution, opts),
)
}
/**
* Read addressed coordination messages for a consumer without acknowledging.
*
* Options:
* - `consumer_id` / `consumer`: explicit durable cursor owner.
* - `to` / `recipient`: address filter; defaults to the consumer id.
* - `from`, `kind`, `reply_to`, `thread_id`, `subject`: optional filters.
* - `from_cursor` / `cursor`: override the consumer cursor for this read.
* - `include_events`: keep channel event wrappers beside each message.
*
* The returned `next_cursor` is the high-water cursor scanned, not an implicit
* ack. Call `coord_ack(...)` explicitly after processing messages.
*
* @effects: []
* @errors: [runtime]
*/
pub fn coord_inbox(scope, room, options = nil) {
let opts = __coord_dict(options)
let consumer_id = __coord_consumer_id(opts)
let from_cursor = opts?.from_cursor ?? opts?.cursor ?? coord_cursor(scope, room, consumer_id, opts)
let scanned = coord_read(scope, room, filter_nil(opts + {include_events: true, from_cursor: from_cursor}))
var messages = []
var last_scanned_cursor = from_cursor
for item in scanned {
let event = item?.event ?? {}
let message = item?.message ?? item
if event?.cursor != nil {
last_scanned_cursor = event.cursor
}
if __coord_message_matches(message, opts, consumer_id) {
if opts?.include_events ?? false {
messages = messages + [item]
} else {
messages = messages + [message]
}
}
}
return {
consumer_id: consumer_id,
from_cursor: from_cursor,
last_scanned_cursor: last_scanned_cursor,
next_cursor: last_scanned_cursor,
scanned: len(scanned),
messages: messages,
}
}
/**
* Read durable coordination messages from a scoped room.
*
* Pass `{include_events: true}` to preserve the channel event wrapper next to
* each message.
*
* @effects: []
* @errors: [runtime]
*/
pub fn coord_read(scope, room, options = nil) {
let opts = __coord_dict(options)
let resolution = __coord_resolution(scope, room, opts)
let events = channel_events(resolution.channel_name, __coord_channel_options(resolution, opts))
var out = []
for event in events {
if opts?.include_events ?? false {
out = out + [{event: event, message: event.payload}]
} else {
out = out + [event.payload]
}
}
return out
}
/**
* Subscribe to the underlying durable channel topic for a coordination room.
*
* Stream entries are channel event rows; the normalized coordination message is
* available at `entry.payload`.
*
* @effects: []
* @errors: [runtime]
*/
pub fn coord_subscribe(scope, room, options = nil) {
let opts = __coord_dict(options)
let resolution = __coord_resolution(scope, room, opts)
return channel_subscribe(resolution.channel_name, __coord_channel_options(resolution, opts))
}
/**
* Explicitly persist a coordination message/receipt into durable memory.
*
* This is opt-in by construction: `coord_post` writes the ledger only, while
* `coord_remember` is the point where callers decide a message should become
* recallable context.
*
* @effects: []
* @errors: [runtime]
*/
pub fn coord_remember(message_or_receipt, options = nil) {
let opts = __coord_dict(options)
let message = message_or_receipt?.message ?? message_or_receipt
if type_of(message) != "dict" || message?.schema != __COORD_SCHEMA {
throw "std/coordination: coord_remember expects a coordination message or receipt"
}
let namespace = opts?.namespace ?? ("coordination/" + message.scope + "/" + message.room)
let key = opts?.key ?? message.id
let tags = opts?.tags ?? ["coordination", message.kind, message.scope]
return memory_store(namespace, key, message, tags, opts?.memory_options)
}