/**
* std/lifecycle — pipeline lifecycle callbacks and presets.
*
* The pipeline DSL accepts a single `on_finish` callback that runs after
* the pipeline's declared steps complete and before the pipeline returns
* its value to the host. The callback signature is
* `fn(harness, return_value)` and its return value replaces the pipeline's
* return value. Register a callback by calling the global
* `pipeline_on_finish` builtin from anywhere inside the pipeline body.
*
* Four canonical presets ship from this module: `on_finish_abandon`,
* `on_finish_drain`, `on_finish_block_until_settled`, and
* `on_finish_handoff_to`. They are documented inline below; each preset
* keeps no mutable state of its own (the factory presets close only over
* the arguments they were given), so chaining and reuse are safe.
*
* Tracking: harn#1855 (P-02), harn#1853 (epic).
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: stable
* @example: pipeline_on_finish(on_finish_drain)
*/
/**
 * Free-function spelling of `harness.unsettled_state()`: fetches the
 * current unsettled-state snapshot from the given harness handle and
 * returns it untouched, so callers may pick method or function style.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: let state = unsettled_state(harness)
 */
pub fn unsettled_state(harness) {
  let snapshot = harness.unsettled_state()
  return snapshot
}
/**
 * Report whether an `UnsettledState` snapshot has no entries in any of
 * its four buckets (`suspended_subagents`, `queued_triggers`,
 * `partial_handoffs`, `in_flight_llm_calls`). A missing bucket, or a
 * `nil` snapshot, counts as empty. Equivalent to
 * `harness.is_empty(state)` but usable from code that does not hold the
 * harness.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: if is_empty(state) { return return_value }
 */
pub fn is_empty(state) {
  // Buckets are checked in the same order as before; the first non-empty
  // one settles the answer and the remaining buckets are not inspected.
  if len(state?.suspended_subagents ?? []) != 0 {
    return false
  }
  if len(state?.queued_triggers ?? []) != 0 {
    return false
  }
  if len(state?.partial_handoffs ?? []) != 0 {
    return false
  }
  return len(state?.in_flight_llm_calls ?? []) == 0
}
/**
 * Tally each bucket of an `UnsettledState` snapshot and return the
 * sizes as a dict shaped `{suspended, queued, partial, in_flight}`.
 * A missing bucket (or a `nil` snapshot) is reported as 0.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: counts(state).partial
 */
pub fn counts(state) {
  let suspended = len(state?.suspended_subagents ?? [])
  let queued = len(state?.queued_triggers ?? [])
  let partial = len(state?.partial_handoffs ?? [])
  let in_flight = len(state?.in_flight_llm_calls ?? [])
  return {
    suspended: suspended,
    queued: queued,
    partial: partial,
    in_flight: in_flight,
  }
}
/**
 * Render an `UnsettledState` snapshot as a single human-readable line:
 * either "no unsettled work" or a comma-separated tally of the four
 * buckets. Useful for audit payloads and operator-facing log lines.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: harness.emit_audit("snapshot", {summary: summary(state)})
 */
pub fn summary(state) {
  let tally = counts(state)
  // Every count is a `len(...)` result, so the sum is zero exactly when
  // all four buckets are empty.
  let total = tally.suspended + tally.queued + tally.partial + tally.in_flight
  if total == 0 {
    return "no unsettled work"
  }
  let head = "unsettled work: " + to_string(tally.suspended) + " suspended subagents, "
  let middle = to_string(tally.queued) + " queued triggers, "
    + to_string(tally.partial)
    + " partial handoffs, "
  let tail = to_string(tally.in_flight) + " in-flight llm calls"
  return head + middle + tail
}
/**
 * Abandon preset: reproduces today's no-callback behavior and always
 * returns `return_value` unchanged. When the harness still reports
 * unsettled work, a `pipeline_abandoned_unsettled` audit entry carrying
 * the bucket counts and the one-line summary is emitted first, so the
 * lost work is at least observable.
 *
 * Use this preset when the pipeline's contract is "fire and forget" —
 * deferred work that survives the pipeline's exit is acceptable and any
 * downstream cleanup is the host's responsibility.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: pipeline_on_finish(on_finish_abandon)
 */
pub fn on_finish_abandon(harness, return_value) {
  let snapshot = harness.unsettled_state()
  // Nothing deferred: there is nothing to audit.
  if is_empty(snapshot) {
    return return_value
  }
  let payload = {counts: counts(snapshot), summary: summary(snapshot)}
  harness.emit_audit("pipeline_abandoned_unsettled", payload)
  return return_value
}
/**
 * Drain preset (the documented default). Reads the harness unsettled
 * state; when anything is deferred, the per-item disposition is
 * delegated to a settlement agent via `harness.spawn_settlement_agent`
 * and that call's result becomes the pipeline's return value. When
 * nothing is deferred, a `pipeline_finalized` audit entry is emitted and
 * `return_value` is returned unchanged.
 *
 * The settlement-agent loop itself lands in harn#1856 (P-03); until that
 * ticket ships the harness method returns a typed unsupported receipt,
 * which this preset surfaces as-is so callers can detect the gap.
 *
 * Use this preset when the pipeline should explicitly account for every
 * piece of deferred work before exiting. It is the recommended default.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: pipeline_on_finish(on_finish_drain)
 */
pub fn on_finish_drain(harness, return_value) {
  let pending = harness.unsettled_state()
  if !is_empty(pending) {
    // Deferred work exists: the settlement agent decides the outcome.
    return harness.spawn_settlement_agent(pending, return_value)
  }
  harness.emit_audit("pipeline_finalized", {reason: "no_unsettled"})
  return return_value
}
/**
 * `on_finish_block_until_settled(timeout, fallback)` builds a callback
 * that calls `harness.wait_for_any_settlement(timeout)` and branches on
 * the result's `status`:
 *
 * - `"settled"`: emits `pipeline_finalized` (reason
 *   `settled_within_timeout`) and returns the unchanged `return_value`.
 * - anything else (including a `nil` result): emits `settlement_timeout`
 *   with the raw result attached, then delegates to `fallback`, or to
 *   `on_finish_drain` when no fallback was supplied.
 *
 * The fallback may itself be any callback, so chains like
 * `block_until_settled → handoff_to → drain` compose cleanly.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pipeline_on_finish(on_finish_block_until_settled(30s))
 */
pub fn on_finish_block_until_settled(timeout = 30s, fallback = nil) {
  return { harness, return_value ->
    let outcome = harness.wait_for_any_settlement(timeout)
    if outcome?.status != "settled" {
      harness.emit_audit("settlement_timeout", {timeout: timeout, result: outcome})
      // The default is resolved at call time, after the timeout audit.
      let delegate = if fallback != nil {
        fallback
      } else {
        on_finish_drain
      }
      return delegate(harness, return_value)
    }
    harness
      .emit_audit("pipeline_finalized", {reason: "settled_within_timeout", timeout: timeout})
    return return_value
  }
}
/**
 * `on_finish_handoff_to(target_pipeline, options)` returns a callback
 * that packages the current unsettled-state snapshot into an envelope
 * (with `origin`, `unsettled`, and any caller-supplied options) and
 * hands it to a target pipeline via `harness.handoff_to`; the result of
 * that call becomes the pipeline's return value. When there is nothing
 * unsettled, the callback short-circuits: it emits `pipeline_finalized`
 * (recording the skipped target) and returns the unchanged
 * `return_value` — the typical case where the handoff pipeline does not
 * need to run at all.
 *
 * `options` defaults to `nil`, which is treated as an empty dict.
 * NOTE(review): options are merged on the right-hand side of `+`, so
 * whether caller keys can override `unsettled` / `origin` depends on
 * dict-merge semantics — confirm before relying on either outcome.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pipeline_on_finish(on_finish_handoff_to("nightly-drain"))
 */
pub fn on_finish_handoff_to(target_pipeline, options = nil) {
  return { harness, return_value ->
    let unsettled = harness.unsettled_state()
    if is_empty(unsettled) {
      harness
        .emit_audit(
          "pipeline_finalized",
          {reason: "no_unsettled", handoff_skipped: target_pipeline},
        )
      return return_value
    }
    let base = {unsettled: unsettled, origin: harness.current_pipeline_id()}
    // Resolve the nil default before merging. Written inline as
    // `base + options ?? {}`, the result depends on how `??` and `+`
    // group; if `+` binds tighter, an omitted `options` would be merged
    // as `base + nil` instead of `base + {}`.
    let extras = options ?? {}
    let envelope = base + extras
    return harness.handoff_to(target_pipeline, envelope)
  }
}
/**
 * Return and clear the lifecycle audit log entries recorded since the
 * last drain, by forwarding to the `pipeline_lifecycle_audit_log_take`
 * builtin. Each entry is `{seq, kind, payload, pipeline_id}`. Useful
 * from conformance fixtures and replay oracles that need to verify
 * which presets recorded what during a pipeline run.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: lifecycle_audit_log_take()
 */
pub fn lifecycle_audit_log_take() {
  let drained = pipeline_lifecycle_audit_log_take()
  return drained
}
/**
 * Non-destructive variant of `lifecycle_audit_log_take`: forwards to the
 * `pipeline_lifecycle_audit_log_snapshot` builtin and returns the
 * entries without draining them.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: lifecycle_audit_log_snapshot()
 */
pub fn lifecycle_audit_log_snapshot() {
  let entries = pipeline_lifecycle_audit_log_snapshot()
  return entries
}