/**
* std/lifecycle/pool — named, concurrency-bounded agent thread pools (PL-01/PL-03).
*
* Foundation for the agent pool epic (#1883). Provides:
* - pool_create({...}) to allocate a named pool with a max_concurrent cap
* - pool_get(name) / pool_list() for lookup and introspection
* - pool.submit(fn() { ... }, options?) to enqueue a closure on the handle
* - pool.size() and pool.snapshot() for inspection
* - pool_wait(handle) to block until a submitted task reaches a terminal state
*
* Queue strategies are selected with the QueueStrategy descriptors below.
* Backpressure policies are selected with Backpressure descriptors; channel
* composition and durable state arrive in later pool tickets.
*
* Import:
* import { Backpressure, pool_create, pool_get, pool_list, pool_wait, fifo } from "std/lifecycle/pool"
*/
fn __queue_strategy(kind, options = nil) {
let base = {_type: "queue_strategy", kind: kind}
if options == nil {
return base
}
return base + options
}
/**
* fifo returns a queue strategy descriptor that dequeues oldest-first.
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_create({queue: fifo()})
*/
pub fn fifo() {
return __queue_strategy("fifo")
}
/**
* priority returns a queue strategy descriptor that dequeues the highest
* submit-time `priority` first, with FIFO tiebreaks.
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_create({queue: priority()})
*/
pub fn priority() {
return __queue_strategy("priority")
}
/**
* lifo returns a queue strategy descriptor that dequeues newest-first.
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_create({queue: lifo()})
*/
pub fn lifo() {
return __queue_strategy("lifo")
}
/**
* fair_round_robin returns a strategy descriptor that partitions queued
* tasks by a submit option field and round-robins across distinct values.
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_create({queue: fair_round_robin("tenant_id")})
*/
pub fn fair_round_robin(key = "key") {
return __queue_strategy("fair_round_robin", {key: key})
}
/**
* QueueStrategy returns the named queue strategy namespace as a dict for
* callers that prefer dotted local access.
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: let QueueStrategy = QueueStrategy()
*/
pub fn QueueStrategy() {
return {fifo: fifo(), priority: priority(), lifo: lifo(), fair_round_robin: fair_round_robin}
}
fn __backpressure(kind, options = nil) {
let base = {_type: "backpressure", kind: kind}
if options == nil {
return base
}
return base + options
}
/**
* backpressure_queue returns a bounded queue backpressure descriptor.
*
* `on_full` may be `"block_submitter"`, `"drop_oldest"`, `"drop_newest"`,
* or `"fail_submitter"`.
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_create({backpressure: backpressure_queue(100, "block_submitter")})
*/
pub fn backpressure_queue(max_depth, on_full = "block_submitter") {
return __backpressure("queue", {max_depth: max_depth, on_full: on_full})
}
/**
* fail_fast returns a policy descriptor that rejects submissions when no
* worker slot is immediately available.
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_create({backpressure: fail_fast()})
*/
pub fn fail_fast() {
return __backpressure("fail_fast")
}
/**
* ring_buffer returns a bounded policy that evicts the oldest queued task
* when a new submission arrives after the queue reaches capacity.
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_create({backpressure: ring_buffer(10)})
*/
pub fn ring_buffer(capacity) {
return __backpressure("ring_buffer", {capacity: capacity})
}
/**
* Backpressure returns the named backpressure namespace as a dict for
* callers that prefer dotted local access.
*
* @effects: []
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: let Backpressure = Backpressure()
*/
pub fn Backpressure() {
return {queue: backpressure_queue, fail_fast: fail_fast(), ring_buffer: ring_buffer}
}
fn __pool_attach_methods(snapshot) {
if type_of(snapshot) != "dict" {
return snapshot
}
let pool_id = snapshot.id
let methods = {
_namespace: "pool",
submit: { closure, options = nil -> __pool_submit(pool_id, closure, options) },
size: { -> __pool_size(pool_id) },
snapshot: { -> __pool_attach_methods(__pool_snapshot(pool_id)) },
}
return snapshot + methods
}
/**
* pool_create allocates a named pool and returns its handle.
*
* @effects: [host]
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_create({name: "review", max_concurrent: 2})
*/
pub fn pool_create(options = nil) {
return __pool_attach_methods(__pool_create(options))
}
/**
* pool_get looks up a pool by name or id; returns nil when not found.
*
* @effects: [host]
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_get("review")
*/
pub fn pool_get(name_or_id) {
let snapshot = __pool_get(name_or_id)
if snapshot == nil {
return nil
}
return __pool_attach_methods(snapshot)
}
/**
* pool_list returns every pool registered in the current runtime.
*
* @effects: [host]
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_list()
*/
pub fn pool_list() {
let snapshots = __pool_list()
var out = []
for snapshot in snapshots {
out = out.push(__pool_attach_methods(snapshot))
}
return out
}
/**
* pool_wait blocks until one or more pool task handles complete.
*
* @effects: [host]
* @allocation: heap
* @errors: []
* @api_stability: experimental
* @example: pool_wait(task)
*/
pub fn pool_wait(handle_or_handles) {
return __pool_wait(handle_or_handles)
}