/**
* std/lifecycle/combinators — function-shaped wrappers for lifecycle
* callbacks (P-07, harn#1860). The six combinators here are not
* lifecycle-specific: they wrap any `(harness, return_value) ->
* return_value` shape and apply equally to hook handlers,
* `resume_by` callbacks, `on_finish` callbacks, and any other
* function-shaped value the runtime hands to user code.
*
* Import:
* import {
* compose, first_available, with_telemetry, with_timeout,
* if_unsettled, when,
* } from "std/lifecycle/combinators"
*
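* All six are pure factories: each accepts a callback (or a list of
* callbacks) plus options, and returns a closure with no captured
* mutable state. They compose freely — for example
* `compose([with_telemetry(drain), with_timeout(drain, 60000)])` is
* meaningful. Note that `compose` and `first_available` take a single
* list argument; a non-list argument is treated like an empty list.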
*
* Tracking: harn#1860 (P-07), harn#1853 (epic).
*/
import { is_empty, unsettled_state } from "std/lifecycle"
/**
 * __resolve_timeout_ms(ms) normalizes the `ms` argument accepted by
 * `with_timeout` into a millisecond count.
 *
 * Accepted shapes:
 *   - int: returned unchanged.
 *   - float: converted with `to_int`.
 *   - dict: the first non-nil field among `ms`, `duration_ms`, and
 *     `seconds` wins, in that order. `seconds` is multiplied by 1000.0
 *     before conversion.
 *
 * Anything else (including a dict with none of those fields) resolves
 * to 0, which `with_timeout` treats as "no deadline".
 */
fn __resolve_timeout_ms(ms) {
  let kind = type_of(ms)
  if kind == "int" {
    return ms
  }
  if kind == "float" {
    return to_int(ms)
  }
  if kind != "dict" {
    return 0
  }
  let explicit_ms = ms?.ms
  if explicit_ms != nil {
    return to_int(explicit_ms)
  }
  let alt_ms = ms?.duration_ms
  if alt_ms != nil {
    return to_int(alt_ms)
  }
  let secs = ms?.seconds
  if secs != nil {
    return to_int(secs * 1000.0)
  }
  return 0
}
/**
 * __summarize_result(value) builds a short, log-friendly description
 * of a callback result for audit payloads.
 *
 * Output by input kind:
 *   - nil                -> "nil"
 *   - string             -> "string:<text>", where text longer than 80
 *                           is cut to its first 80 and suffixed "..."
 *   - int / float / bool -> "<kind>:<to_string(value)>"
 *   - list               -> "list:len=<length>"
 *   - dict               -> "dict:keys=<key count>"
 *   - anything else      -> the bare `type_of` name
 */
fn __summarize_result(value) -> string {
  if value == nil {
    return "nil"
  }
  let tag = type_of(value)
  if tag == "list" {
    return "list:len=" + to_string(len(value))
  }
  if tag == "dict" {
    return "dict:keys=" + to_string(len(keys(value)))
  }
  if tag == "int" || tag == "float" || tag == "bool" {
    return tag + ":" + to_string(value)
  }
  if tag == "string" {
    let shown = if len(value) > 80 {
      substring(value, 0, 80) + "..."
    } else {
      value
    }
    return tag + ":" + shown
  }
  return tag
}
/**
 * __stringify_error(err) renders a caught error value as a string for
 * audit payloads.
 *
 *   - nil            -> ""
 *   - string         -> returned unchanged
 *   - dict           -> its `message` field when that field is a
 *                       string; otherwise the whole dict serialized
 *                       with `json_stringify`
 *   - anything else  -> `to_string(err)`
 *
 * The `message` field is type-checked before being returned: a dict
 * such as `{message: 42}` would otherwise leak a non-string value out
 * of a function declared `-> string`. Serializing the whole dict in
 * that case keeps every field of the error visible in the audit entry.
 */
fn __stringify_error(err) -> string {
  if err == nil {
    return ""
  }
  let kind = type_of(err)
  if kind == "string" {
    return err
  }
  if kind == "dict" {
    let message = err?.message
    if type_of(message) == "string" {
      return message
    }
    return json_stringify(err)
  }
  return to_string(err)
}
/**
 * compose(callbacks) builds a single callback that runs every entry of
 * `callbacks` in list order. The first entry receives the inbound
 * `(harness, return_value)`; each later entry receives the same
 * harness together with the value returned by the entry before it, so
 * a later step can wrap, redact, or augment an earlier step's output
 * (e.g. in `pipeline_on_finish` chains). The composed callback yields
 * whatever the final entry returned.
 *
 * When `callbacks` is not a list, or is an empty list, the inbound
 * `return_value` is handed back untouched. Nothing is caught: a throw
 * from any entry ends the chain and propagates to the caller.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pipeline_on_finish(compose([on_finish_drain, audit_callback]))
 */
pub fn compose(callbacks) {
  return { harness, return_value ->
    var carried = return_value
    if type_of(callbacks) == "list" {
      for step in callbacks {
        carried = step(harness, carried)
      }
    }
    return carried
  }
}
/**
 * first_available(callbacks) builds a callback that tries each entry
 * of `callbacks` in list order, always passing the original
 * `(harness, return_value)`, and stops at the first entry whose result
 * is not nil; that result becomes the composed callback's result.
 *
 * The composed callback yields nil when every entry returns nil, when
 * the list is empty, or when `callbacks` is not a list. Nothing is
 * caught: a throw from an entry propagates and later entries are not
 * tried. Intended for fallback chains such as
 * `[cloud_handle, local_handle, parent_handle]` where the first
 * responder wins.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pipeline_on_finish(first_available([cloud_drain, local_drain]))
 */
pub fn first_available(callbacks) {
  return { harness, return_value ->
    if type_of(callbacks) == "list" {
      for candidate in callbacks {
        let answer = candidate(harness, return_value)
        if answer != nil {
          return answer
        }
      }
    }
    return nil
  }
}
/**
 * with_telemetry(callback, span_name?) wraps `callback` so that every
 * invocation is bracketed by a tracing span and audit entries. The
 * span is a `SpanKind::FnCall` OTel span named `span_name`, linked to
 * the active parent span via the standard VM tracing stack. A
 * non-string `span_name` falls back to the default,
 * `"lifecycle_callback"`.
 *
 * Per invocation, in order:
 *   1. the span is opened and `{span_name}_started` is emitted on the
 *      supplied harness;
 *   2. `callback(harness, return_value)` runs;
 *   3. on success, `{span_name}_completed` is emitted with a
 *      `result_summary` field, the span is closed, and the callback's
 *      result is returned unchanged;
 *   4. on a throw, `{span_name}_errored` is emitted with the
 *      stringified error in an `error` field, the span is closed, and
 *      the original error is re-thrown.
 *
 * The `_completed` emission sits inside the guarded region, so a
 * failure while emitting it is also reported as `_errored`. The
 * `_started` emission sits outside it: if that emission itself
 * throws, the error propagates and this wrapper does not close the
 * span.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pipeline_on_finish(with_telemetry(on_finish_drain, "drain"))
 */
pub fn with_telemetry(callback, span_name = "lifecycle_callback") {
  let has_name = type_of(span_name) == "string"
  let span_label = if has_name {
    span_name
  } else {
    "lifecycle_callback"
  }
  let started_event = span_label + "_started"
  let completed_event = span_label + "_completed"
  let errored_event = span_label + "_errored"
  return { harness, return_value ->
    let handle = __lifecycle_span_start(span_label)
    harness.emit_audit(started_event, {})
    try {
      let outcome = callback(harness, return_value)
      let summary = __summarize_result(outcome)
      harness.emit_audit(completed_event, {result_summary: summary})
      __lifecycle_span_end(handle)
      return outcome
    } catch (err) {
      let detail = __stringify_error(err)
      harness.emit_audit(errored_event, {error: detail})
      __lifecycle_span_end(handle)
      throw err
    }
  }
}
/**
 * with_timeout(callback, ms) wraps `callback` with a soft, clock-aware
 * deadline. `ms` may be an int or float (milliseconds), or a dict
 * carrying one of `{ms, duration_ms, seconds}`; it is resolved once,
 * when the wrapper is built. A resolved deadline of 0 or less
 * disables the check entirely.
 *
 * The wrapper never interrupts the callback. It reads
 * `harness.clock.now_ms()` before and after the call (mockable in
 * tests via `mock_time` / `advance_time`) and compares the elapsed
 * time with the deadline afterwards:
 *   - elapsed within the deadline (or no deadline): the callback's
 *     value passes through unchanged;
 *   - elapsed beyond the deadline: a `lifecycle_callback_timed_out`
 *     audit entry carrying `timeout_ms` and `elapsed_ms` is emitted,
 *     and the wrapper returns the sentinel dict
 *     `{__timed_out: true, timeout_ms, elapsed_ms, return_value}`,
 *     where `return_value` holds what the callback actually returned.
 *
 * The sentinel lets downstream composition (`compose`,
 * `first_available`) detect an overrun without relying on an
 * exception. The deadline is soft by design: lifecycle callbacks own
 * user state and side-effects, so letting the callback finish and
 * recording the overrun is safer than a hard thread-kill. Errors
 * thrown by the callback propagate unchanged.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pipeline_on_finish(with_timeout(on_finish_drain, 30000))
 */
pub fn with_timeout(callback, ms) {
  let budget_ms = __resolve_timeout_ms(ms)
  return { harness, return_value ->
    let began_at = harness.clock.now_ms()
    let outcome = callback(harness, return_value)
    let spent_ms = harness.clock.now_ms() - began_at
    if budget_ms > 0 {
      if spent_ms > budget_ms {
        harness.emit_audit(
          "lifecycle_callback_timed_out",
          {timeout_ms: budget_ms, elapsed_ms: spent_ms}
        )
        return {
          __timed_out: true,
          timeout_ms: budget_ms,
          elapsed_ms: spent_ms,
          return_value: outcome
        }
      }
    }
    return outcome
  }
}
/**
 * if_unsettled(callback) builds a callback that runs `callback` only
 * while the harness still has unsettled work. On each invocation it
 * takes exactly one snapshot via `unsettled_state(harness)` and tests
 * it with `is_empty` (both imported from `std/lifecycle`), so the
 * decision is consistent and cheap:
 *   - snapshot empty (everything settled): the inbound `return_value`
 *     is returned and `callback` is not called;
 *   - otherwise: the result of `callback(harness, return_value)` is
 *     returned.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pipeline_on_finish(if_unsettled(on_finish_drain))
 */
pub fn if_unsettled(callback) {
  return { harness, return_value ->
    let snapshot = unsettled_state(harness)
    let outcome = if is_empty(snapshot) {
      return_value
    } else {
      callback(harness, return_value)
    }
    return outcome
  }
}
/**
 * when(predicate, callback) builds a callback guarded by `predicate`.
 * On each invocation `predicate(harness, return_value)` is evaluated
 * once:
 *   - truthy: `callback(harness, return_value)` runs and its result is
 *     returned;
 *   - otherwise: the inbound `return_value` is returned and `callback`
 *     is not called.
 *
 * Predicates may inspect the harness, the inbound return value, or
 * external state. Errors thrown by the predicate or the callback
 * propagate unchanged.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pipeline_on_finish(when({ h, rv -> rv == nil }, fallback_cb))
 */
pub fn when(predicate, callback) {
  return { harness, return_value ->
    let outcome = if predicate(harness, return_value) {
      callback(harness, return_value)
    } else {
      return_value
    }
    return outcome
  }
}