// std/checkpoint — Checkpoint & resume utilities for resilient pipelines.
//
// Import:
//   import { checkpoint_stage, checkpoint_stage_retry } from "std/checkpoint"
//
// checkpoint_stage(name, f) -> value
//
// Runs f() and caches the result under `name`. On subsequent calls with the
// same name, returns the cached result without running f() again. Enables
// idempotent, resumable pipelines that survive crashes and restarts.
//
// Example:
// import { checkpoint_stage } from "std/checkpoint"
//
// pipeline process(task) {
//   let data = checkpoint_stage("fetch", { -> fetch_dataset(url) })
//   let cleaned = checkpoint_stage("clean", { -> clean(data) })
//   let result = checkpoint_stage("process", { -> run_model(cleaned) })
//   upload(result)
// }
//
// On first run all three stages execute. On a resumed run (after a crash),
// completed stages are skipped automatically.
import { agent_emit_event } from "std/agent/state"
/**
 * Return the value cached under `name`, or run `f()` once, store its result
 * under `name` with `checkpoint`, and return that result.
 */
pub fn checkpoint_stage(name, f) {
  if !checkpoint_exists(name) {
    let fresh = f()
    checkpoint(name, fresh)
    return fresh
  }
  return checkpoint_get(name)
}
// checkpoint_stage_retry(name, max_retries, f) -> value
//
// Like checkpoint_stage, but calls f() up to max_retries times in total
// (always at least once) until one call succeeds. The first successful result
// is cached under `name`, so a resumed run returns it without calling f().
// If every attempt fails, throws a string that names the attempt count and
// the last error message.
//
// Example:
// let data = checkpoint_stage_retry("fetch", 3, { -> fetch_with_timeout(url) })
/**
 * Cached, retrying variant of checkpoint_stage; `max_retries` is the total
 * attempt budget, and values below 1 are treated as a single attempt.
 */
pub fn checkpoint_stage_retry(name, max_retries, f) {
  if checkpoint_exists(name) {
    return checkpoint_get(name)
  }
  // A budget below 1 would otherwise never run f() and throw "unknown error";
  // always make at least one attempt.
  let max_attempts = if max_retries < 1 { 1 } else { max_retries }
  var attempts = 0
  var last_err = "unknown error"
  while attempts < max_attempts {
    let result = try {
      f()
    }
    if is_ok(result) {
      let v = unwrap(result)
      checkpoint(name, v)
      return v
    }
    last_err = to_string(unwrap_err(result))
    attempts = attempts + 1
  }
  throw "checkpoint_stage_retry failed after " + to_string(max_attempts) + " attempts: " + last_err
}
/**
 * Build the option dict passed to `llm_call_structured_result`.
 * Starts from `options.llm_options` when present, otherwise from `options`
 * itself, then fills in `schema_retries` (default 2), `repair` (default
 * `{enabled: true}`) and `system` from the top-level options whenever the
 * chosen dict leaves them unset.
 */
fn __typed_checkpoint_options(options) {
  let base = options ?? {}
  var merged = base?.llm_options ?? base
  if merged?.schema_retries == nil {
    let retries = base?.schema_retries ?? 2
    merged = merged + {schema_retries: retries}
  }
  if merged?.repair == nil {
    let repair_cfg = base?.repair ?? {enabled: true}
    merged = merged + {repair: repair_cfg}
  }
  let inherit_system = base?.system != nil && merged?.system == nil
  if inherit_system {
    merged = merged + {system: base.system}
  }
  return merged
}
/**
 * Coerce `value` into a list of strings: nil -> [], a list -> each item
 * stringified, anything else -> a one-element list.
 */
fn __typed_checkpoint_string_list(value) {
  if value == nil {
    return []
  }
  if type_of(value) != "list" {
    return [to_string(value)]
  }
  return value.map({ entry -> to_string(entry) })
}
/**
 * Normalize a validator's return value into a list of error strings; an
 * empty list means the output was accepted.
 *
 * - nil / true  -> accepted; false -> ["validator returned false"]
 * - string      -> accepted when blank after trim, otherwise [trimmed text]
 * - list        -> each item stringified
 * - dict        -> accepted when `ok` is truthy; otherwise messages are taken
 *                  from `errors` / `validation_errors` / `messages`, falling
 *                  back to `error` / `message` / a generic rejection. An
 *                  explicit `ok: false` is never accepted, even when its
 *                  error list is empty.
 * - other types -> a single "unsupported result type" error
 */
fn __typed_checkpoint_validator_errors(value) {
  if value == nil {
    return []
  }
  if type_of(value) == "bool" {
    return if value {
      []
    } else {
      ["validator returned false"]
    }
  }
  if type_of(value) == "string" {
    let text = trim(value)
    return if text == "" {
      []
    } else {
      [text]
    }
  }
  if type_of(value) == "list" {
    return __typed_checkpoint_string_list(value)
  }
  if type_of(value) == "dict" {
    if value?.ok ?? false {
      return []
    }
    let errors = value?.errors ?? value?.validation_errors ?? value?.messages
    if errors != nil {
      let messages = __typed_checkpoint_string_list(errors)
      // An empty error list only counts as acceptance when the validator did
      // not explicitly report `ok: false`; otherwise fall through to a
      // rejection message so the output is not silently accepted.
      if len(messages) > 0 || value?.ok == nil {
        return messages
      }
    }
    let message = value?.error ?? value?.message ?? "validator rejected output"
    return [to_string(message)]
  }
  return ["validator returned unsupported result type " + type_of(value)]
}
/**
 * Run the optional `validator` on parsed `data` and return its normalized
 * error list; with no validator, the data is accepted ([]).
 */
fn __typed_checkpoint_validate(data, validator) {
  if validator != nil {
    return __typed_checkpoint_validator_errors(validator(data))
  }
  return []
}
/**
 * Classify a failed structured-output result. `missing_json` errors go into
 * `parse_failures`; every other category goes into `schema_failures`.
 */
fn __typed_checkpoint_failure_lists(result) {
  let message = result?.error ?? "structured output failed"
  let is_parse_failure = (result?.error_category ?? "unknown") == "missing_json"
  if is_parse_failure {
    return {parse_failures: [message], schema_failures: []}
  }
  return {parse_failures: [], schema_failures: [message]}
}
/**
 * Build one trace entry for checkpoint attempt `index` from an LLM or
 * recovery `result`, together with the validator errors raised for it.
 * Missing optional fields default to ""/false.
 */
fn __typed_checkpoint_attempt_record(index, result, validator_errors) {
  let raw = result?.raw_text ?? ""
  let message = result?.error ?? ""
  let was_repaired = result?.repaired ?? false
  let was_extracted = result?.extracted_json ?? false
  return {
    attempt: index,
    ok: result.ok,
    raw_text: raw,
    error: message,
    error_category: result?.error_category,
    repaired: was_repaired,
    extracted_json: was_extracted,
    validator_errors: validator_errors,
  }
}
/**
 * Append validator correction messages, as a bulleted list, to the original
 * `prompt` so the next structured call can fix the rejected JSON.
 */
fn __typed_checkpoint_correction_prompt(name, prompt, errors) {
  let bullets = errors.join("\n- ")
  let header = "\n\nTyped-output checkpoint '" + name
    + "' rejected the previous JSON after parsing. Return a corrected JSON object only.\n"
  return prompt + header + "Validator correction messages:\n- " + bullets
}
/**
 * Map an envelope outcome to its status label: "accepted" on success,
 * "validator_rejected" for validator failures, and "schema_rejected" for
 * any other failure.
 */
fn __typed_checkpoint_status(ok, category) {
  if !ok {
    return if category == "validator_failure" { "validator_rejected" } else { "schema_rejected" }
  }
  return "accepted"
}
/**
 * Publish a checkpoint envelope on a best-effort basis.
 * The envelope always goes to the host trace hook. When a non-empty session
 * id is set on `options` or `options.llm_options`, it is also emitted as a
 * "typed_checkpoint" agent event. Errors from either sink are deliberately
 * swallowed, so tracing cannot change the checkpoint's outcome.
 */
fn __typed_checkpoint_emit(options, envelope) {
  try {
    __host_typed_checkpoint_trace(envelope)
  } catch (err) {
    // Intentionally ignored: the host hook may be unavailable or fail.
  }
  let sid = options?.session_id ?? options?.llm_options?.session_id
  let has_session = sid != nil && sid != ""
  if has_session {
    try {
      agent_emit_event(sid, "typed_checkpoint", envelope)
    } catch (err) {
      // Intentionally ignored: event delivery is best-effort.
    }
  }
}
/**
 * Assemble the version-1 `typed_output_checkpoint` envelope from the `args`
 * dict built by the checkpoint drivers.
 *
 * On success, `data` comes from `args.final_result`, `error` is "" and
 * `error_category` is nil. On failure, `data` comes from `args.data`, the
 * category defaults to "unknown", and `error` falls back to the joined
 * parse/schema/validator failures when `args.error` is unset.
 * `retries` is `checkpoint_attempts - 1`, floored at 0.
 */
fn __typed_checkpoint_envelope(args) {
  let ok = args.ok
  let final_result = args.final_result
  let checkpoint_attempts = args.checkpoint_attempts ?? 0
  let parse_failures = args.parse_failures ?? []
  let schema_failures = args.schema_failures ?? []
  let validator_failures = args.validator_failures ?? []
  let validation_errors = parse_failures + schema_failures + validator_failures
  var category = nil
  var payload = nil
  var error_text = ""
  if ok {
    payload = final_result?.data
  } else {
    category = args.error_category ?? "unknown"
    payload = args.data ?? nil
    error_text = args.error ?? validation_errors.join("; ")
  }
  var retries = 0
  if checkpoint_attempts > 0 {
    retries = checkpoint_attempts - 1
  }
  return {
    _type: "typed_output_checkpoint",
    version: 1,
    name: args.name,
    ok: ok,
    status: __typed_checkpoint_status(ok, category),
    data: payload,
    raw_text: final_result?.raw_text ?? args.raw_text ?? "",
    raw_outputs: args.raw_outputs ?? [],
    error: error_text,
    error_category: category,
    attempts: args.attempts ?? 0,
    checkpoint_attempts: checkpoint_attempts,
    retries: retries,
    repaired: args.repaired ?? false,
    extracted_json: args.extracted_json ?? false,
    parse_failures: parse_failures,
    schema_failures: schema_failures,
    validator_failures: validator_failures,
    validation_errors: validation_errors,
    final_accepted: ok,
    trace: args.trace ?? [],
    usage: final_result?.usage ?? {},
    model: final_result?.model ?? "",
    provider: final_result?.provider ?? "",
  }
}
/**
 * Run a strict, schema-bound LLM checkpoint with bounded repair and an
 * optional deterministic post-parse validator. The validator may return
 * true/nil, a string, a list of messages, or `{ok, errors}`; rejected
 * outputs are re-prompted with those correction messages before any
 * caller-owned side effects run.
 *
 * `options.validator_retries` (default 1; negative values are treated as 0)
 * bounds the number of validator-driven re-prompts, so at most
 * `validator_retries + 1` structured calls are made. LLM options are derived
 * from `options` by `__typed_checkpoint_options`. Structured-output and
 * validator rejections are reported in the returned `typed_output_checkpoint`
 * envelope rather than thrown. Every returned envelope is also passed to
 * `__typed_checkpoint_emit`.
 */
pub fn typed_output_checkpoint(name, prompt, schema, options = nil, validator = nil) {
  let opts = options ?? {}
  let requested_retries = opts?.validator_retries ?? 1
  // A negative budget would skip the loop entirely and never call the model.
  let max_validator_retries = if requested_retries < 0 { 0 } else { requested_retries }
  // The LLM options depend only on `opts`, so build them once for all attempts.
  let llm_opts = __typed_checkpoint_options(opts)
  var checkpoint_attempt = 0
  var total_llm_attempts = 0
  var raw_outputs = []
  var trace = []
  var validator_failures = []
  var current_prompt = prompt
  var any_repaired = false
  var any_extracted = false
  var last_result = nil
  while checkpoint_attempt <= max_validator_retries {
    checkpoint_attempt = checkpoint_attempt + 1
    let result = llm_call_structured_result(current_prompt, schema, llm_opts)
    last_result = result
    let result_attempts = result?.attempts ?? 0
    total_llm_attempts = total_llm_attempts + result_attempts
    raw_outputs = raw_outputs.push(result?.raw_text ?? "")
    let result_repaired = result?.repaired ?? false
    let result_extracted = result?.extracted_json ?? false
    any_repaired = any_repaired || result_repaired
    any_extracted = any_extracted || result_extracted
    if !result.ok {
      // Parse/schema failure: the structured call already spent its own
      // repair budget, so stop here rather than re-prompting.
      let failures = __typed_checkpoint_failure_lists(result)
      trace = trace.push(__typed_checkpoint_attempt_record(checkpoint_attempt, result, []))
      let envelope = __typed_checkpoint_envelope(
        {
          name: name,
          ok: false,
          final_result: result,
          error: result?.error ?? "structured output failed",
          error_category: result?.error_category ?? "schema_validation",
          attempts: total_llm_attempts,
          checkpoint_attempts: checkpoint_attempt,
          repaired: any_repaired,
          extracted_json: any_extracted,
          parse_failures: failures.parse_failures,
          schema_failures: failures.schema_failures,
          validator_failures: validator_failures,
          raw_outputs: raw_outputs,
          trace: trace,
        },
      )
      __typed_checkpoint_emit(opts, envelope)
      return envelope
    }
    let errors = __typed_checkpoint_validate(result.data, validator)
    trace = trace.push(__typed_checkpoint_attempt_record(checkpoint_attempt, result, errors))
    if len(errors) == 0 {
      let envelope = __typed_checkpoint_envelope(
        {
          name: name,
          ok: true,
          final_result: result,
          attempts: total_llm_attempts,
          checkpoint_attempts: checkpoint_attempt,
          repaired: any_repaired,
          extracted_json: any_extracted,
          validator_failures: validator_failures,
          raw_outputs: raw_outputs,
          trace: trace,
        },
      )
      __typed_checkpoint_emit(opts, envelope)
      return envelope
    }
    validator_failures = validator_failures + errors
    // Attempt numbers run 1..max_validator_retries + 1; the last one may not re-prompt.
    if checkpoint_attempt > max_validator_retries {
      let envelope = __typed_checkpoint_envelope(
        {
          name: name,
          ok: false,
          final_result: result,
          data: result.data,
          error: errors.join("; "),
          error_category: "validator_failure",
          attempts: total_llm_attempts,
          checkpoint_attempts: checkpoint_attempt,
          repaired: any_repaired,
          extracted_json: any_extracted,
          validator_failures: validator_failures,
          raw_outputs: raw_outputs,
          trace: trace,
        },
      )
      __typed_checkpoint_emit(opts, envelope)
      return envelope
    }
    // Re-prompt from the ORIGINAL prompt plus only the latest validator messages.
    current_prompt = __typed_checkpoint_correction_prompt(name, prompt, errors)
  }
  // Defensive fallback: with a non-negative budget the final loop iteration
  // always returns, so this should be unreachable.
  let envelope = __typed_checkpoint_envelope(
    {
      name: name,
      ok: false,
      final_result: last_result,
      error: "typed checkpoint exhausted without a final result",
      error_category: "validator_failure",
      attempts: total_llm_attempts,
      checkpoint_attempts: checkpoint_attempt,
      repaired: any_repaired,
      extracted_json: any_extracted,
      validator_failures: validator_failures,
      raw_outputs: raw_outputs,
      trace: trace,
    },
  )
  __typed_checkpoint_emit(opts, envelope)
  return envelope
}
/**
 * Build a repair prompt for an already-produced summary. The prompt tells
 * the model not to invent new actions, then lists the validator messages and
 * the raw summary to correct.
 */
fn __typed_checkpoint_text_repair_prompt(name, raw_text, errors) {
  let intro = "Typed-output checkpoint '" + name + "' rejected an already-produced JSON summary.\n"
  let guardrail = "Do not describe new actions or claim new side effects. Repair only the summary below.\n"
  let corrections = "Validator correction messages:\n- " + errors.join("\n- ")
  let summary = "\n\nRaw summary:\n" + raw_text
  return intro + guardrail + corrections + summary + "\n\nReturn the corrected JSON object only."
}
/**
 * Validate already-produced model text against a schema and optional
 * deterministic validator. Validator repair prompts operate only on the
 * raw summary text, so callers can run this after side-effectful agent
 * work without replaying those side effects.
 *
 * `raw_text` is first recovered with `schema_recover`. If recovery fails,
 * the failure envelope is returned without any LLM call. While the validator
 * rejects the data, up to `options.validator_retries` (default 1) repair calls
 * are made. Each repair prompts with the ORIGINAL `raw_text` plus the most
 * recent validator messages. The outcome is always returned as a
 * `typed_output_checkpoint` envelope and passed to `__typed_checkpoint_emit`.
 */
pub fn typed_output_checkpoint_from_text(name, raw_text, schema, options = nil, validator = nil) {
  let opts = options ?? {}
  let max_validator_retries = opts?.validator_retries ?? 1
  var checkpoint_attempt = 1
  var total_attempts = 0
  var raw_outputs = [raw_text]
  var trace = []
  var validator_failures = []
  let recovered = schema_recover(raw_text, schema, opts)
  let recovered_attempts = recovered.attempts ?? 0
  total_attempts = total_attempts + recovered_attempts
  if !recovered.ok {
    let failures = __typed_checkpoint_failure_lists(recovered)
    trace = trace.push(__typed_checkpoint_attempt_record(checkpoint_attempt, recovered, []))
    let envelope = __typed_checkpoint_envelope(
      {
        name: name,
        ok: false,
        final_result: recovered,
        error: recovered?.error ?? "structured output recovery failed",
        error_category: recovered?.error_category ?? "schema_validation",
        attempts: total_attempts,
        checkpoint_attempts: checkpoint_attempt,
        repaired: recovered?.repaired ?? false,
        extracted_json: recovered?.stage == "extracted",
        parse_failures: failures.parse_failures,
        schema_failures: failures.schema_failures,
        validator_failures: validator_failures,
        raw_outputs: raw_outputs,
        trace: trace,
      },
    )
    __typed_checkpoint_emit(opts, envelope)
    return envelope
  }
  var last_result = recovered
  var any_repaired = recovered.repaired ?? false
  var any_extracted = recovered.stage == "extracted"
  var errors = __typed_checkpoint_validate(recovered.data, validator)
  trace = trace.push(__typed_checkpoint_attempt_record(checkpoint_attempt, recovered, errors))
  // The repair-call options depend only on `opts`; build them once, not per iteration.
  let llm_opts = __typed_checkpoint_options(opts)
  while len(errors) > 0 && checkpoint_attempt <= max_validator_retries {
    // Record the errors that triggered this repair before attempting it.
    validator_failures = validator_failures + errors
    checkpoint_attempt = checkpoint_attempt + 1
    let prompt = __typed_checkpoint_text_repair_prompt(name, raw_text, errors)
    let result = llm_call_structured_result(prompt, schema, llm_opts)
    last_result = result
    let result_attempts = result?.attempts ?? 0
    total_attempts = total_attempts + result_attempts
    raw_outputs = raw_outputs.push(result?.raw_text ?? "")
    let result_repaired = result?.repaired ?? false
    let result_extracted = result?.extracted_json ?? false
    any_repaired = any_repaired || result_repaired
    any_extracted = any_extracted || result_extracted
    if !result.ok {
      let failures = __typed_checkpoint_failure_lists(result)
      trace = trace.push(__typed_checkpoint_attempt_record(checkpoint_attempt, result, []))
      let envelope = __typed_checkpoint_envelope(
        {
          name: name,
          ok: false,
          final_result: result,
          error: result?.error ?? "structured output failed",
          error_category: result?.error_category ?? "schema_validation",
          attempts: total_attempts,
          checkpoint_attempts: checkpoint_attempt,
          repaired: any_repaired,
          extracted_json: any_extracted,
          parse_failures: failures.parse_failures,
          schema_failures: failures.schema_failures,
          validator_failures: validator_failures,
          raw_outputs: raw_outputs,
          trace: trace,
        },
      )
      __typed_checkpoint_emit(opts, envelope)
      return envelope
    }
    errors = __typed_checkpoint_validate(result.data, validator)
    trace = trace.push(__typed_checkpoint_attempt_record(checkpoint_attempt, result, errors))
  }
  if len(errors) == 0 {
    let envelope = __typed_checkpoint_envelope(
      {
        name: name,
        ok: true,
        final_result: last_result,
        attempts: total_attempts,
        checkpoint_attempts: checkpoint_attempt,
        repaired: any_repaired,
        extracted_json: any_extracted,
        validator_failures: validator_failures,
        raw_outputs: raw_outputs,
        trace: trace,
      },
    )
    __typed_checkpoint_emit(opts, envelope)
    return envelope
  }
  // Repair budget exhausted: the final round's errors were not yet recorded.
  validator_failures = validator_failures + errors
  let envelope = __typed_checkpoint_envelope(
    {
      name: name,
      ok: false,
      final_result: last_result,
      data: last_result?.data,
      error: errors.join("; "),
      error_category: "validator_failure",
      attempts: total_attempts,
      checkpoint_attempts: checkpoint_attempt,
      repaired: any_repaired,
      extracted_json: any_extracted,
      validator_failures: validator_failures,
      raw_outputs: raw_outputs,
      trace: trace,
    },
  )
  __typed_checkpoint_emit(opts, envelope)
  return envelope
}