harn-stdlib 0.8.26

Embedded Harn standard library source catalog
Documentation
/**
 * std/lifecycle/pool — named, concurrency-bounded agent thread pools (PL-01/PL-03).
 *
 * Foundation for the agent pool epic (#1883). Provides:
 *   - pool_create({...}) to allocate a named pool with a max_concurrent cap
 *   - pool_get(name) / pool_list() for lookup and introspection
 *   - pool.submit(fn() { ... }, options?) to enqueue a closure on the handle
 *   - pool.size() and pool.snapshot() for inspection
 *   - pool_wait(handle) to block until a submitted task reaches a terminal state
 *
 * Queue strategies are selected with the QueueStrategy descriptors below.
 * Backpressure policies are selected with Backpressure descriptors; channel
 * composition and durable state arrive in later pool tickets.
 *
 * Import:
 *   import { Backpressure, pool_create, pool_get, pool_list, pool_wait, fifo } from "std/lifecycle/pool"
 */
fn __queue_strategy(kind, options = nil) {
  let base = {_type: "queue_strategy", kind: kind}
  if options == nil {
    return base
  }
  return base + options
}

/**
 * fifo returns a queue strategy descriptor that dequeues oldest-first.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_create({queue: fifo()})
 */
pub fn fifo() {
  return __queue_strategy("fifo")
}

/**
 * priority returns a queue strategy descriptor that dequeues the highest
 * submit-time `priority` first, with FIFO tiebreaks.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_create({queue: priority()})
 */
pub fn priority() {
  return __queue_strategy("priority")
}

/**
 * lifo returns a queue strategy descriptor that dequeues newest-first.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_create({queue: lifo()})
 */
pub fn lifo() {
  return __queue_strategy("lifo")
}

/**
 * fair_round_robin returns a strategy descriptor that partitions queued
 * tasks by a submit option field and round-robins across distinct values.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_create({queue: fair_round_robin("tenant_id")})
 */
pub fn fair_round_robin(key = "key") {
  return __queue_strategy("fair_round_robin", {key: key})
}

/**
 * QueueStrategy returns the named queue strategy namespace as a dict for
 * callers that prefer dotted local access.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: let QueueStrategy = QueueStrategy()
 */
pub fn QueueStrategy() {
  return {fifo: fifo(), priority: priority(), lifo: lifo(), fair_round_robin: fair_round_robin}
}

fn __backpressure(kind, options = nil) {
  let base = {_type: "backpressure", kind: kind}
  if options == nil {
    return base
  }
  return base + options
}

/**
 * backpressure_queue returns a bounded queue backpressure descriptor.
 *
 * `on_full` may be `"block_submitter"`, `"drop_oldest"`, `"drop_newest"`,
 * or `"fail_submitter"`.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_create({backpressure: backpressure_queue(100, "block_submitter")})
 */
pub fn backpressure_queue(max_depth, on_full = "block_submitter") {
  return __backpressure("queue", {max_depth: max_depth, on_full: on_full})
}

/**
 * fail_fast returns a policy descriptor that rejects submissions when no
 * worker slot is immediately available.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_create({backpressure: fail_fast()})
 */
pub fn fail_fast() {
  return __backpressure("fail_fast")
}

/**
 * ring_buffer returns a bounded policy that evicts the oldest queued task
 * when a new submission arrives after the queue reaches capacity.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_create({backpressure: ring_buffer(10)})
 */
pub fn ring_buffer(capacity) {
  return __backpressure("ring_buffer", {capacity: capacity})
}

/**
 * Backpressure returns the named backpressure namespace as a dict for
 * callers that prefer dotted local access.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: let Backpressure = Backpressure()
 */
pub fn Backpressure() {
  return {queue: backpressure_queue, fail_fast: fail_fast(), ring_buffer: ring_buffer}
}

fn __pool_attach_methods(snapshot) {
  if type_of(snapshot) != "dict" {
    return snapshot
  }
  let pool_id = snapshot.id
  let methods = {
    _namespace: "pool",
    submit: { closure, options = nil -> __pool_submit(pool_id, closure, options) },
    size: { -> __pool_size(pool_id) },
    snapshot: { -> __pool_attach_methods(__pool_snapshot(pool_id)) },
  }
  return snapshot + methods
}

/**
 * pool_create allocates a named pool and returns its handle.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_create({name: "review", max_concurrent: 2})
 */
pub fn pool_create(options = nil) {
  return __pool_attach_methods(__pool_create(options))
}

/**
 * pool_get looks up a pool by name or id; returns nil when not found.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_get("review")
 */
pub fn pool_get(name_or_id) {
  let snapshot = __pool_get(name_or_id)
  if snapshot == nil {
    return nil
  }
  return __pool_attach_methods(snapshot)
}

/**
 * pool_list returns every pool registered in the current runtime.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_list()
 */
pub fn pool_list() {
  let snapshots = __pool_list()
  var out = []
  for snapshot in snapshots {
    out = out.push(__pool_attach_methods(snapshot))
  }
  return out
}

/**
 * pool_wait blocks until one or more pool task handles complete.
 *
 * @effects: [host]
 * @allocation: heap
 * @errors: []
 * @api_stability: experimental
 * @example: pool_wait(task)
 */
pub fn pool_wait(handle_or_handles) {
  return __pool_wait(handle_or_handles)
}