// std/checkpoint — Checkpoint & resume utilities for resilient pipelines.
//
// Import: import { checkpoint_stage } from "std/checkpoint"
// import { checkpoint_stage, checkpoint_stage_retry } from "std/checkpoint"
// checkpoint_stage(name, f) -> value
//
// Runs f() and caches the result under `name`. On subsequent calls with the
// same name, returns the cached result without running f() again. Enables
// idempotent, resumable pipelines that survive crashes and restarts.
//
// Example:
// import { checkpoint_stage } from "std/checkpoint"
//
// pipeline process(task) {
// let data = checkpoint_stage("fetch", { -> fetch_dataset(url) })
// let cleaned = checkpoint_stage("clean", { -> clean(data) })
// let result = checkpoint_stage("process", { -> run_model(cleaned) })
// upload(result)
// }
//
// On first run all three stages execute. On a resumed run (after a crash),
// completed stages are skipped automatically.
/** checkpoint_stage. */
pub fn checkpoint_stage(name, f) {
if checkpoint_exists(name) {
return checkpoint_get(name)
}
let result = f()
checkpoint(name, result)
return result
}
// checkpoint_stage_retry(name, max_retries, fn) -> value
//
// Like checkpoint_stage, but retries fn() up to max_retries times on failure
// before propagating the error. Once successful, caches the result so retries
// are never needed on resume.
//
// Example:
// let data = checkpoint_stage_retry("fetch", 3, fn() { fetch_with_timeout(url) })
/** checkpoint_stage_retry. */
pub fn checkpoint_stage_retry(name, max_retries, f) {
if checkpoint_exists(name) {
return checkpoint_get(name)
}
var attempts = 0
var last_err = "unknown error"
while attempts < max_retries {
let result = try { f() }
if is_ok(result) {
let v = unwrap(result)
checkpoint(name, v)
return v
}
last_err = to_string(unwrap_err(result))
attempts = attempts + 1
}
throw("checkpoint_stage_retry failed after " + to_string(max_retries) + " attempts: " + last_err)
}