// std/coordination — durable multi-agent coordination ledger helpers.
//
// The storage substrate is Harn's channel/event-log system. This module only
// normalizes coordination envelopes so agents can coordinate without inventing
// host-local mailboxes or user-visible prose protocols.
import { filter_nil } from "std/collections"
import { ensure_parent_dir, read_json, write_json } from "std/fs"
import { memory_store } from "std/memory"
// Schema tags stamped on message envelopes, post receipts, and directory-lock
// records/results respectively.
let __COORD_SCHEMA = "harn.coordination.message.v1"
let __COORD_RECEIPT_SCHEMA = "harn.coordination.receipt.v1"
let __COORD_DIR_LOCK_SCHEMA = "harn.coordination.dir_lock.v1"
// First segment of every channel name built by `__coord_channel_name`.
let __COORD_CHANNEL_PREFIX = "coord"
// Message kinds accepted by `__coord_kind`; any other kind is rejected.
let __COORD_KINDS = ["status", "claim", "handoff", "blocker", "decision", "request", "fact"]
// Default directory-lock lease: 10 minutes, expressed in milliseconds.
let __COORD_DIR_LOCK_TTL_MS = 10 * 60 * 1000
// Default lease for the stale-cleanup ("reaper") guard: 1 minute, in milliseconds.
let __COORD_DIR_LOCK_REAPER_TTL_MS = 60 * 1000
// Coerce any value to a string: nil becomes "", strings pass through
// untouched, and every other value goes through `to_string`.
fn __coord_string(value) {
  if value == nil {
    return ""
  }
  if type_of(value) != "string" {
    return to_string(value)
  }
  return value
}
// Render a value as message text: nil becomes "", strings pass through,
// dicts and lists are JSON-encoded, and anything else uses `to_string`.
fn __coord_text(value) {
  if value == nil {
    return ""
  }
  let kind = type_of(value)
  if kind == "string" {
    return value
  }
  if kind == "dict" || kind == "list" {
    return json_stringify(value)
  }
  return to_string(value)
}
// Return `value` when it is a dict; otherwise return an empty dict so callers
// can use optional field access without nil checks.
fn __coord_dict(value) {
  if type_of(value) != "dict" {
    return {}
  }
  return value
}
// True when `type_of(value)` reports one of the callable type names
// ("function", "closure", or "fn").
fn __coord_is_callable(value) -> bool {
  return contains(["function", "closure", "fn"], type_of(value))
}
// Fetch the runtime context, substituting an empty dict whenever
// `runtime_context()` returns anything that is not a dict.
fn __coord_ctx() {
  let current = runtime_context()
  if type_of(current) != "dict" {
    return {}
  }
  return current
}
// Normalize a scope name: trim and lowercase it, default blank input to
// "session", and throw for anything outside the supported set.
fn __coord_normalized_scope(scope) {
  let name = lowercase(trim(__coord_string(scope)))
  if name == "" {
    return "session"
  }
  let supported = ["session", "pipeline", "tenant", "workspace", "task"]
  if !contains(supported, name) {
    throw "std/coordination: unsupported scope `" + name + "`"
  }
  return name
}
// Validate one identifier component. The value is stringified and trimmed; it
// must be non-empty and free of ':', spaces, newlines, and tabs. Returns the
// trimmed text; `label` names the component in the thrown error.
fn __coord_validate_component(label, value) {
  let text = trim(__coord_string(value))
  if text == "" {
    throw "std/coordination: " + label + " is required"
  }
  for forbidden in [":", " ", "\n", "\t"] {
    if contains(text, forbidden) {
      throw "std/coordination: " + label + " must not contain whitespace or ':'"
    }
  }
  return text
}
// Validate a channel-name component, then replace every character outside
// `A-Za-z0-9._-` with an underscore.
fn __coord_topic_component(value) {
  let component = __coord_validate_component("topic component", value)
  return regex_replace("[^A-Za-z0-9._-]", "_", component)
}
// Build a dotted channel name: the coordination prefix followed by each part
// after it has been validated and sanitized as a topic component.
fn __coord_channel_name(parts) {
  var segments = [__COORD_CHANNEL_PREFIX]
  for part in parts {
    let segment = __coord_topic_component(part)
    segments = segments + [segment]
  }
  return join(segments, ".")
}
// Resolve the identifier for a normalized scope, preferring explicit options
// over runtime-context fields. Only the pipeline scope has no literal fallback
// and throws when no usable id is found; any scope not handled explicitly is
// resolved like `tenant`.
fn __coord_scope_id(scope, opts, ctx) {
  if scope == "pipeline" {
    let pipeline = opts?.pipeline_id ?? opts?.scope_id ?? ctx?.workflow_id ?? ctx?.run_id
    let missing = pipeline == nil || trim(__coord_string(pipeline)) == ""
    if missing {
      throw "std/coordination: pipeline scope requires pipeline_id, workflow_id, or run_id"
    }
    return pipeline
  }
  if scope == "task" {
    return opts?.task_id ?? opts?.scope_id ?? ctx?.task_id ?? ctx?.root_task_id ?? "task"
  }
  if scope == "workspace" {
    return opts?.workspace_id ?? opts?.scope_id ?? ctx?.workspace_id ?? "workspace"
  }
  if scope == "session" {
    return opts?.session_id
      ?? opts?.scope_id
      ?? ctx?.agent_session_id
      ?? ctx?.root_agent_session_id
      ?? ctx?.scope_id
      ?? ctx?.root_task_id
      ?? "session"
  }
  return opts?.tenant_id ?? opts?.scope_id ?? ctx?.tenant_id ?? "default"
}
// Resolve a logical (scope, room) pair to the channel that backs it.
//
// `workspace` rooms live on a tenant-scoped channel and `task` rooms on a
// session-scoped channel, with the logical scope id embedded in the channel
// name. Every other scope maps onto a channel of the same scope.
// Throws when the scope, room, or any resolved id fails validation.
fn __coord_resolution(scope, room, options) {
  let opts = __coord_dict(options)
  let ctx = __coord_ctx()
  let logical_scope = __coord_normalized_scope(scope)
  let room_name = __coord_validate_component("room", room)
  let logical_scope_id = __coord_validate_component("scope id", __coord_scope_id(logical_scope, opts, ctx))
  // Default: the channel scope mirrors the logical scope.
  var channel_scope = logical_scope
  var channel_scope_id = logical_scope_id
  var name_parts = [logical_scope, room_name]
  if logical_scope == "workspace" {
    channel_scope = "tenant"
    channel_scope_id = __coord_validate_component("tenant id", opts?.tenant_id ?? ctx?.tenant_id ?? "default")
    name_parts = ["workspace", logical_scope_id, room_name]
  } else if logical_scope == "task" {
    channel_scope = "session"
    channel_scope_id = __coord_validate_component("session id", __coord_scope_id("session", opts, ctx))
    name_parts = ["task", logical_scope_id, room_name]
  }
  return {
    logical_scope: logical_scope,
    logical_scope_id: logical_scope_id,
    channel_scope: channel_scope,
    channel_scope_id: channel_scope_id,
    channel_name: __coord_channel_name(name_parts),
    room: room_name,
  }
}
// Translate a resolution plus caller options into the option dict handed to
// the channel builtins: the channel scope, the matching scope id key, an
// optional event id, and the pass-through `ttl`, `limit`, and cursor options.
fn __coord_channel_options(resolution, options, id = nil) {
  let opts = __coord_dict(options)
  let scope = resolution.channel_scope
  let scope_id = resolution.channel_scope_id
  var built = {scope: scope}
  if id != nil {
    built = built + {id: id}
  }
  if scope == "session" {
    built = built + {session_id: scope_id}
  } else if scope == "pipeline" {
    built = built + {pipeline_id: scope_id}
  } else if scope == "tenant" {
    built = built + {tenant_id: scope_id}
  }
  let ttl = opts?.ttl
  if ttl != nil {
    built = built + {ttl: ttl}
  }
  let limit = opts?.limit
  if limit != nil {
    built = built + {limit: limit}
  }
  // `from_cursor` wins over its `cursor` alias.
  let start = opts?.from_cursor ?? opts?.cursor
  if start != nil {
    built = built + {from_cursor: start}
  }
  return built
}
// Coerce a value to a list: lists pass through, nil becomes an empty list,
// and any other value is wrapped in a single-element list.
fn __coord_list(value) {
  if type_of(value) == "list" {
    return value
  }
  if value == nil {
    return []
  }
  return [value]
}
// Parse an optional int option that must be >= 0. Returns `fallback` when the
// value is nil; throws (naming `label`) when `to_int` yields nil or a
// negative number.
fn __coord_nonnegative_int(value, fallback, label) {
  if value == nil {
    return fallback
  }
  let number = to_int(value)
  if number != nil && number >= 0 {
    return number
  }
  throw "std/coordination: " + label + " must be a non-negative int"
}
// Parse an optional int option that must be > 0. Returns `fallback` when the
// value is nil; throws (naming `label`) when `to_int` yields nil, zero, or a
// negative number.
fn __coord_positive_int(value, fallback, label) {
  if value == nil {
    return fallback
  }
  let number = to_int(value)
  if number != nil && number > 0 {
    return number
  }
  throw "std/coordination: " + label + " must be a positive int"
}
// Reduce an address to a single identifier. Strings are returned as-is; dicts
// yield their first non-nil field in the order session_id, root_session_id,
// agent, worker_id, run_id, task_id, root_task_id. Anything else gives nil.
fn __coord_address_identifier(address) {
  let kind = type_of(address)
  if kind == "string" {
    return address
  }
  if kind != "dict" {
    return nil
  }
  return address?.session_id
    ?? address?.root_session_id
    ?? address?.agent
    ?? address?.worker_id
    ?? address?.run_id
    ?? address?.task_id
    ?? address?.root_task_id
}
// Choose and validate the durable consumer id. Precedence: explicit
// `consumer_id` / `consumer` options, the caller-supplied fallback, the
// identifier of the `to` / `recipient` address, the `session_id` option, the
// runtime context's session/worker/run ids, and finally "default".
fn __coord_consumer_id(options, fallback = nil) {
  let opts = __coord_dict(options)
  let ctx = __coord_ctx()
  let addressed = __coord_address_identifier(opts?.to ?? opts?.recipient)
  let candidate = opts?.consumer_id
    ?? opts?.consumer
    ?? fallback
    ?? addressed
    ?? opts?.session_id
    ?? ctx?.agent_session_id
    ?? ctx?.worker_id
    ?? ctx?.run_id
    ?? "default"
  return __coord_validate_component("consumer id", candidate)
}
// Compare `value` against `expected` after stringifying and trimming both.
// A blank expectation matches everything; a list value matches when any of
// its items matches.
fn __coord_text_matches(value, expected) {
  let needle = trim(__coord_string(expected))
  if needle == "" {
    return true
  }
  if type_of(value) != "list" {
    return trim(__coord_string(value)) == needle
  }
  for candidate in value {
    if __coord_text_matches(candidate, needle) {
      return true
    }
  }
  return false
}
// Decide whether a message address refers to `expected`. The expectation is
// reduced to one identifier first; a blank identifier matches everything.
// String addresses are compared after trimming, dict addresses match when any
// of their identity fields equals the identifier, and other shapes never match.
fn __coord_address_matches(address, expected) {
  let needle = trim(__coord_string(__coord_address_identifier(expected) ?? expected))
  if needle == "" {
    return true
  }
  let kind = type_of(address)
  if kind == "string" {
    return trim(address) == needle
  }
  if kind != "dict" {
    return false
  }
  let identities = [
    address?.session_id,
    address?.root_session_id,
    address?.agent,
    address?.worker_id,
    address?.run_id,
    address?.task_id,
    address?.root_task_id,
  ]
  return __coord_text_matches(identities, needle)
}
// Check a message kind against a kind filter. A nil or blank filter matches
// everything; otherwise the filter (a single value or a list of values) is
// trimmed and lowercased before being compared with `kind`.
fn __coord_kind_matches(kind, expected) {
  if expected == nil || trim(__coord_string(expected)) == "" {
    return true
  }
  // Treat a single filter value as a one-element list.
  let wanted = if type_of(expected) == "list" {
    expected
  } else {
    [expected]
  }
  for candidate in wanted {
    if kind == lowercase(trim(__coord_string(candidate))) {
      return true
    }
  }
  return false
}
// Normalize the `request_or_id` argument of `coord_wait_reply` into a dict
// that carries at least an `id`.
//
// - A receipt (`{message: {...}}`) yields its embedded message when that
//   message has a non-blank id.
// - Any other dict with a non-blank `id` is returned as-is.
// - A dict without a usable id throws "request id is required". It is no
//   longer stringified into a meaningless id.
// - Any other value is stringified and trimmed into a raw id; `thread_id`
//   defaults to that id and `from` is taken from the options.
fn __coord_message_from_input(value, options) {
  let opts = __coord_dict(options)
  if type_of(value) == "dict" {
    let nested = value?.message
    if type_of(nested) == "dict" && trim(__coord_string(nested?.id)) != "" {
      return nested
    }
    if trim(__coord_string(value?.id)) != "" {
      return value
    }
    throw "std/coordination: request id is required"
  }
  let id = trim(__coord_string(value))
  if id == "" {
    throw "std/coordination: request id is required"
  }
  return filter_nil({id: id, thread_id: opts?.thread_id ?? id, from: opts?.from})
}
// Derive the inbox options used while waiting for replies to `request`.
// The consumer defaults to `reply.<request id>`, and the reply-specific
// option aliases (`response_*` / `reply_*`) are folded into the plain inbox
// filters. Nil entries are dropped from the result.
fn __coord_wait_reply_options(request, options) {
  let opts = __coord_dict(options)
  let request_id = request.id
  let consumer = __coord_consumer_id(opts, "reply." + request_id)
  let overrides = {
    consumer_id: consumer,
    to: opts?.to ?? opts?.recipient ?? request?.from ?? consumer,
    from: opts?.response_from ?? opts?.reply_from ?? opts?.from,
    reply_to: request_id,
    thread_id: opts?.thread_id ?? request?.thread_id ?? request_id,
    kind: opts?.response_kind ?? opts?.reply_kind ?? opts?.kind,
    subject: opts?.response_subject ?? opts?.reply_subject ?? opts?.subject,
  }
  return filter_nil(opts + overrides)
}
// Apply the inbox filters to one message. `kind` and the recipient address
// are always checked (the address falls back to `default_to`); `from`,
// `reply_to`, `thread_id`, and `subject` are checked only when supplied.
fn __coord_message_matches(message, options, default_to) {
  let filters = __coord_dict(options)
  if !__coord_kind_matches(message?.kind, filters?.kind) {
    return false
  }
  let recipient = filters?.to ?? filters?.recipient ?? default_to
  if !__coord_address_matches(message?.to, recipient) {
    return false
  }
  let sender = filters?.from
  if sender != nil && !__coord_address_matches(message?.from, sender) {
    return false
  }
  let reply_to = filters?.reply_to
  if reply_to != nil && !__coord_text_matches(message?.reply_to, reply_to) {
    return false
  }
  let thread_id = filters?.thread_id
  if thread_id != nil && !__coord_text_matches(message?.thread_id, thread_id) {
    return false
  }
  let subject = filters?.subject
  if subject != nil && !__coord_text_matches(message?.subject, subject) {
    return false
  }
  return true
}
// Resolve the message kind from the options, then the message, defaulting to
// "status". The result is trimmed and lowercased; kinds outside
// `__COORD_KINDS` throw.
fn __coord_kind(message, options) {
  let requested = options?.kind ?? message?.kind ?? "status"
  let kind = lowercase(trim(__coord_string(requested)))
  if contains(__COORD_KINDS, kind) {
    return kind
  }
  throw "std/coordination: unsupported message kind `" + kind + "`"
}
// Compute the lock directory for a resource. A non-blank `lock_path` option
// wins; otherwise the trimmed resource path gets a `.lock.d` suffix. Throws
// when neither is available.
fn __coord_dir_lock_path(resource_path, options) {
  let override = trim(__coord_string(__coord_dict(options)?.lock_path))
  if override != "" {
    return override
  }
  let target = trim(__coord_string(resource_path))
  if target == "" {
    throw "std/coordination: lock resource path is required"
  }
  return target + ".lock.d"
}
// Path of the `owner.json` record kept inside a lock directory.
fn __coord_dir_lock_record_path(lock_path) {
  let record_file = "owner.json"
  return path_join(lock_path, record_file)
}
// Path of the stale-cleanup guard directory: the lock path plus `.reap.d`.
fn __coord_dir_lock_reaper_path(lock_path) {
  let suffix = ".reap.d"
  return lock_path + suffix
}
// Normalize the lock owner into a dict with a guaranteed `id`.
//
// `owner` may be a dict (its fields are kept), a string (used as the id), or
// anything else (ignored). The id falls back through the options and runtime
// context, and finally to a generated `coord-lock-<uuid>`. Identity fields
// are filled from the owner dict, options, then context; nil fields are
// dropped.
fn __coord_dir_lock_owner(owner, options) {
  let opts = __coord_dict(options)
  let ctx = __coord_ctx()
  let fields = __coord_dict(owner)
  var named = nil
  if type_of(owner) == "string" {
    named = owner
  }
  let candidate = fields?.id
    ?? fields?.owner_id
    ?? named
    ?? opts?.owner_id
    ?? opts?.run_id
    ?? ctx?.run_id
    ?? ctx?.agent_session_id
    ?? ctx?.worker_id
    ?? ctx?.agent
    ?? ctx?.persona
  var owner_id = trim(__coord_string(candidate))
  if owner_id == "" {
    owner_id = "coord-lock-" + uuid_v7()
  }
  let identity = {
    id: owner_id,
    agent: fields?.agent ?? opts?.agent ?? ctx?.agent ?? ctx?.persona,
    session_id: fields?.session_id ?? opts?.session_id ?? ctx?.agent_session_id,
    worker_id: fields?.worker_id ?? opts?.worker_id ?? ctx?.worker_id,
    run_id: fields?.run_id ?? opts?.run_id ?? ctx?.run_id,
  }
  return filter_nil(fields + identity)
}
// Build the owner record written into a lock directory. The lease length
// comes from `ttl_ms` / `lease_ms` (default `__COORD_DIR_LOCK_TTL_MS`), the
// token from the `token` option or a freshly generated one, and the expiry is
// `now_ms + ttl_ms`. Throws when the lease length is not a non-negative int.
fn __coord_dir_lock_record(resource_path, lock_path, owner, options, now_ms) {
  let opts = __coord_dict(options)
  let lease_ms = __coord_nonnegative_int(opts?.ttl_ms ?? opts?.lease_ms, __COORD_DIR_LOCK_TTL_MS, "ttl_ms")
  var lock_token = trim(__coord_string(opts?.token))
  if lock_token == "" {
    lock_token = "coord-lock-token-" + uuid_v7()
  }
  let record = {
    schema: __COORD_DIR_LOCK_SCHEMA,
    resource_path: resource_path,
    lock_path: lock_path,
    record_path: __coord_dir_lock_record_path(lock_path),
    token: lock_token,
    owner: owner,
    reason: opts?.reason,
    refs: __coord_list(opts?.refs),
    acquired_at_ms: now_ms,
    updated_at_ms: now_ms,
    ttl_ms: lease_ms,
    expires_at_ms: now_ms + lease_ms,
  }
  return filter_nil(record)
}
// True when both arguments are dicts describing the same lease: a non-nil
// token that is equal on both sides, plus equal `acquired_at_ms` values.
fn __coord_dir_lock_same_record(record, expected) {
  if type_of(record) != "dict" {
    return false
  }
  if type_of(expected) != "dict" {
    return false
  }
  let token = record?.token
  if token == nil {
    return false
  }
  if token != expected?.token {
    return false
  }
  return record?.acquired_at_ms == expected?.acquired_at_ms
}
// Inspect a lock directory and classify it:
// - "absent": the directory does not exist
// - "unrecorded": the directory exists but has no owner record
// - "malformed": the record is not a dict carrying the dir-lock schema
// - "stale" / "active": a valid record, expired or not as of `now_ms`
// The result always carries `lock_path` and `record_path`, plus `record`
// once one has been read.
fn __coord_dir_lock_existing(lock_path, now_ms) {
  let record_path = __coord_dir_lock_record_path(lock_path)
  if !harness.fs.exists(lock_path) {
    return {status: "absent", lock_path: lock_path, record_path: record_path}
  }
  if !harness.fs.exists(record_path) {
    return {status: "unrecorded", lock_path: lock_path, record_path: record_path}
  }
  let record = read_json(record_path, nil)
  let well_formed = type_of(record) == "dict" && record?.schema == __COORD_DIR_LOCK_SCHEMA
  if !well_formed {
    return {status: "malformed", lock_path: lock_path, record_path: record_path, record: record}
  }
  var status = "active"
  if __coord_dir_lock_stale(record, now_ms) {
    status = "stale"
  }
  return {status: status, lock_path: lock_path, record_path: record_path, record: record}
}
// Delete the lock directory only if it still holds `expected_record`.
// Returns true when the lock is gone afterwards (already absent or deleted
// here) and false when a different record now occupies the path.
fn __coord_dir_lock_clear_if_same(lock_path, expected_record) {
  let live = __coord_dir_lock_existing(lock_path, harness.clock.now_ms())
  if live.status == "absent" {
    return true
  }
  if __coord_dir_lock_same_record(live.record, expected_record) {
    harness.fs.delete(lock_path)
    return true
  }
  return false
}
// A record is stale once its `expires_at_ms` is at or before `now_ms`.
// Records without a parseable expiry are never considered stale.
fn __coord_dir_lock_stale(record, now_ms) {
  let deadline = to_int(record?.expires_at_ms)
  if deadline == nil {
    return false
  }
  return deadline <= now_ms
}
// Report that a lock could not be acquired. Builds the structured conflict
// result from the observed lock state, then either throws its message (the
// default) or returns it when `throw_on_conflict` is false.
fn __coord_dir_lock_conflict(resource_path, lock_path, existing, options) {
  let opts = __coord_dict(options)
  let conflict = filter_nil(
    {
      schema: __COORD_DIR_LOCK_SCHEMA,
      acquired: false,
      status: "conflict",
      reason: existing.status,
      resource_path: resource_path,
      lock_path: lock_path,
      record_path: existing.record_path,
      holder: existing.record,
      message: coord_dir_lock_conflict_message(existing),
    },
  )
  let should_throw = opts?.throw_on_conflict ?? true
  if should_throw {
    throw conflict.message
  }
  return conflict
}
// Derive the options for acquiring the stale-cleanup guard: same options as
// the caller's, but pointed at the reaper directory, using the reaper lease
// (`reaper_ttl_ms`, default `__COORD_DIR_LOCK_REAPER_TTL_MS`), a cleanup
// reason, and a freshly generated token.
fn __coord_dir_lock_reaper_options(options, lock_path) {
  let opts = __coord_dict(options)
  let lease_ms = __coord_positive_int(opts?.reaper_ttl_ms, __COORD_DIR_LOCK_REAPER_TTL_MS, "reaper_ttl_ms")
  let overrides = {
    lock_path: __coord_dir_lock_reaper_path(lock_path),
    ttl_ms: lease_ms,
    reason: opts?.reaper_reason ?? "coord_dir_lock_stale_cleanup",
    token: "coord-lock-reaper-token-" + uuid_v7(),
  }
  return opts + overrides
}
// Check whether another process currently holds the stale-cleanup guard for
// this lock. Returns nil when there is no guard, or when a stale guard was
// cleared here (only attempted while `take_stale` is enabled). Otherwise the
// guard is re-read and reported as a conflict.
fn __coord_dir_lock_reaper_conflict(resource_path, lock_path, options) {
  let opts = __coord_dict(options)
  let reaper_path = __coord_dir_lock_reaper_path(lock_path)
  let observed = __coord_dir_lock_existing(reaper_path, harness.clock.now_ms())
  if observed.status == "absent" {
    return nil
  }
  let may_clear = observed.status == "stale" && (opts?.take_stale ?? true)
  if may_clear {
    if __coord_dir_lock_clear_if_same(reaper_path, observed.record) {
      return nil
    }
  }
  // Re-read so the conflict describes whoever holds the guard right now.
  let latest = __coord_dir_lock_existing(reaper_path, harness.clock.now_ms())
  return __coord_dir_lock_conflict(resource_path, reaper_path, latest, opts)
}
// Take over a stale directory lock.
//
// The takeover is serialized through a short-lived "reaper" guard directory
// so that only one process removes the stale lock at a time:
// 1. Acquire the reaper guard; if someone else holds it, report a conflict.
// 2. Re-read the lock; only if it is still the same stale record seen by the
//    caller (`existing.record`) is it removed and re-acquired for `owner`.
// 3. Release the reaper guard (best effort).
//
// The guard is released even when step 2 throws (for example on a filesystem
// error), so a failed takeover does not leave the guard held until its lease
// expires.
//
// Returns the acquired lock result, or a conflict result / thrown conflict
// (per `throw_on_conflict`) when the takeover did not succeed.
fn __coord_dir_lock_take_stale(resource_path, lock_path, owner, options, existing) {
  let opts = __coord_dict(options)
  let reaper_opts = __coord_dir_lock_reaper_options(opts, lock_path)
  let reaper_path = __coord_dir_lock_reaper_path(lock_path)
  let reaper = __coord_dir_lock_try_acquire(resource_path, reaper_path, owner, reaper_opts)
  if reaper == nil {
    let active_reaper = __coord_dir_lock_existing(reaper_path, harness.clock.now_ms())
    return __coord_dir_lock_conflict(resource_path, reaper_path, active_reaper, opts)
  }
  var acquired = nil
  try {
    let current = __coord_dir_lock_existing(lock_path, harness.clock.now_ms())
    if current.status == "stale" && __coord_dir_lock_same_record(current.record, existing.record) {
      let cleared = __coord_dir_lock_clear_if_same(lock_path, existing.record)
      if cleared {
        acquired = __coord_dir_lock_try_acquire(resource_path, lock_path, owner, opts, current.record)
      }
    }
  } catch (e) {
    // Do not leave the reaper guard behind when the takeover itself fails.
    let _ = try {
      coord_release_dir_lock(reaper)
    } catch (_release_error) {
      nil
    }
    throw e
  }
  let _ = try {
    coord_release_dir_lock(reaper)
  } catch (_e) {
    nil
  }
  if acquired != nil {
    return acquired
  }
  let refreshed = __coord_dir_lock_existing(lock_path, harness.clock.now_ms())
  return __coord_dir_lock_conflict(resource_path, lock_path, refreshed, opts)
}
// Try once to create the lock directory and write its owner record.
//
// Returns nil when the directory could not be created (treated as "someone
// else holds the lock"); otherwise returns the structured "acquired" result.
// `stale_removed` is the record of a stale lock this acquisition replaced.
//
// If building or writing the owner record fails after the directory was
// created (for example an invalid `ttl_ms`, or a write error), the directory
// is removed again before the error is rethrown. Without that cleanup the
// directory would stay behind without `owner.json`, a state that is never
// treated as stale, so the resource would remain blocked.
fn __coord_dir_lock_try_acquire(resource_path, lock_path, owner, options, stale_removed = nil) {
  let opts = __coord_dict(options)
  if opts?.ensure_parent ?? true {
    ensure_parent_dir(lock_path)
  }
  // Non-recursive mkdir is the atomic "test and set" of the lock.
  let created = try {
    harness.fs.mkdir(lock_path, false)
    true
  } catch (_e) {
    false
  }
  if !created {
    return nil
  }
  let record = try {
    let built = __coord_dir_lock_record(resource_path, lock_path, owner, opts, harness.clock.now_ms())
    write_json(built.record_path, built, {pretty: true, ensure_parent: false})
    built
  } catch (e) {
    // We created the directory, so it is ours to remove; best effort only.
    let _ = try {
      harness.fs.delete(lock_path)
    } catch (_cleanup_error) {
      nil
    }
    throw e
  }
  return filter_nil(
    {
      schema: __COORD_DIR_LOCK_SCHEMA,
      acquired: true,
      status: "acquired",
      resource_path: resource_path,
      lock_path: lock_path,
      reaper_path: __coord_dir_lock_reaper_path(lock_path),
      record_path: record.record_path,
      token: record.token,
      owner: owner,
      record: record,
      stale_removed: stale_removed,
    },
  )
}
// Build the `from` actor of an envelope. Starts from the explicit `from`
// (options first, then message) when it is a dict, then fills any missing
// identity field from the runtime context. Nil fields are dropped.
fn __coord_actor(message, options, ctx) {
  let sender = __coord_dict(options?.from ?? message?.from)
  let identity = {
    agent: sender?.agent ?? options?.agent ?? ctx?.agent ?? ctx?.persona,
    session_id: sender?.session_id ?? ctx?.agent_session_id,
    root_session_id: sender?.root_session_id ?? ctx?.root_agent_session_id,
    worker_id: sender?.worker_id ?? ctx?.worker_id,
    run_id: sender?.run_id ?? ctx?.run_id,
    task_id: sender?.task_id ?? ctx?.task_id,
    root_task_id: sender?.root_task_id ?? ctx?.root_task_id,
  }
  return filter_nil(sender + identity)
}
// Extract the message body. A string message is the body itself; otherwise
// the first non-nil of `options.body`, `message.body`, `message.text`,
// `message.message`, and `message.subject` is rendered as text.
fn __coord_body(message, options) {
  if type_of(message) != "string" {
    let raw = options?.body ?? message?.body ?? message?.text ?? message?.message ?? message?.subject
    return __coord_text(raw)
  }
  return message
}
// Choose the envelope subject. An explicit subject (options, then the
// message's `subject` / `title`) is stringified and trimmed. Otherwise the
// first line of the body is used, cut with `substring(first_line, 0, 96)`
// when it is longer than 96.
fn __coord_subject(message, options, body) {
  let explicit = options?.subject ?? message?.subject ?? message?.title
  if explicit != nil {
    return trim(__coord_string(explicit))
  }
  let first_line = split(body ?? "", "\n")[0] ?? ""
  if len(first_line) <= 96 {
    return first_line
  }
  return substring(first_line, 0, 96)
}
// Normalize a message into the coordination envelope stored on the channel.
// Options take precedence over message fields throughout. The id comes from
// `id`, `dedupe_key`, or the message, else a generated `coord-<uuid>`;
// `created_at` defaults to `date_iso()`. Nil fields are dropped.
// Throws when the scope, room, id, or kind is invalid.
fn __coord_envelope(scope, room, message, options) {
  let fields = __coord_dict(message)
  let opts = __coord_dict(options)
  let ctx = __coord_ctx()
  let target = __coord_resolution(scope, room, opts)
  let message_id = __coord_validate_component(
    "message id",
    opts?.id ?? opts?.dedupe_key ?? fields?.id ?? ("coord-" + uuid_v7()),
  )
  // The raw `message` is passed here because it may be a plain string body.
  let text = __coord_body(message, opts)
  let headline = __coord_subject(fields, opts, text)
  let message_kind = __coord_kind(fields, opts)
  let envelope = {
    schema: __COORD_SCHEMA,
    id: message_id,
    scope: target.logical_scope,
    scope_id: target.logical_scope_id,
    room: target.room,
    kind: message_kind,
    from: __coord_actor(fields, opts, ctx),
    to: opts?.to ?? fields?.to,
    subject: headline,
    body: text,
    data: opts?.data ?? fields?.data ?? fields?.payload,
    refs: __coord_list(opts?.refs ?? fields?.refs),
    related_ids: __coord_list(opts?.related_ids ?? fields?.related_ids),
    reply_to: opts?.reply_to ?? fields?.reply_to,
    thread_id: opts?.thread_id ?? fields?.thread_id,
    dedupe_key: opts?.dedupe_key,
    created_at: opts?.created_at ?? fields?.created_at ?? date_iso(),
    ttl: opts?.ttl ?? fields?.ttl,
    privacy: opts?.privacy ?? fields?.privacy,
  }
  return filter_nil(envelope)
}
/**
 * Send a durable addressed request.
 *
 * This is `coord_send` with request defaults: kind `request` (override with
 * `request_kind`), a stable id, and `thread_id` defaulting to the request id.
 * Callers can then wait for replies by id without encoding a bespoke
 * "intent_query"/"intent_verdict" protocol in message bodies or host-local
 * mailboxes.
 *
 * `message` may be a dict or a plain string body. A string is forwarded
 * unchanged so that it becomes the envelope body.
 *
 * @effects: [transcript.write]
 * @errors: [runtime]
 */
pub fn coord_request(scope, room, recipient, message, options = nil) {
  let msg = __coord_dict(message)
  let opts = __coord_dict(options)
  let request_id = __coord_validate_component(
    "request id",
    opts?.id ?? opts?.dedupe_key ?? msg?.id ?? ("coord-" + uuid_v7()),
  )
  // Forward a string message as-is: coercing it to a dict would turn it into
  // `{}` and silently drop the body.
  let payload = if type_of(message) == "string" {
    message
  } else {
    msg
  }
  return coord_send(
    scope,
    room,
    recipient,
    payload,
    opts
      + {id: request_id, kind: opts?.request_kind ?? "request", thread_id: opts?.thread_id ?? request_id},
  )
}
/**
 * Resolve the lock directory that guards `resource_path`.
 *
 * A non-blank `lock_path` option overrides the default of
 * `resource_path + ".lock.d"`. Directory locks are created with
 * `harness.fs.mkdir(path, false)`, which is atomic, so they coordinate across
 * processes without shell-specific `mkdir` glue.
 *
 * @effects: []
 * @errors: [runtime]
 */
pub fn coord_dir_lock_path(resource_path, options = nil) -> string {
  let lock_path = __coord_dir_lock_path(resource_path, options)
  return lock_path
}
/**
 * Resolve the path of the owner record (`owner.json`) kept inside a
 * directory lock.
 *
 * @effects: []
 * @errors: []
 */
pub fn coord_dir_lock_record_path(lock_path: string) -> string {
  let record_path = __coord_dir_lock_record_path(lock_path)
  return record_path
}
/**
 * Resolve the internal guard directory (`lock_path + ".reap.d"`) that
 * serializes stale-lock cleanup for a directory lock.
 *
 * @effects: []
 * @errors: []
 */
pub fn coord_dir_lock_reaper_path(lock_path: string) -> string {
  let reaper_path = __coord_dir_lock_reaper_path(lock_path)
  return reaper_path
}
/**
 * Format a directory-lock state as a human-readable conflict message.
 *
 * Accepts either a conflict result (`reason` / `holder`) or a raw lock state
 * (`status` / `record`). The message names the lock path and state, the owner
 * id, and, when present, the owner's run id and the lease expiry.
 *
 * @effects: []
 * @errors: []
 */
pub fn coord_dir_lock_conflict_message(conflict) -> string {
  let state = conflict?.reason ?? conflict?.status ?? "active"
  let holder = conflict?.holder ?? conflict?.record
  let owner = holder?.owner ?? {}
  let owner_id = owner?.id ?? holder?.owner_id ?? "unknown"
  let lock_path = conflict?.lock_path ?? holder?.lock_path ?? "unknown"
  var lines = [
    "coordination directory lock is held: " + lock_path + " (state: " + state + ")",
    "- owner: " + __coord_string(owner_id),
  ]
  if owner?.run_id != nil {
    lines = lines + ["- run: " + __coord_string(owner.run_id)]
  }
  let expires = holder?.expires_at_ms
  if expires != nil {
    lines = lines + ["- expires_at_ms: " + __coord_string(expires)]
  }
  return join(lines, "\n")
}
/**
 * Acquire an atomic filesystem-backed directory lease.
 *
 * The lock directory (`resource_path + ".lock.d"` by default) is created with
 * a non-recursive mkdir. On success `owner.json` is written inside it and a
 * structured record is returned. When the lock already exists and its owner
 * record has expired, the stale directory is removed and acquisition is
 * retried once. A held stale-cleanup guard is reported as a conflict before
 * any acquisition is attempted.
 *
 * Options:
 * - `lock_path`: override the default lock directory path.
 * - `ttl_ms` / `lease_ms`: lease duration, default 10 minutes.
 * - `throw_on_conflict`: default true; false returns `{acquired:false}`.
 * - `take_stale`: default true; false reports stale locks as conflicts.
 * - `reaper_ttl_ms`: stale-cleanup guard duration, default 1 minute.
 * - `ensure_parent`: default true; create the lock parent when missing.
 *
 * @effects: [fs.write]
 * @errors: [runtime]
 */
pub fn coord_acquire_dir_lock(resource_path: string, owner = nil, options = nil) {
  let opts = __coord_dict(options)
  let lock_path = __coord_dir_lock_path(resource_path, opts)
  let holder = __coord_dir_lock_owner(owner, opts)
  // Another process may be in the middle of clearing a stale lock.
  let guard_conflict = __coord_dir_lock_reaper_conflict(resource_path, lock_path, opts)
  if guard_conflict != nil {
    return guard_conflict
  }
  let fresh = __coord_dir_lock_try_acquire(resource_path, lock_path, holder, opts)
  if fresh != nil {
    return fresh
  }
  let observed = __coord_dir_lock_existing(lock_path, harness.clock.now_ms())
  let may_take = observed.status == "stale" && (opts?.take_stale ?? true)
  if may_take {
    return __coord_dir_lock_take_stale(resource_path, lock_path, holder, opts, observed)
  }
  return __coord_dir_lock_conflict(resource_path, lock_path, observed, opts)
}
/**
 * Release a directory lease returned by `coord_acquire_dir_lock`.
 *
 * `lock` is either the lock result/record or a lock path string. In strict
 * mode (the default), when the supplied lock carries a token, the token in
 * the live owner record must match it or the release is refused. Pass
 * `{strict:false}` to best-effort delete by path. An already-absent lock is
 * reported as `{released: false, status: "absent"}`.
 *
 * @effects: [fs.write]
 * @errors: [runtime]
 */
pub fn coord_release_dir_lock(lock, options = nil) {
  let opts = __coord_dict(options)
  var lock_path = nil
  if type_of(lock) == "string" {
    lock_path = lock
  } else {
    lock_path = lock?.lock_path ?? lock?.record?.lock_path
  }
  if trim(__coord_string(lock_path)) == "" {
    throw "std/coordination: lock_path is required"
  }
  if !harness.fs.exists(lock_path) {
    return {released: false, status: "absent", lock_path: lock_path}
  }
  let strict = opts?.strict ?? true
  if strict {
    let expected_token = lock?.token ?? lock?.record?.token
    if expected_token != nil {
      let live = __coord_dir_lock_existing(lock_path, harness.clock.now_ms())
      if live?.record?.token != expected_token {
        throw "std/coordination: refusing to release directory lock owned by another token"
      }
    }
  }
  harness.fs.delete(lock_path)
  return {released: true, status: "released", lock_path: lock_path}
}
/**
 * Run `body(lock)` while holding a directory lease.
 *
 * The lease is acquired first and released after the body finishes, whether
 * it returns a value or throws; a body error is rethrown after a best-effort
 * release. Prefer this over pairing `coord_acquire_dir_lock` with
 * `coord_release_dir_lock` by hand, since callers then do not need their own
 * release-on-error scaffolding.
 *
 * @effects: [fs.write]
 * @errors: [runtime]
 * @example: coord_with_dir_lock(path, { lock -> update_manifest(lock) }, {id: "release"})
 */
pub fn coord_with_dir_lock(resource_path: string, body, owner = nil, options = nil) {
  if !__coord_is_callable(body) {
    throw "std/coordination: coord_with_dir_lock body must be callable; got " + type_of(body)
  }
  let lease = coord_acquire_dir_lock(resource_path, owner, options)
  let held = lease?.acquired ?? false
  if !held {
    throw lease?.message ?? "std/coordination: directory lock was not acquired"
  }
  try {
    let result = body(lease)
    coord_release_dir_lock(lease)
    return result
  } catch (failure) {
    // Best-effort release; the original failure is what the caller sees.
    let _ = try {
      coord_release_dir_lock(lease)
    } catch (_release_error) {
      nil
    }
    throw failure
  }
}
/**
 * Post a coordination message addressed to `recipient`.
 *
 * A thin wrapper over `coord_post` that sets the envelope's `to` field. The
 * message stays append-only channel data; the address lets consumers build
 * non-destructive inboxes with explicit cursor/ack semantics.
 *
 * @effects: [transcript.write]
 * @errors: [runtime]
 */
pub fn coord_send(scope, room, recipient, message, options = nil) {
  let addressed = __coord_dict(options) + {to: recipient}
  return coord_post(scope, room, message, addressed)
}
/**
 * Post a reply to a coordination message or receipt.
 *
 * The reply is addressed to the parent's sender unless `to` is given, carries
 * the parent id as first-class `reply_to` metadata, and stays on the parent's
 * thread. The parent id is also appended to `related_ids`, so consumers that
 * group by relationship do not need to inspect message bodies.
 *
 * @effects: [transcript.write]
 * @errors: [runtime]
 */
pub fn coord_reply(scope, room, original, message, options = nil) {
  let opts = __coord_dict(options)
  // Accept either a receipt (with an embedded message) or the message itself.
  let parent = original?.message ?? original
  let usable = type_of(parent) == "dict" && trim(__coord_string(parent?.id)) != ""
  if !usable {
    throw "std/coordination: coord_reply expects a coordination message or receipt"
  }
  let parent_id = parent.id
  let related = __coord_list(opts?.related_ids ?? message?.related_ids) + [parent_id]
  let reply_opts = opts
    + {
      to: opts?.to ?? parent?.from,
      reply_to: parent_id,
      thread_id: opts?.thread_id ?? parent?.thread_id ?? parent_id,
      related_ids: related,
    }
  return coord_post(scope, room, message, reply_opts)
}
/**
 * Append a durable coordination message to a scoped room and return a
 * receipt.
 *
 * `scope` is one of `session`, `pipeline`, `tenant`, `workspace`, or `task`.
 * `workspace` maps to tenant-scoped channels; `task` maps to the current
 * session with the task id embedded in the channel name. The receipt carries
 * the normalized envelope under `message` and the channel emit result under
 * `channel`.
 *
 * @effects: [transcript.write]
 * @errors: [runtime]
 */
pub fn coord_post(scope, room, message, options = nil) {
  let opts = __coord_dict(options)
  let target = __coord_resolution(scope, room, opts)
  let envelope = __coord_envelope(scope, room, message, opts)
  // The envelope id is passed through as the channel event id.
  let channel_opts = __coord_channel_options(target, opts, envelope.id)
  let emitted = emit_channel(target.channel_name, envelope, channel_opts)
  return {
    schema: __COORD_RECEIPT_SCHEMA,
    id: envelope.id,
    event_id: emitted.event_id,
    duplicate: emitted.duplicate,
    scope: envelope.scope,
    scope_id: envelope.scope_id,
    room: envelope.room,
    kind: envelope.kind,
    subject: envelope.subject,
    channel: emitted,
    message: envelope,
  }
}
/**
 * Poll for replies to a request until one arrives or the budget runs out.
 *
 * `request_or_id` may be a request receipt, a coordination message, or a raw
 * id. Each scan reads the caller's durable inbox for messages whose
 * `reply_to` and `thread_id` match the request. Nothing is acknowledged by
 * default; pass `{ack: true}` only when the room is single-purpose or the
 * caller intentionally advances its consumer cursor through every scanned
 * event.
 *
 * Options:
 * - `consumer_id` / `consumer`: durable cursor owner. Defaults to
 *   `reply.<request_id>` so acknowledging one request does not hide replies for
 *   a different request addressed to the same agent.
 * - `response_kind` / `reply_kind` / `kind`: optional reply kind filter.
 * - `response_subject` / `reply_subject` / `subject`: optional reply subject
 *   filter.
 * - `response_from` / `reply_from` / `from`: optional responder filter.
 * - `timeout_ms`: wall-clock budget; default 2000. Use 0 for a single scan.
 * - `poll_interval_ms`: sleep between scans; default 50.
 * - `ack`: advance the consumer cursor after a match.
 *
 * Returns a dict with `status` "matched" (including `reply` and `replies`) or
 * "timeout" (with an empty `replies` list).
 *
 * @effects: [transcript.write]
 * @errors: [runtime]
 */
pub fn coord_wait_reply(scope, room, request_or_id, options = nil) {
  let opts = __coord_dict(options)
  let request = __coord_message_from_input(request_or_id, opts)
  let wait_opts = __coord_wait_reply_options(request, opts)
  let consumer_id = wait_opts.consumer_id
  let timeout_ms = __coord_nonnegative_int(opts?.timeout_ms ?? opts?.budget_ms, 2000, "timeout_ms")
  let poll_interval_ms = __coord_positive_int(opts?.poll_interval_ms, 50, "poll_interval_ms")
  let should_ack = opts?.ack ?? false
  let started = harness.clock.monotonic_ms()
  while true {
    let inbox = coord_inbox(scope, room, wait_opts)
    let elapsed = harness.clock.monotonic_ms() - started
    if len(inbox.messages) > 0 {
      var acked = nil
      if should_ack {
        acked = coord_ack(scope, room, consumer_id, inbox.next_cursor, opts)
      }
      return filter_nil(
        {
          status: "matched",
          matched: true,
          timed_out: false,
          request_id: request.id,
          thread_id: wait_opts.thread_id,
          consumer_id: consumer_id,
          reply: inbox.messages[0],
          replies: inbox.messages,
          inbox: inbox,
          next_cursor: inbox.next_cursor,
          acknowledged: acked,
          elapsed_ms: elapsed,
        },
      )
    }
    // A zero budget means "scan once"; otherwise stop once the budget is spent.
    let out_of_time = timeout_ms == 0 || elapsed >= timeout_ms
    if out_of_time {
      return filter_nil(
        {
          status: "timeout",
          matched: false,
          timed_out: true,
          request_id: request.id,
          thread_id: wait_opts.thread_id,
          consumer_id: consumer_id,
          replies: [],
          inbox: inbox,
          next_cursor: inbox?.next_cursor,
          elapsed_ms: elapsed,
        },
      )
    }
    sleep(poll_interval_ms)
  }
}
/**
 * Look up the last cursor a consumer acknowledged in a coordination room.
 *
 * Returns `nil` when the consumer has not acknowledged this room yet. Cursor
 * reads are explicit so that inbox consumption can be durable without
 * deleting messages for other agents.
 *
 * @effects: []
 * @errors: [runtime]
 */
pub fn coord_cursor(scope, room, consumer_id, options = nil) {
  let opts = __coord_dict(options)
  let target = __coord_resolution(scope, room, opts)
  let consumer = __coord_consumer_id(opts, consumer_id)
  let channel_opts = __coord_channel_options(target, opts)
  return channel_consumer_cursor(target.channel_name, consumer, channel_opts)
}
/**
 * Advance one consumer's durable cursor in a coordination room to `cursor`.
 *
 * No messages are removed: only the caller's consumer cursor on the
 * underlying channel/event log moves.
 *
 * @effects: [transcript.write]
 * @errors: [runtime]
 */
pub fn coord_ack(scope, room, consumer_id, cursor, options = nil) {
  let opts = __coord_dict(options)
  let target = __coord_resolution(scope, room, opts)
  let consumer = __coord_consumer_id(opts, consumer_id)
  let channel_opts = __coord_channel_options(target, opts)
  return channel_ack(target.channel_name, consumer, cursor, channel_opts)
}
/**
 * Read the addressed coordination messages for a consumer without
 * acknowledging them.
 *
 * Options:
 * - `consumer_id` / `consumer`: explicit durable cursor owner.
 * - `to` / `recipient`: address filter; defaults to the consumer id.
 * - `from`, `kind`, `reply_to`, `thread_id`, `subject`: optional filters.
 * - `from_cursor` / `cursor`: override the consumer cursor for this read.
 * - `include_events`: keep channel event wrappers beside each message.
 *
 * The returned `next_cursor` is the high-water cursor scanned, not an implicit
 * ack. Call `coord_ack(...)` explicitly after processing messages.
 *
 * @effects: []
 * @errors: [runtime]
 */
pub fn coord_inbox(scope, room, options = nil) {
  let opts = __coord_dict(options)
  let consumer_id = __coord_consumer_id(opts)
  // The stored consumer cursor is only consulted when no override is given.
  let from_cursor = opts?.from_cursor ?? opts?.cursor ?? coord_cursor(scope, room, consumer_id, opts)
  let read_opts = filter_nil(opts + {include_events: true, from_cursor: from_cursor})
  let scanned = coord_read(scope, room, read_opts)
  let keep_events = opts?.include_events ?? false
  var matched = []
  var high_water = from_cursor
  for entry in scanned {
    let event = entry?.event ?? {}
    let message = entry?.message ?? entry
    if event?.cursor != nil {
      high_water = event.cursor
    }
    if __coord_message_matches(message, opts, consumer_id) {
      let kept = if keep_events {
        entry
      } else {
        message
      }
      matched = matched + [kept]
    }
  }
  return {
    consumer_id: consumer_id,
    from_cursor: from_cursor,
    last_scanned_cursor: high_water,
    next_cursor: high_water,
    scanned: len(scanned),
    messages: matched,
  }
}
/**
 * Read the durable coordination messages stored in a scoped room.
 *
 * By default each entry is the message payload. Pass `{include_events: true}`
 * to get `{event, message}` pairs that keep the channel event wrapper next to
 * each message.
 *
 * @effects: []
 * @errors: [runtime]
 */
pub fn coord_read(scope, room, options = nil) {
  let opts = __coord_dict(options)
  let target = __coord_resolution(scope, room, opts)
  let events = channel_events(target.channel_name, __coord_channel_options(target, opts))
  let keep_events = opts?.include_events ?? false
  var entries = []
  for event in events {
    let entry = if keep_events {
      {event: event, message: event.payload}
    } else {
      event.payload
    }
    entries = entries + [entry]
  }
  return entries
}
/**
 * Subscribe to the durable channel topic that backs a coordination room.
 *
 * Stream entries are channel event rows; the normalized coordination message
 * is available at `entry.payload`.
 *
 * @effects: []
 * @errors: [runtime]
 */
pub fn coord_subscribe(scope, room, options = nil) {
  let opts = __coord_dict(options)
  let target = __coord_resolution(scope, room, opts)
  let channel_opts = __coord_channel_options(target, opts)
  return channel_subscribe(target.channel_name, channel_opts)
}
/**
 * Store a coordination message (or the message inside a receipt) in durable
 * memory.
 *
 * This is opt-in by construction: `coord_post` writes the ledger only, while
 * `coord_remember` is the point where callers decide a message should become
 * recallable context. Unless overridden through `namespace`, `key`, and
 * `tags`, the entry is stored under `coordination/<scope>/<room>`, keyed by
 * the message id, and tagged with "coordination", the kind, and the scope.
 *
 * @effects: []
 * @errors: [runtime]
 */
pub fn coord_remember(message_or_receipt, options = nil) {
  let opts = __coord_dict(options)
  let message = message_or_receipt?.message ?? message_or_receipt
  let is_envelope = type_of(message) == "dict" && message?.schema == __COORD_SCHEMA
  if !is_envelope {
    throw "std/coordination: coord_remember expects a coordination message or receipt"
  }
  let namespace = opts?.namespace ?? ("coordination/" + message.scope + "/" + message.room)
  let key = opts?.key ?? message.id
  let tags = opts?.tags ?? ["coordination", message.kind, message.scope]
  return memory_store(namespace, key, message, tags, opts?.memory_options)
}