use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use crate::event_log::{
active_event_log, install_memory_for_current_thread, EventLog, LogEvent, Topic,
};
use crate::stdlib::registration::{
async_builtin, register_builtin_group, AsyncBuiltin, BuiltinGroup, SyncBuiltin,
};
use crate::value::{VmClosure, VmError, VmValue};
use crate::vm::{Vm, VmBuiltinArity};
use harn_parser::diagnostic_codes::Code;
use serde::{Deserialize, Serialize};
use serde_json::json;
/// Concurrency limit used when the `max_concurrent` option is absent or nil.
const DEFAULT_MAX_CONCURRENT: usize = 1;
/// `_type` tag written into pool snapshot dicts.
const POOL_TYPE: &str = "pool";
/// `_type` tag written into task snapshot and task handle dicts.
const POOL_TASK_TYPE: &str = "pool_task";
// NOTE(review): presumably the event-log topic for pool audit records; its
// use site is outside this view.
const POOL_AUDIT_TOPIC: &str = "lifecycle.pool.audit";
// NOTE(review): presumably the queue depth of the in-memory event log used
// for pool events; its use site is outside this view.
const POOL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Directory that holds pipeline-scoped pool files when no `dir` option is given.
const PIPELINE_POOLS_ROOT: &str = ".harn/pools";
/// Value used for `stale_after_ms` when the option is absent or nil.
const DEFAULT_STALE_AFTER_MS: i64 = 30_000;
/// One element of a pool's `queue`: the closure to run together with the
/// shared state record that snapshots and handles are built from.
// NOTE(review): the dequeue logic that consumes `priority`, `key`, and `seq`
// is outside this view; confirm ordering semantics there.
#[derive(Clone)]
struct PendingTask {
/// Identifier of the queued task.
task_id: String,
/// Closure to execute (presumably the one passed to `pool.submit`).
closure: Rc<VmClosure>,
/// State record shared with the pool's `tasks` map.
state: Rc<RefCell<TaskState>>,
/// Submit-time priority.
priority: i64,
/// Optional grouping key.
key: Option<String>,
/// Sequence number (presumably taken from the pool's `submit_counter`).
seq: u64,
}
/// Mutable record of a single task; the source for `task_snapshot_value`
/// and `task_handle_value`.
struct TaskState {
/// Task identifier, exposed as `id`.
id: String,
/// Identifier of the owning pool, exposed as `pool_id`.
pool_id: String,
/// Name of the owning pool, exposed as `pool`.
pool_name: String,
/// Optional grouping key, exposed as `key` when present.
key: Option<String>,
/// Submit-time priority.
priority: i64,
/// Current lifecycle phase.
status: TaskStatus,
/// Submission time as a string; snapshots are ordered by this value.
submitted_at: String,
/// Set once the task has started (exposed only when present).
started_at: Option<String>,
/// Set once the task has finished (exposed only when present).
finished_at: Option<String>,
/// Result value, exposed in snapshots when present.
result: Option<VmValue>,
/// Error text, exposed when present.
error: Option<String>,
/// Why the task was rejected, exposed when present.
rejection_reason: Option<String>,
/// Which policy rejected the task, exposed when present.
rejection_policy: Option<String>,
/// Caller-supplied idempotency key, if any.
idempotency_key: Option<String>,
/// Last heartbeat in milliseconds (presumably compared against the pool's
/// `stale_after_ms`; confirm at the use site).
heartbeat_at_ms: i64,
/// Submission time in milliseconds.
submitted_at_ms: i64,
/// Span link captured at submit time, if one was available.
submit_span_link: Option<crate::tracing::SpanLink>,
/// Identity of the submitter (format decided outside this view).
submitted_by: String,
/// One-shot senders held for parties waiting on this task (presumably
/// fired when the task reaches a terminal status).
waiters: Vec<tokio::sync::oneshot::Sender<()>>,
}
/// Lifecycle phase of a pool task.
#[derive(Clone, Copy, PartialEq, Eq)]
enum TaskStatus {
    Queued,
    Running,
    Completed,
    Failed,
    Rejected,
}

impl TaskStatus {
    /// Lowercase label for this status, as written into snapshots and handles.
    fn as_str(self) -> &'static str {
        match self {
            Self::Queued => "queued",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Rejected => "rejected",
        }
    }

    /// Whether the status is one of the end states: completed, failed, or
    /// rejected.
    fn is_terminal(self) -> bool {
        match self {
            Self::Queued | Self::Running => false,
            Self::Completed | Self::Failed | Self::Rejected => true,
        }
    }
}
/// In-memory state of one pool, stored in the thread-local `POOLS` registry.
struct PoolEntry {
/// Pool identifier (random for session scope, deterministic for pipeline scope).
id: String,
/// Pool name, unique within `POOL_NAMES`.
name: String,
/// Concurrency limit parsed from the `max_concurrent` option.
max_concurrent: usize,
/// Creation marker; `pool_list` orders pools by this string.
created_at: String,
/// Counter restored from the durable store when one exists, otherwise 0.
submit_counter: u64,
/// Tasks waiting to run.
queue: VecDeque<PendingTask>,
/// How the queue is ordered.
queue_strategy: QueueStrategy,
/// What happens when the queue is full.
backpressure: BackpressureStrategy,
/// Cursor for fair round-robin (presumably the last served key; confirm at
/// the dequeue site).
round_robin_after: Option<String>,
/// Tasks currently counted as active, keyed by task id.
active: HashMap<String, Rc<RefCell<TaskState>>>,
/// Every task known to the pool, keyed by task id.
tasks: BTreeMap<String, Rc<RefCell<TaskState>>>,
/// One-shot senders for blocked submitters; the count is reported as
/// `blocked_submitters` in snapshots.
space_waiters: Vec<tokio::sync::oneshot::Sender<()>>,
/// Subset of the creation options kept for display (see `ordered_pool_config`).
config: BTreeMap<String, VmValue>,
/// Scope the pool was created with.
scope: PoolScope,
/// Scope identifier; empty for session-scoped pools.
scope_id: String,
/// Maps idempotency keys to an id string (presumably the task id; confirm
/// at the submit site).
idempotency_index: HashMap<String, String>,
/// Staleness threshold parsed from the `stale_after_ms` option.
stale_after_ms: i64,
/// Durable store; `Some` only for pipeline-scoped pools.
store: Option<Rc<RefCell<PoolDurableStore>>>,
}
/// Ordering policy for a pool's queue, parsed from the `queue` option.
#[derive(Clone, Debug, PartialEq, Eq)]
enum QueueStrategy {
/// Selected by the name `"fifo"`.
Fifo,
/// Selected by the name `"priority"`; also the default when `queue` is absent or nil.
Priority,
/// Selected by the name `"lifo"`.
Lifo,
/// Selected by the name `"fair_round_robin"`; `key_field` names the submit
/// option that carries the grouping key (defaults to `"key"`).
FairRoundRobin { key_field: String },
}
/// Backpressure policy for a pool, parsed from the `backpressure` option.
#[derive(Clone, Debug, PartialEq, Eq)]
enum BackpressureStrategy {
/// Default when the option is absent or nil; has no depth limit.
Unbounded,
/// Bounded queue with a configurable reaction once `max_depth` is reached.
Queue {
max_depth: usize,
on_full: QueueOnFullPolicy,
},
/// Selected by the name `"fail_fast"`; has no depth limit.
FailFast,
/// Bounded buffer of `capacity` entries.
RingBuffer {
capacity: usize,
},
}
impl BackpressureStrategy {
fn name(&self) -> &'static str {
match self {
BackpressureStrategy::Unbounded => "unbounded",
BackpressureStrategy::Queue { .. } => "queue",
BackpressureStrategy::FailFast => "fail_fast",
BackpressureStrategy::RingBuffer { .. } => "ring_buffer",
}
}
fn max_depth(&self) -> Option<usize> {
match self {
BackpressureStrategy::Queue { max_depth, .. } => Some(*max_depth),
BackpressureStrategy::RingBuffer { capacity } => Some(*capacity),
_ => None,
}
}
}
/// Reaction of a `Queue` backpressure policy once its depth limit is reached.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum QueueOnFullPolicy {
    BlockSubmitter,
    DropOldest,
    DropNewest,
    FailSubmitter,
}

impl QueueOnFullPolicy {
    /// Snake-case identifier of the policy, matching the strings accepted by
    /// the `on_full` option.
    fn as_str(self) -> &'static str {
        use QueueOnFullPolicy::{BlockSubmitter, DropNewest, DropOldest, FailSubmitter};
        match self {
            BlockSubmitter => "block_submitter",
            DropOldest => "drop_oldest",
            DropNewest => "drop_newest",
            FailSubmitter => "fail_submitter",
        }
    }
}
/// Scope a pool is created in; serialized with snake_case variant names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum PoolScope {
/// Default scope; pools get a random id and no durable store.
Session,
/// Pools get a deterministic id and a durable store file.
Pipeline,
/// Host-routed; rejected by `pool_create` in this runtime.
Tenant,
/// Host-routed; rejected by `pool_create` in this runtime.
Org,
}
impl PoolScope {
fn parse(value: &str) -> Result<Self, VmError> {
match value.trim() {
"" | "session" => Ok(Self::Session),
"pipeline" => Ok(Self::Pipeline),
"tenant" => Ok(Self::Tenant),
"org" => Ok(Self::Org),
other => Err(VmError::Runtime(format!(
"pool_create: unknown scope '{other}' (expected one of session/pipeline/tenant/org)"
))),
}
}
fn as_str(self) -> &'static str {
match self {
Self::Session => "session",
Self::Pipeline => "pipeline",
Self::Tenant => "tenant",
Self::Org => "org",
}
}
fn is_host_routed(self) -> bool {
matches!(self, Self::Tenant | Self::Org)
}
}
/// Serialized form of a task, carried inside a `PoolRecord::Task` line of the
/// durable store. Optional fields are omitted from the output when `None` and
/// default to `None` when missing from the input.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PersistedTask {
/// Task identifier; `PoolDurableStore::load` keys tasks by this value.
id: String,
pool_id: String,
pool_name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
key: Option<String>,
priority: i64,
/// Status as a string (presumably a `TaskStatus::as_str` label; confirm at
/// the conversion site).
status: String,
submitted_at: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
started_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
finished_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
rejection_reason: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
rejection_policy: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
idempotency_key: Option<String>,
/// Textual rendering of the result (presumably `VmValue::display` output;
/// confirm at the conversion site).
#[serde(default, skip_serializing_if = "Option::is_none")]
result_display: Option<String>,
heartbeat_at_ms: i64,
seq: u64,
}
/// One line of the durable store file. Serde tags each record with a `kind`
/// field whose value is the snake_case variant name (`"pool"` or `"task"`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
enum PoolRecord {
/// Pool metadata; `load` keeps the last such record it reads.
Pool {
id: String,
name: String,
scope: String,
scope_id: String,
max_concurrent: usize,
created_at: String,
submit_counter: u64,
},
/// A task record; `load` keeps the last record per task id.
Task { task: PersistedTask },
}
/// File-backed store for a pool: a newline-delimited JSON file of
/// `PoolRecord` entries at `path`.
struct PoolDurableStore {
/// Location of the store file.
path: PathBuf,
}
impl PoolDurableStore {
fn new(path: PathBuf) -> Self {
Self { path }
}
fn append(&self, record: &PoolRecord) -> Result<(), VmError> {
use std::io::Write as _;
if let Some(parent) = self.path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent).map_err(|err| {
VmError::Runtime(format!(
"pool durable store: create dir '{}': {err}",
parent.display()
))
})?;
}
}
let mut line = serde_json::to_vec(record)
.map_err(|err| VmError::Runtime(format!("pool durable store: encode: {err}")))?;
line.push(b'\n');
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.map_err(|err| {
VmError::Runtime(format!(
"pool durable store: open '{}': {err}",
self.path.display()
))
})?;
file.write_all(&line)
.and_then(|()| file.sync_data())
.map_err(|err| {
VmError::Runtime(format!(
"pool durable store: write '{}': {err}",
self.path.display()
))
})?;
Ok(())
}
fn load(&self) -> Result<Option<PersistedPoolState>, VmError> {
let bytes = match std::fs::read(&self.path) {
Ok(bytes) => bytes,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => {
return Err(VmError::Runtime(format!(
"pool durable store: read '{}': {err}",
self.path.display()
)));
}
};
let mut meta: Option<PersistedPoolMeta> = None;
let mut tasks: BTreeMap<String, PersistedTask> = BTreeMap::new();
for (line_no, raw) in bytes.split(|byte| *byte == b'\n').enumerate() {
if raw.is_empty() {
continue;
}
let record: PoolRecord = serde_json::from_slice(raw).map_err(|err| {
VmError::Runtime(format!(
"pool durable store: decode '{}' line {}: {err}",
self.path.display(),
line_no + 1
))
})?;
match record {
PoolRecord::Pool {
id,
name,
scope,
scope_id,
max_concurrent,
created_at,
submit_counter,
} => {
meta = Some(PersistedPoolMeta {
id,
name,
scope,
scope_id,
max_concurrent,
created_at,
submit_counter,
});
}
PoolRecord::Task { task } => {
tasks.insert(task.id.clone(), task);
}
}
}
let Some(meta) = meta else {
return Ok(None);
};
Ok(Some(PersistedPoolState { meta, tasks }))
}
fn compact(&self, meta: &PersistedPoolMeta, tasks: &[PersistedTask]) -> Result<(), VmError> {
use std::io::Write as _;
crate::atomic_io::atomic_write_with(&self.path, |writer| {
let header = PoolRecord::Pool {
id: meta.id.clone(),
name: meta.name.clone(),
scope: meta.scope.clone(),
scope_id: meta.scope_id.clone(),
max_concurrent: meta.max_concurrent,
created_at: meta.created_at.clone(),
submit_counter: meta.submit_counter,
};
let header_line = serde_json::to_vec(&header)
.map_err(|err| std::io::Error::other(format!("encode header: {err}")))?;
writer.write_all(&header_line)?;
writer.write_all(b"\n")?;
for task in tasks {
let line = serde_json::to_vec(&PoolRecord::Task { task: task.clone() })
.map_err(|err| std::io::Error::other(format!("encode task: {err}")))?;
writer.write_all(&line)?;
writer.write_all(b"\n")?;
}
Ok(())
})
.map_err(|err| {
VmError::Runtime(format!(
"pool durable store: compact '{}': {err}",
self.path.display()
))
})
}
}
/// Pool metadata recovered from, or written as, a `PoolRecord::Pool` line;
/// the fields mirror that variant one-to-one.
#[derive(Clone, Debug)]
struct PersistedPoolMeta {
id: String,
name: String,
scope: String,
scope_id: String,
max_concurrent: usize,
created_at: String,
/// Restored into `PoolEntry::submit_counter` when a pool is re-created.
submit_counter: u64,
}
/// Result of `PoolDurableStore::load`: the last pool metadata record plus the
/// last record seen for each task id.
struct PersistedPoolState {
meta: PersistedPoolMeta,
/// Tasks keyed by task id.
tasks: BTreeMap<String, PersistedTask>,
}
/// Details of a dropped or rejected task.
// NOTE(review): presumably emitted on `POOL_AUDIT_TOPIC`; the producer is
// outside this view.
#[derive(Clone)]
struct PoolDropAudit {
pool_id: String,
pool_name: String,
/// Task that was dropped.
task_id: String,
/// Task that took its place, if any.
replacement_task_id: Option<String>,
reason: String,
policy: String,
/// Queue depth recorded with the drop.
queue_depth: usize,
/// Depth limit of the policy, if it has one.
max_depth: Option<usize>,
occurred_at: String,
}
/// Facts recorded about a task submission.
// NOTE(review): producer and consumer are outside this view; presumably used
// for event-log or audit output.
#[derive(Clone)]
struct PoolSubmitReceipt {
pool_id: String,
pool_name: String,
task_id: String,
submitted_at: String,
priority: i64,
key: Option<String>,
idempotency_key: Option<String>,
submitted_by: String,
}
/// Facts recorded when a task leaves the queue.
// NOTE(review): producer and consumer are outside this view.
#[derive(Clone)]
struct PoolDequeueReceipt {
pool_id: String,
pool_name: String,
task_id: String,
dequeued_at: String,
/// Time spent queued, in milliseconds.
queued_for_ms: i64,
/// Index of the slot the task was assigned (numbering scheme is decided
/// outside this view).
slot_index: usize,
}
/// Pairs a `crate::tracing` span id with a `tracing::Span`; the crate span is
/// ended when the guard is ended or dropped.
struct PoolSpanGuard {
/// Id returned by `crate::tracing`; reset to 0 once the span has been ended.
span_id: u64,
/// Companion span created with `tracing::info_span!`.
otel_span: tracing::Span,
}
impl PoolSpanGuard {
    /// Starts a span via `span_start_with_links` (the non-detached variant).
    fn start(
        kind: crate::tracing::SpanKind,
        name: String,
        links: Vec<crate::tracing::SpanLink>,
    ) -> Self {
        let inherit_parent = true;
        Self::start_with_parenting(kind, name, links, inherit_parent)
    }

    /// Starts a span via `span_start_detached_with_links`.
    fn start_detached(
        kind: crate::tracing::SpanKind,
        name: String,
        links: Vec<crate::tracing::SpanLink>,
    ) -> Self {
        let inherit_parent = false;
        Self::start_with_parenting(kind, name, links, inherit_parent)
    }

    /// Shared constructor: opens the crate-level span, creates the companion
    /// `tracing` span, and copies every link onto it.
    fn start_with_parenting(
        kind: crate::tracing::SpanKind,
        name: String,
        links: Vec<crate::tracing::SpanLink>,
        inherit_parent: bool,
    ) -> Self {
        let span_id = match inherit_parent {
            true => crate::tracing::span_start_with_links(kind, name.clone(), links.clone()),
            false => {
                crate::tracing::span_start_detached_with_links(kind, name.clone(), links.clone())
            }
        };
        let otel_span = tracing::info_span!(
            target: "harn.vm.pool",
            "harn.pool",
            harn.kind = kind.as_str(),
            harn.name = %name,
        );
        links.into_iter().for_each(|link| {
            let trace_id = crate::TraceId(link.trace_id);
            let mut attributes: std::collections::HashMap<String, String> =
                link.attributes.into_iter().collect();
            // Tag the link kind unless the caller already supplied one.
            attributes
                .entry("harn.link.kind".to_string())
                .or_insert_with(|| "pool_submit".to_string());
            // The outcome of attaching a link is deliberately ignored.
            let _ = crate::observability::otel::set_span_link(
                &otel_span,
                &trace_id,
                &link.span_id,
                Some(attributes),
            );
        });
        Self { span_id, otel_span }
    }

    /// Link pointing at this span: built from the companion span's context
    /// when one is available, otherwise from the crate-level span id.
    fn link(&self) -> Option<crate::tracing::SpanLink> {
        match crate::observability::otel::current_span_context_hex(&self.otel_span) {
            Some((trace_id, span_id)) => Some(crate::tracing::SpanLink::new(trace_id, span_id)),
            None => crate::tracing::span_link(self.span_id),
        }
    }

    /// Attaches a metadata entry to the crate-level span.
    fn set_metadata(&self, key: &str, value: serde_json::Value) {
        crate::tracing::span_set_metadata(self.span_id, key, value);
    }

    /// Ends the crate-level span once; later calls are no-ops because the id
    /// is reset to 0.
    fn end(&mut self) {
        if self.span_id == 0 {
            return;
        }
        crate::tracing::span_end(self.span_id);
        self.span_id = 0;
    }
}
impl Drop for PoolSpanGuard {
fn drop(&mut self) {
self.end();
}
}
impl QueueStrategy {
fn name(&self) -> &'static str {
match self {
QueueStrategy::Fifo => "fifo",
QueueStrategy::Priority => "priority",
QueueStrategy::Lifo => "lifo",
QueueStrategy::FairRoundRobin { .. } => "fair_round_robin",
}
}
fn key_field(&self) -> Option<&str> {
match self {
QueueStrategy::FairRoundRobin { key_field } => Some(key_field.as_str()),
_ => None,
}
}
}
// Per-thread pool registries: each thread sees its own set of pools.
thread_local! {
// Pool id -> pool entry.
static POOLS: RefCell<HashMap<String, Rc<RefCell<PoolEntry>>>> =
RefCell::new(HashMap::new());
// Pool name -> pool id; used to reject duplicate names and to resolve `pool_get` by name.
static POOL_NAMES: RefCell<HashMap<String, String>> = RefCell::new(HashMap::new());
}
/// Generates a fresh pool id of the form `pool_<uuid-v7>`.
fn next_pool_id() -> String {
    let unique = uuid::Uuid::now_v7();
    format!("pool_{unique}")
}
/// Derives a stable pool id from `(scope, scope_id, name)`.
///
/// The three parts are hashed with BLAKE3, separated by NUL bytes, and the
/// first 32 hex characters of the digest are appended to `pool_<scope>_`.
fn deterministic_pool_id(scope: PoolScope, scope_id: &str, name: &str) -> String {
    let scope_name = scope.as_str();
    let parts: [&[u8]; 3] = [scope_name.as_bytes(), scope_id.as_bytes(), name.as_bytes()];
    let mut hasher = blake3::Hasher::new();
    for (index, part) in parts.iter().enumerate() {
        if index > 0 {
            // The NUL separator keeps adjacent parts from running together.
            hasher.update(b"\x00");
        }
        hasher.update(*part);
    }
    let hex = hasher.finalize().to_hex();
    let prefix = &hex.as_str()[..32];
    format!("pool_{scope_name}_{prefix}")
}
/// Generates a fresh task id of the form `<pool id>_task_<uuid-v7>`.
fn next_task_id(pool: &PoolEntry) -> String {
    let pool_id = &pool.id;
    let unique = uuid::Uuid::now_v7();
    format!("{pool_id}_task_{unique}")
}
/// Fetches the pool registered under `pool_id` on the current thread.
///
/// Returns `VmError::Runtime("pool not found: …")` when no such pool exists.
fn lookup_pool(pool_id: &str) -> Result<Rc<RefCell<PoolEntry>>, VmError> {
    let found = POOLS.with(|pools| pools.borrow().get(pool_id).cloned());
    match found {
        Some(entry) => Ok(entry),
        None => Err(VmError::Runtime(format!("pool not found: {pool_id}"))),
    }
}
/// Extracts a pool id from either a bare id string or a pool handle dict.
///
/// For a dict, the `id` entry is rendered with `display()`; a missing or
/// empty id is an error. `builtin` prefixes every error message.
fn pool_id_from_value(value: &VmValue, builtin: &str) -> Result<String, VmError> {
    match value {
        VmValue::String(text) => Ok(text.to_string()),
        VmValue::Dict(map) => {
            let id = match map.get("id") {
                Some(raw) => raw.display(),
                None => String::new(),
            };
            if id.is_empty() {
                Err(VmError::Runtime(format!(
                    "{builtin}: pool handle missing id"
                )))
            } else {
                Ok(id)
            }
        }
        _ => Err(VmError::Runtime(format!(
            "{builtin}: expected pool handle or pool id"
        ))),
    }
}
/// Pulls `(pool_id, task_id)` out of a task handle dict.
///
/// Errors when `value` is not a dict or when either `pool_id` or `id` is
/// missing or renders to an empty string. `pool_id` is checked first.
fn task_handle_from_value(value: &VmValue, builtin: &str) -> Result<(String, String), VmError> {
    let Some(map) = value.as_dict() else {
        return Err(VmError::Runtime(format!(
            "{builtin}: expected pool task handle (got {})",
            value.type_name()
        )));
    };
    // Looks up one required field and rejects absent or empty values.
    let required = |field: &str| -> Result<String, VmError> {
        match map.get(field).map(|v| v.display()) {
            Some(text) if !text.is_empty() => Ok(text),
            _ => Err(VmError::Runtime(format!(
                "{builtin}: task handle missing {field}"
            ))),
        }
    };
    let pool_id = required("pool_id")?;
    let task_id = required("id")?;
    Ok((pool_id, task_id))
}
/// Normalizes an optional options argument into an owned map.
///
/// A missing argument or nil yields an empty map; a dict is cloned; anything
/// else is rejected with an error prefixed by `builtin`.
fn parse_options(
    value: Option<&VmValue>,
    builtin: &str,
) -> Result<BTreeMap<String, VmValue>, VmError> {
    let Some(value) = value else {
        return Ok(BTreeMap::new());
    };
    match value {
        VmValue::Nil => Ok(BTreeMap::new()),
        VmValue::Dict(map) => Ok((**map).clone()),
        other => Err(VmError::Runtime(format!(
            "{builtin}: options must be a dict (got {})",
            other.type_name()
        ))),
    }
}
fn parse_max_concurrent(opts: &BTreeMap<String, VmValue>) -> Result<usize, VmError> {
match opts.get("max_concurrent") {
None | Some(VmValue::Nil) => Ok(DEFAULT_MAX_CONCURRENT),
Some(VmValue::Int(n)) => {
if *n < 1 {
return Err(VmError::Runtime(
"pool_create: max_concurrent must be >= 1".to_string(),
));
}
Ok(*n as usize)
}
Some(other) => Err(VmError::Runtime(format!(
"pool_create: max_concurrent must be an int (got {})",
other.type_name()
))),
}
}
/// Reads the `name` option; only a string that is non-blank after trimming
/// counts. Every other value, including non-strings, yields `None`.
fn parse_name(opts: &BTreeMap<String, VmValue>) -> Option<String> {
    match opts.get("name") {
        Some(VmValue::String(text)) if !text.trim().is_empty() => Some(text.to_string()),
        _ => None,
    }
}
/// Reads the `scope` option: absent or nil means `Session`, a string is
/// handed to `PoolScope::parse`, and any other type is an error.
fn parse_scope(opts: &BTreeMap<String, VmValue>) -> Result<PoolScope, VmError> {
    let Some(value) = opts.get("scope") else {
        return Ok(PoolScope::Session);
    };
    match value {
        VmValue::Nil => Ok(PoolScope::Session),
        VmValue::String(text) => PoolScope::parse(text),
        other => Err(VmError::Runtime(format!(
            "pool_create: scope must be a string (got {})",
            other.type_name()
        ))),
    }
}
/// Returns the first non-blank string found under `scope_id`, `pipeline_id`,
/// or `run_id`, checked in that order. The value is returned untrimmed.
fn parse_scope_id_override(opts: &BTreeMap<String, VmValue>) -> Option<String> {
    ["scope_id", "pipeline_id", "run_id"]
        .iter()
        .find_map(|key| match opts.get(*key) {
            Some(VmValue::String(text)) if !text.trim().is_empty() => Some(text.to_string()),
            _ => None,
        })
}
/// Reads the `stale_after_ms` option.
///
/// Absent or nil selects `DEFAULT_STALE_AFTER_MS`; a non-negative int or
/// duration is returned as-is; everything else (including negative values)
/// is rejected.
fn parse_stale_after_ms(opts: &BTreeMap<String, VmValue>) -> Result<i64, VmError> {
    let Some(value) = opts.get("stale_after_ms") else {
        return Ok(DEFAULT_STALE_AFTER_MS);
    };
    match value {
        VmValue::Nil => Ok(DEFAULT_STALE_AFTER_MS),
        VmValue::Int(ms) | VmValue::Duration(ms) if *ms >= 0 => Ok(*ms),
        other => Err(VmError::Runtime(format!(
            "pool_create: stale_after_ms must be a non-negative int or duration (got {})",
            other.type_name()
        ))),
    }
}
/// Reads the idempotency key from `idempotency_key`, falling back to `id`
/// only when `idempotency_key` is not present at all.
///
/// Nil or absent yields `Ok(None)`; a blank string or a non-string value is
/// an error.
fn parse_idempotency_key(opts: &BTreeMap<String, VmValue>) -> Result<Option<String>, VmError> {
    let raw = match opts.get("idempotency_key") {
        Some(value) => Some(value),
        None => opts.get("id"),
    };
    match raw {
        None | Some(VmValue::Nil) => Ok(None),
        Some(VmValue::String(text)) => {
            if text.trim().is_empty() {
                Err(VmError::Runtime(
                    "pool.submit: idempotency_key cannot be empty".to_string(),
                ))
            } else {
                Ok(Some(text.to_string()))
            }
        }
        Some(other) => Err(VmError::Runtime(format!(
            "pool.submit: idempotency_key must be a string (got {})",
            other.type_name()
        ))),
    }
}
/// Determines the scope id for a pipeline-scoped pool.
///
/// An explicit override from the options wins. Otherwise the runtime context
/// of the current async-builtin child VM is consulted for a non-empty
/// `workflow_id` and then `run_id`. If neither source yields an id the call
/// fails.
fn resolve_pipeline_scope_id(opts: &BTreeMap<String, VmValue>) -> Result<String, VmError> {
    if let Some(explicit) = parse_scope_id_override(opts) {
        return Ok(explicit);
    }
    let from_context = crate::vm::clone_async_builtin_child_vm().and_then(|vm| {
        let context = crate::runtime_context::runtime_context_value(&vm);
        let VmValue::Dict(values) = context else {
            return None;
        };
        let found = ["workflow_id", "run_id"]
            .iter()
            .find_map(|key| match values.get(*key) {
                Some(VmValue::String(text)) if !text.is_empty() => Some(text.to_string()),
                _ => None,
            });
        found
    });
    match from_context {
        Some(id) => Ok(id),
        None => Err(VmError::Runtime(
            "pool_create: pipeline-scope pool requires a pipeline_id (or active workflow/run \
context); pass options.pipeline_id explicitly when creating from outside a pipeline"
                .to_string(),
        )),
    }
}
/// Reads the `priority` submit option: absent or nil means 0, an int is used
/// as-is, and any other type is an error.
fn parse_priority(opts: &BTreeMap<String, VmValue>) -> Result<i64, VmError> {
    let Some(value) = opts.get("priority") else {
        return Ok(0);
    };
    match value {
        VmValue::Nil => Ok(0),
        VmValue::Int(n) => Ok(*n),
        other => Err(VmError::Runtime(format!(
            "pool.submit: priority must be an int (got {})",
            other.type_name()
        ))),
    }
}
/// Reads the `key` submit option: absent or nil means no key, a string is
/// copied, and any other type is an error.
fn parse_key(opts: &BTreeMap<String, VmValue>) -> Result<Option<String>, VmError> {
    let Some(value) = opts.get("key") else {
        return Ok(None);
    };
    match value {
        VmValue::Nil => Ok(None),
        VmValue::String(text) => Ok(Some(text.to_string())),
        other => Err(VmError::Runtime(format!(
            "pool.submit: key must be a string (got {})",
            other.type_name()
        ))),
    }
}
/// Resolves the grouping key for a submission.
///
/// When the queue strategy names a key field, that option is read first: a
/// string wins, a non-string non-nil value is an error, and absent or nil
/// falls through to the plain `key` option handled by `parse_key`.
fn parse_submit_key(
    opts: &BTreeMap<String, VmValue>,
    queue_strategy: &QueueStrategy,
) -> Result<Option<String>, VmError> {
    let Some(field) = queue_strategy.key_field() else {
        return parse_key(opts);
    };
    match opts.get(field) {
        Some(VmValue::String(text)) => Ok(Some(text.to_string())),
        None | Some(VmValue::Nil) => parse_key(opts),
        Some(other) => Err(VmError::Runtime(format!(
            "pool.submit: {field} must be a string (got {})",
            other.type_name()
        ))),
    }
}
/// Reads the `queue` option.
///
/// Absent or nil selects `Priority`. A string is parsed as a strategy name.
/// A dict must carry a non-empty `kind` (or `strategy`); for
/// `fair_round_robin` the key field is taken from `key`, then `key_field`,
/// defaulting to `"key"`. Any other type is an error.
fn parse_queue_strategy(opts: &BTreeMap<String, VmValue>) -> Result<QueueStrategy, VmError> {
    let map = match opts.get("queue") {
        None | Some(VmValue::Nil) => return Ok(QueueStrategy::Priority),
        Some(VmValue::String(text)) => return parse_queue_strategy_name(text),
        Some(VmValue::Dict(map)) => map,
        Some(other) => {
            return Err(VmError::Runtime(format!(
                "pool_create: queue must be a strategy dict or string (got {})",
                other.type_name()
            )));
        }
    };
    let kind = match map
        .get("kind")
        .or_else(|| map.get("strategy"))
        .map(VmValue::display)
    {
        Some(kind) if !kind.is_empty() => kind,
        _ => {
            return Err(VmError::Runtime(
                "pool_create: queue strategy missing kind".to_string(),
            ));
        }
    };
    if kind != "fair_round_robin" {
        return parse_queue_strategy_name(&kind);
    }
    let key_field = match map
        .get("key")
        .or_else(|| map.get("key_field"))
        .map(VmValue::display)
    {
        Some(field) if !field.is_empty() => field,
        _ => "key".to_string(),
    };
    Ok(QueueStrategy::FairRoundRobin { key_field })
}
/// Maps a strategy name to a `QueueStrategy`; `fair_round_robin` gets the
/// default key field `"key"`. Unknown names are an error.
fn parse_queue_strategy_name(name: &str) -> Result<QueueStrategy, VmError> {
    let strategy = match name {
        "fifo" => QueueStrategy::Fifo,
        "priority" => QueueStrategy::Priority,
        "lifo" => QueueStrategy::Lifo,
        "fair_round_robin" => QueueStrategy::FairRoundRobin {
            key_field: "key".to_string(),
        },
        other => {
            return Err(VmError::Runtime(format!(
                "pool_create: unknown queue strategy '{other}'"
            )));
        }
    };
    Ok(strategy)
}
/// Reads the `backpressure` option.
///
/// Absent or nil selects `Unbounded`. A string is parsed as a policy name.
/// A dict must carry a non-empty `kind` (or `strategy`): `queue` requires
/// `max_depth` (alias `capacity`) and accepts `on_full`; `ring_buffer`
/// requires `capacity` (alias `max_depth`); other kinds are parsed by name.
fn parse_backpressure(opts: &BTreeMap<String, VmValue>) -> Result<BackpressureStrategy, VmError> {
    let map = match opts.get("backpressure") {
        None | Some(VmValue::Nil) => return Ok(BackpressureStrategy::Unbounded),
        Some(VmValue::String(text)) => return parse_backpressure_name(text),
        Some(VmValue::Dict(map)) => map,
        Some(other) => {
            return Err(VmError::Runtime(format!(
                "pool_create: backpressure must be a policy dict or string (got {})",
                other.type_name()
            )));
        }
    };
    let kind = match map
        .get("kind")
        .or_else(|| map.get("strategy"))
        .map(VmValue::display)
    {
        Some(kind) if !kind.is_empty() => kind,
        _ => {
            return Err(VmError::Runtime(
                "pool_create: backpressure missing kind".to_string(),
            ));
        }
    };
    if kind == "queue" {
        let depth_value = map.get("max_depth").or_else(|| map.get("capacity"));
        let max_depth = parse_positive_usize(depth_value, "pool_create: backpressure.max_depth")?;
        let on_full = parse_on_full_policy(map.get("on_full"))?;
        return Ok(BackpressureStrategy::Queue { max_depth, on_full });
    }
    if kind == "ring_buffer" {
        let capacity_value = map.get("capacity").or_else(|| map.get("max_depth"));
        let capacity = parse_positive_usize(capacity_value, "pool_create: backpressure.capacity")?;
        return Ok(BackpressureStrategy::RingBuffer { capacity });
    }
    parse_backpressure_name(&kind)
}
/// Maps a policy name to one of the parameterless backpressure policies
/// (`unbounded`, `fail_fast`). Every other name is an error.
fn parse_backpressure_name(name: &str) -> Result<BackpressureStrategy, VmError> {
    let policy = match name {
        "unbounded" => BackpressureStrategy::Unbounded,
        "fail_fast" => BackpressureStrategy::FailFast,
        other => {
            return Err(VmError::Runtime(format!(
                "pool_create: unknown backpressure policy '{other}'"
            )));
        }
    };
    Ok(policy)
}
fn parse_positive_usize(value: Option<&VmValue>, name: &str) -> Result<usize, VmError> {
match value {
Some(VmValue::Int(n)) if *n >= 1 => Ok(*n as usize),
Some(VmValue::Int(_)) => Err(VmError::Runtime(format!("{name} must be >= 1"))),
Some(other) => Err(VmError::Runtime(format!(
"{name} must be an int (got {})",
other.type_name()
))),
None => Err(VmError::Runtime(format!("{name} is required"))),
}
}
/// Parses the `on_full` entry of a queue backpressure dict.
///
/// Absent or nil selects `BlockSubmitter`; a string must be one of the four
/// known policy names; any other type is an error.
fn parse_on_full_policy(value: Option<&VmValue>) -> Result<QueueOnFullPolicy, VmError> {
    let text = match value {
        None | Some(VmValue::Nil) => return Ok(QueueOnFullPolicy::BlockSubmitter),
        Some(VmValue::String(text)) => text,
        Some(other) => {
            return Err(VmError::Runtime(format!(
                "pool_create: backpressure.on_full must be a string (got {})",
                other.type_name()
            )));
        }
    };
    let policy = match text.as_ref() {
        "block_submitter" => QueueOnFullPolicy::BlockSubmitter,
        "drop_oldest" => QueueOnFullPolicy::DropOldest,
        "drop_newest" => QueueOnFullPolicy::DropNewest,
        "fail_submitter" => QueueOnFullPolicy::FailSubmitter,
        other => {
            return Err(VmError::Runtime(format!(
                "pool_create: unknown backpressure on_full policy '{other}'"
            )));
        }
    };
    Ok(policy)
}
/// Builds the dict returned to scripts for a pool: identity, limits, counters
/// by status, queue/backpressure configuration, and a list of task snapshots
/// ordered by `submitted_at`.
///
/// `scope_id` and `config` are only included when non-empty.
fn pool_snapshot_value(pool: &PoolEntry) -> VmValue {
    fn text(value: &str) -> VmValue {
        VmValue::String(Rc::from(value))
    }

    // Single pass over the tasks: snapshot each one and tally end states.
    let mut completed: i64 = 0;
    let mut failed: i64 = 0;
    let mut rejected: i64 = 0;
    let mut tasks: Vec<VmValue> = Vec::with_capacity(pool.tasks.len());
    for state in pool.tasks.values() {
        let state = state.borrow();
        match state.status {
            TaskStatus::Completed => completed += 1,
            TaskStatus::Failed => failed += 1,
            TaskStatus::Rejected => rejected += 1,
            _ => {}
        }
        tasks.push(task_snapshot_value(&state));
    }
    tasks.sort_by_key(task_sort_key);

    let mut fields: Vec<(&str, VmValue)> = vec![
        ("_type", text(POOL_TYPE)),
        ("id", text(pool.id.as_str())),
        ("name", text(pool.name.as_str())),
        ("max_concurrent", VmValue::Int(pool.max_concurrent as i64)),
        ("created_at", text(pool.created_at.as_str())),
        ("active", VmValue::Int(pool.active.len() as i64)),
        ("queued", VmValue::Int(pool.queue.len() as i64)),
        ("completed", VmValue::Int(completed)),
        ("failed", VmValue::Int(failed)),
        ("rejected", VmValue::Int(rejected)),
        ("total", VmValue::Int(pool.tasks.len() as i64)),
        ("queue_strategy", text(pool.queue_strategy.name())),
        (
            "backpressure",
            backpressure_snapshot_value(&pool.backpressure),
        ),
        (
            "blocked_submitters",
            VmValue::Int(pool.space_waiters.len() as i64),
        ),
        ("tasks", VmValue::List(Rc::new(tasks))),
        ("scope", text(pool.scope.as_str())),
        ("durable", VmValue::Bool(pool.store.is_some())),
        ("stale_after_ms", VmValue::Int(pool.stale_after_ms)),
    ];
    if !pool.scope_id.is_empty() {
        fields.push(("scope_id", text(pool.scope_id.as_str())));
    }
    if !pool.config.is_empty() {
        fields.push(("config", VmValue::Dict(Rc::new(pool.config.clone()))));
    }

    let snapshot: BTreeMap<String, VmValue> = fields
        .into_iter()
        .map(|(key, value)| (key.to_string(), value))
        .collect();
    VmValue::Dict(Rc::new(snapshot))
}
/// Builds the `backpressure` dict of a pool snapshot: always `_type` and
/// `kind`, plus `max_depth` for bounded policies and `on_full` for `Queue`.
fn backpressure_snapshot_value(backpressure: &BackpressureStrategy) -> VmValue {
    let mut fields: Vec<(&str, VmValue)> = vec![
        ("_type", VmValue::String(Rc::from("backpressure"))),
        ("kind", VmValue::String(Rc::from(backpressure.name()))),
    ];
    if let Some(limit) = backpressure.max_depth() {
        fields.push(("max_depth", VmValue::Int(limit as i64)));
    }
    if let BackpressureStrategy::Queue { on_full, .. } = backpressure {
        fields.push(("on_full", VmValue::String(Rc::from(on_full.as_str()))));
    }
    let value: BTreeMap<String, VmValue> = fields
        .into_iter()
        .map(|(key, entry)| (key.to_string(), entry))
        .collect();
    VmValue::Dict(Rc::new(value))
}
/// Sort key for task snapshots: the rendered `submitted_at` entry, or an
/// empty string when the value is not a dict or has no such entry.
fn task_sort_key(task: &VmValue) -> String {
    let VmValue::Dict(map) = task else {
        return String::new();
    };
    match map.get("submitted_at") {
        Some(value) => value.display(),
        None => String::new(),
    }
}
/// Builds the full snapshot dict for a task.
///
/// Always present: `_type`, `id`, `pool_id`, `pool`, `status`, `priority`,
/// `submitted_at`. Optional fields are added only when set on the task.
fn task_snapshot_value(task: &TaskState) -> VmValue {
    fn text(value: &str) -> VmValue {
        VmValue::String(Rc::from(value))
    }

    let mut entry: BTreeMap<String, VmValue> = BTreeMap::new();
    let required = [
        ("_type", POOL_TASK_TYPE),
        ("id", task.id.as_str()),
        ("pool_id", task.pool_id.as_str()),
        ("pool", task.pool_name.as_str()),
        ("status", task.status.as_str()),
        ("submitted_at", task.submitted_at.as_str()),
    ];
    for (name, value) in required {
        entry.insert(name.to_string(), text(value));
    }
    entry.insert("priority".to_string(), VmValue::Int(task.priority));

    let optional = [
        ("key", &task.key),
        ("started_at", &task.started_at),
        ("finished_at", &task.finished_at),
        ("error", &task.error),
        ("rejection_reason", &task.rejection_reason),
        ("rejection_policy", &task.rejection_policy),
    ];
    for (name, value) in optional {
        if let Some(value) = value {
            entry.insert(name.to_string(), text(value.as_str()));
        }
    }
    if let Some(result) = &task.result {
        entry.insert("result".to_string(), result.clone());
    }
    VmValue::Dict(Rc::new(entry))
}
/// Builds the lightweight handle dict for a task.
///
/// Always present: `_type`, `id`, `pool_id`, `pool`, `submitted_at`,
/// `status`. `key`, `error`, `rejection_reason`, and `rejection_policy` are
/// added only when set on the task.
fn task_handle_value(task: &TaskState) -> VmValue {
    fn text(value: &str) -> VmValue {
        VmValue::String(Rc::from(value))
    }

    let mut handle: BTreeMap<String, VmValue> = BTreeMap::new();
    let required = [
        ("_type", POOL_TASK_TYPE),
        ("id", task.id.as_str()),
        ("pool_id", task.pool_id.as_str()),
        ("pool", task.pool_name.as_str()),
        ("submitted_at", task.submitted_at.as_str()),
        ("status", task.status.as_str()),
    ];
    for (name, value) in required {
        handle.insert(name.to_string(), text(value));
    }

    let optional = [
        ("key", &task.key),
        ("error", &task.error),
        ("rejection_reason", &task.rejection_reason),
        ("rejection_policy", &task.rejection_policy),
    ];
    for (name, value) in optional {
        if let Some(value) = value {
            handle.insert(name.to_string(), text(value.as_str()));
        }
    }
    VmValue::Dict(Rc::new(handle))
}
/// Copies the `queue`, `backpressure`, and `priority` options (those that are
/// present) into the map stored as the pool's `config`.
fn ordered_pool_config(opts: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
    ["queue", "backpressure", "priority"]
        .iter()
        .filter_map(|key| {
            opts.get(*key)
                .map(|value| ((*key).to_string(), value.clone()))
        })
        .collect()
}
/// Implements `pool_create`: parses the options dict, builds a new
/// `PoolEntry`, registers it in the thread-local registries, and returns its
/// snapshot.
///
/// Session-scoped pools get a random id and no durable store.
/// Pipeline-scoped pools get a deterministic id and a durable store file;
/// any state already in that file is loaded and rehydrated, otherwise the
/// file is initialized with a header record.
///
/// # Errors
/// Fails when the name is already registered, when any option is invalid,
/// when the scope is host-routed (`tenant`/`org`), when a pipeline scope id
/// cannot be resolved, or when the durable store cannot be read or written.
fn pool_create_sync(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
let opts = parse_options(args.first(), "pool_create")?;
// Unnamed pools get a generated name so they still occupy a `POOL_NAMES` slot.
let name = parse_name(&opts).unwrap_or_else(|| format!("pool_{}", uuid::Uuid::now_v7()));
if let Some(existing) = POOL_NAMES.with(|names| names.borrow().get(&name).cloned()) {
return Err(VmError::Runtime(format!(
"pool_create: pool '{name}' already exists (id={existing}); use pool_get to reuse"
)));
}
let max_concurrent = parse_max_concurrent(&opts)?;
let queue_strategy = parse_queue_strategy(&opts)?;
let backpressure = parse_backpressure(&opts)?;
let scope = parse_scope(&opts)?;
let stale_after_ms = parse_stale_after_ms(&opts)?;
// Host-routed scopes are rejected before any filesystem access happens.
if scope.is_host_routed() {
return Err(VmError::Runtime(format!(
"pool_create: scope '{}' is host-routed (harn-cloud, see harn-cloud#306) and \
not wired in the in-process runtime. Use scope: \"session\" or \
scope: \"pipeline\" until the host capability ships",
scope.as_str()
)));
}
let (id, scope_id, store, persisted) = match scope {
PoolScope::Session => (next_pool_id(), String::new(), None, None),
PoolScope::Pipeline => {
let pipeline_id = resolve_pipeline_scope_id(&opts)?;
let id = deterministic_pool_id(scope, &pipeline_id, &name);
let dir_override = parse_durable_dir(&opts)?;
let path = pipeline_pool_file_path(dir_override.as_deref(), &pipeline_id, &name);
let store = PoolDurableStore::new(path);
// `load` yields `None` when the file is missing or has no pool header.
let persisted = store.load()?;
(
id,
pipeline_id,
Some(Rc::new(RefCell::new(store))),
persisted,
)
}
PoolScope::Tenant | PoolScope::Org => unreachable!("host-routed scope returned above"),
};
// NOTE(review): only `submit_counter` is taken from the persisted metadata
// here; `created_at` and `max_concurrent` come from this call. Confirm
// whether `rehydrate_persisted_state` restores them.
let submit_counter = persisted
.as_ref()
.map(|state| state.meta.submit_counter)
.unwrap_or(0);
let entry = Rc::new(RefCell::new(PoolEntry {
id: id.clone(),
name: name.clone(),
max_concurrent,
// NOTE(review): `created_at` is filled with a UUIDv7 string rather than a
// timestamp. `pool_list` only compares these strings for ordering, but the
// value is exposed to scripts as `created_at` — confirm this is intended.
created_at: uuid::Uuid::now_v7().to_string(),
submit_counter,
queue: VecDeque::new(),
queue_strategy,
backpressure,
round_robin_after: None,
active: HashMap::new(),
tasks: BTreeMap::new(),
space_waiters: Vec::new(),
config: ordered_pool_config(&opts),
scope,
scope_id: scope_id.clone(),
idempotency_index: HashMap::new(),
stale_after_ms,
store: store.clone(),
}));
if let (Some(persisted), Some(store_ref)) = (persisted, store.clone()) {
// Existing durable state: hand it to the rehydration helper.
rehydrate_persisted_state(&entry, &store_ref, persisted, stale_after_ms)?;
} else if let Some(store_ref) = store.clone() {
// Fresh durable pool: write a file containing only the header record.
let meta = persisted_meta_from_entry(&entry.borrow());
store_ref.borrow().compact(&meta, &[])?;
}
// Registration happens last, so a failure above leaves the registries untouched.
POOLS.with(|pools| pools.borrow_mut().insert(id.clone(), entry.clone()));
POOL_NAMES.with(|names| names.borrow_mut().insert(name, id.clone()));
let snapshot = pool_snapshot_value(&entry.borrow());
Ok(snapshot)
}
/// Computes the durable store path for a pipeline-scoped pool.
///
/// The file lives under `dir_override` (or `PIPELINE_POOLS_ROOT`) and is
/// named `<pipeline>__<name>__<hash>.jsonl`, where the first two parts are
/// sanitized and `<hash>` is the first 16 hex characters of the BLAKE3
/// digest of the raw pipeline id and name separated by a NUL byte.
fn pipeline_pool_file_path(dir_override: Option<&str>, pipeline_id: &str, name: &str) -> PathBuf {
    let digest = {
        let mut hasher = blake3::Hasher::new();
        hasher.update(pipeline_id.as_bytes());
        hasher.update(b"\x00");
        hasher.update(name.as_bytes());
        hasher.finalize().to_hex()
    };
    let safe_pipeline = crate::event_log::sanitize_topic_component(pipeline_id);
    let safe_name = crate::event_log::sanitize_topic_component(name);
    let file_name = format!(
        "{}__{}__{}.jsonl",
        safe_pipeline,
        safe_name,
        &digest.as_str()[..16]
    );
    let root = PathBuf::from(dir_override.unwrap_or(PIPELINE_POOLS_ROOT));
    root.join(file_name)
}
/// Reads the `dir` option: absent or nil means no override, a non-blank
/// string is returned as-is, and a blank string or any other type is an
/// error.
fn parse_durable_dir(opts: &BTreeMap<String, VmValue>) -> Result<Option<String>, VmError> {
    let Some(value) = opts.get("dir") else {
        return Ok(None);
    };
    match value {
        VmValue::Nil => Ok(None),
        VmValue::String(text) => {
            if text.trim().is_empty() {
                Err(VmError::Runtime(
                    "pool_create: dir cannot be empty".to_string(),
                ))
            } else {
                Ok(Some(text.to_string()))
            }
        }
        other => Err(VmError::Runtime(format!(
            "pool_create: dir must be a string (got {})",
            other.type_name()
        ))),
    }
}
/// Implements `pool_get`: resolves the first argument as a pool name, then as
/// a pool id, and returns the pool's snapshot. Returns nil when neither
/// matches, and an error when the argument is missing or renders empty.
fn pool_get_sync(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
    let key = match args.first().map(VmValue::display) {
        Some(text) if !text.is_empty() => text,
        _ => {
            return Err(VmError::Runtime(
                "pool_get: name is required".to_string(),
            ));
        }
    };
    let by_name = POOL_NAMES.with(|names| names.borrow().get(&key).cloned());
    let id = if let Some(id) = by_name {
        id
    } else if POOLS.with(|pools| pools.borrow().contains_key(&key)) {
        // Not a known name, but it matches a registered pool id.
        key
    } else {
        return Ok(VmValue::Nil);
    };
    let entry = lookup_pool(&id)?;
    let snapshot = pool_snapshot_value(&entry.borrow());
    Ok(snapshot)
}
/// Builtin that returns a list with the snapshot of every registered pool,
/// ordered by each pool's `created_at` value (ascending).
fn pool_list_sync(_args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
    let mut entries: Vec<Rc<RefCell<PoolEntry>>> =
        POOLS.with(|pools| pools.borrow().values().cloned().collect());
    entries.sort_by(|left, right| {
        let left = left.borrow();
        let right = right.borrow();
        left.created_at.cmp(&right.created_at)
    });
    let mut snapshots = Vec::with_capacity(entries.len());
    for entry in &entries {
        snapshots.push(pool_snapshot_value(&entry.borrow()));
    }
    Ok(VmValue::List(Rc::new(snapshots)))
}
/// Builtin that returns the number of active plus queued tasks of a pool.
///
/// The first argument must be a pool handle; a missing handle is an error.
fn pool_size_sync(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
    let Some(handle) = args.first() else {
        return Err(VmError::Runtime(
            "pool.size: pool handle is required".to_string(),
        ));
    };
    let pool_id = pool_id_from_value(handle, "pool.size")?;
    let entry = lookup_pool(&pool_id)?;
    let pool = entry.borrow();
    let in_flight = pool.active.len() + pool.queue.len();
    Ok(VmValue::Int(in_flight as i64))
}
/// Builtin that clears the in-process pool registries via `reset_pool_state`
/// and returns nil. Arguments are ignored.
fn pool_reload_sync(_args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
reset_pool_state();
Ok(VmValue::Nil)
}
/// Builtin that returns the snapshot value of the pool identified by the
/// handle in the first argument. A missing handle is an error.
fn pool_snapshot_sync(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
    let Some(handle) = args.first() else {
        return Err(VmError::Runtime(
            "pool.snapshot: pool handle is required".to_string(),
        ));
    };
    let pool_id = pool_id_from_value(handle, "pool.snapshot")?;
    let entry = lookup_pool(&pool_id)?;
    // Bind the snapshot so the `RefCell` borrow ends before `entry` is dropped.
    let snapshot = pool_snapshot_value(&entry.borrow());
    Ok(snapshot)
}
/// Async builtin that submits a closure to a pool and returns the task handle.
///
/// Arguments: the pool handle, the closure to run, and an optional options
/// value from which the priority, idempotency key and submit key are parsed.
/// Errors when the handle or closure is missing or of the wrong type, when
/// option parsing fails, or when the submission itself fails.
async fn pool_submit_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    let Some(pool_arg) = args.first() else {
        return Err(VmError::Runtime(
            "pool.submit: pool handle is required".to_string(),
        ));
    };
    let pool_id = pool_id_from_value(pool_arg, "pool.submit")?;
    let closure = match args.get(1) {
        None => {
            return Err(VmError::Runtime(
                "pool.submit: closure is required".to_string(),
            ));
        }
        Some(VmValue::Closure(closure)) => closure.clone(),
        Some(other) => {
            return Err(VmError::Runtime(format!(
                "pool.submit: second argument must be a closure (got {})",
                other.type_name()
            )));
        }
    };
    let opts = parse_options(args.get(2), "pool.submit")?;
    let priority = parse_priority(&opts)?;
    let idempotency_key = parse_idempotency_key(&opts)?;
    let entry = lookup_pool(&pool_id)?;
    // Keep the pool borrow scoped so it is released before the await below.
    let key = {
        let pool = entry.borrow();
        parse_submit_key(&opts, &pool.queue_strategy)?
    };
    let state = submit_to_pool_entry(&entry, closure, key, priority, idempotency_key).await?;
    let handle = task_handle_value(&state.borrow());
    Ok(handle)
}
/// Resolves a pool entry from either its registered name or its id.
///
/// The name registry takes precedence; if the text is not a registered name it
/// is tried as a pool id. Returns `None` when nothing matches.
fn lookup_pool_by_name_or_id(name_or_id: &str) -> Option<Rc<RefCell<PoolEntry>>> {
    let id = match POOL_NAMES.with(|names| names.borrow().get(name_or_id).cloned()) {
        Some(id) => id,
        None => {
            let is_known_id = POOLS.with(|pools| pools.borrow().contains_key(name_or_id));
            if !is_known_id {
                return None;
            }
            name_or_id.to_string()
        }
    };
    POOLS.with(|pools| pools.borrow().get(&id).cloned())
}
/// Summary of a submission made through `submit_closure_to_named_pool`.
/// Every field is copied from the submitted task's state at the time the
/// submission call returns.
pub struct PoolSubmitOutcome {
/// Id of the pool the task belongs to.
pub pool_id: String,
/// Name of the pool the task belongs to.
pub pool_name: String,
/// Id of the task.
pub task_id: String,
/// Task status, rendered with `TaskStatus::as_str`.
pub status: &'static str,
/// Rejection reason recorded on the task, if any.
pub rejection_reason: Option<String>,
}
/// Submits `closure` to the pool identified by `pool_name` (a name or an id).
///
/// No idempotency key is used for the submission.
///
/// # Errors
///
/// Returns a runtime error when no such pool exists, or whatever error the
/// underlying submission produces.
pub async fn submit_closure_to_named_pool(
    pool_name: &str,
    closure: Rc<VmClosure>,
    priority: i64,
    key: Option<String>,
) -> Result<PoolSubmitOutcome, VmError> {
    let Some(entry) = lookup_pool_by_name_or_id(pool_name) else {
        return Err(VmError::Runtime(format!(
            "pool: pool '{pool_name}' not found; create it with pool_create first"
        )));
    };
    let state = submit_to_pool_entry(&entry, closure, key, priority, None).await?;
    let outcome = {
        let task = state.borrow();
        PoolSubmitOutcome {
            pool_id: task.pool_id.clone(),
            pool_name: task.pool_name.clone(),
            task_id: task.id.clone(),
            status: task.status.as_str(),
            rejection_reason: task.rejection_reason.clone(),
        }
    };
    Ok(outcome)
}
/// Submits `closure` to the pool behind `entry`, retrying for as long as the
/// pool asks the submitter to wait for queue space.
///
/// If `idempotency_key` already maps to a task in this pool, that task is
/// returned and nothing new is created. The key is checked before the first
/// attempt and again after every wake-up from a blocked attempt, so a task
/// registered under the same key while this submitter was waiting is reused
/// rather than duplicated.
///
/// Each attempt runs inside its own `PoolSubmit` span. On success the submit
/// receipt and any drop audits are emitted, then ready tasks are dispatched.
///
/// # Errors
///
/// Returns the error carried by `SubmitAttempt::Fail` when the pool refuses
/// the submission.
async fn submit_to_pool_entry(
    entry: &Rc<RefCell<PoolEntry>>,
    closure: Rc<VmClosure>,
    key: Option<String>,
    priority: i64,
    idempotency_key: Option<String>,
) -> Result<Rc<RefCell<TaskState>>, VmError> {
    if let Some(idem) = idempotency_key.as_deref() {
        if let Some(existing) = lookup_idempotency_match(entry, idem) {
            return Ok(existing);
        }
    }
    let submitted_by = current_submitter();
    let (pool_id_for_span, pool_name_for_span) = {
        let pool = entry.borrow();
        (pool.id.clone(), pool.name.clone())
    };
    loop {
        let mut submit_span = PoolSpanGuard::start(
            crate::tracing::SpanKind::PoolSubmit,
            format!("pool.submit {pool_name_for_span}"),
            Vec::new(),
        );
        submit_span.set_metadata("pool", serde_json::json!(pool_name_for_span));
        submit_span.set_metadata("pool_id", serde_json::json!(pool_id_for_span));
        submit_span.set_metadata("priority", serde_json::json!(priority));
        if let Some(key) = &key {
            submit_span.set_metadata("key", serde_json::json!(key));
        }
        if let Some(idem) = &idempotency_key {
            submit_span.set_metadata("idempotency_key", serde_json::json!(idem));
        }
        let submit_link = submit_span.link();
        // The mutable pool borrow is confined to this block so that it is
        // released before any `.await` below.
        let attempt = {
            let mut pool = entry.borrow_mut();
            submit_or_wait(
                &mut pool,
                closure.clone(),
                key.clone(),
                priority,
                idempotency_key.clone(),
                submit_link.clone(),
                submitted_by.clone(),
            )
        };
        match attempt {
            SubmitAttempt::Submitted { task, audits } => {
                let receipt = {
                    let task_ref = task.borrow();
                    submit_span.set_metadata("task_id", serde_json::json!(task_ref.id));
                    submit_span.set_metadata("status", serde_json::json!(task_ref.status.as_str()));
                    pool_submit_receipt(&entry.borrow(), &task_ref)
                };
                emit_pool_submit_receipt(receipt).await;
                for audit in audits {
                    emit_pool_drop(audit).await;
                }
                submit_span.end();
                dispatch_ready(entry);
                return Ok(task);
            }
            SubmitAttempt::Wait(receiver) => {
                submit_span.set_metadata("blocked", serde_json::json!(true));
                submit_span.end();
                // A dropped sender also wakes us; either way we simply retry.
                let _ = receiver.await;
                // Another submitter may have registered the same idempotency
                // key while this one was blocked; reuse its task instead of
                // creating a duplicate on the retry.
                if let Some(idem) = idempotency_key.as_deref() {
                    if let Some(existing) = lookup_idempotency_match(entry, idem) {
                        return Ok(existing);
                    }
                }
            }
            SubmitAttempt::Fail(error) => {
                submit_span.set_metadata("error", serde_json::json!(error.to_string()));
                submit_span.end();
                return Err(error);
            }
        }
    }
}
/// Returns an identifier for whoever is submitting the current task.
///
/// When a child VM is available and its runtime context is a dict, the first
/// non-empty string found under `agent_session_id`, `worker_id`,
/// `workflow_id`, `run_id` or `task_id` (checked in that order) is returned.
/// Otherwise the result is `"user"`.
fn current_submitter() -> String {
    const CANDIDATE_KEYS: [&str; 5] = [
        "agent_session_id",
        "worker_id",
        "workflow_id",
        "run_id",
        "task_id",
    ];
    let Some(vm) = crate::vm::clone_async_builtin_child_vm() else {
        return "user".to_string();
    };
    let VmValue::Dict(values) = crate::runtime_context::runtime_context_value(&vm) else {
        return "user".to_string();
    };
    let found = CANDIDATE_KEYS.iter().find_map(|key| match values.get(*key) {
        Some(VmValue::String(text)) if !text.is_empty() => Some(text.to_string()),
        _ => None,
    });
    found.unwrap_or_else(|| "user".to_string())
}
/// Returns the task previously registered under `idempotency_key` in this
/// pool, or `None` when the key is unknown or its task is no longer tracked.
fn lookup_idempotency_match(
    entry: &Rc<RefCell<PoolEntry>>,
    idempotency_key: &str,
) -> Option<Rc<RefCell<TaskState>>> {
    let pool = entry.borrow();
    let found = pool
        .idempotency_index
        .get(idempotency_key)
        .and_then(|task_id| pool.tasks.get(task_id).cloned());
    found
}
/// Result of a single submission attempt made by `submit_or_wait`.
enum SubmitAttempt {
/// A task state was created. `audits` lists the drop audits produced by this
/// attempt; the caller emits them to the audit log.
Submitted {
task: Rc<RefCell<TaskState>>,
audits: Vec<PoolDropAudit>,
},
/// The submitter must await this receiver and then retry the submission.
Wait(tokio::sync::oneshot::Receiver<()>),
/// The submission was refused; the error is returned to the caller.
Fail(VmError),
}
/// Performs one synchronous submission attempt against `pool`.
///
/// When `can_accept_now` reports capacity, the task is created and enqueued.
/// Otherwise the pool's backpressure strategy decides: enqueue anyway, register
/// a space waiter and ask the caller to wait, drop the oldest queued task, reject
/// the new task, or fail the submission with a policy error.
#[allow(clippy::too_many_arguments)]
fn submit_or_wait(
pool: &mut PoolEntry,
closure: Rc<VmClosure>,
key: Option<String>,
priority: i64,
idempotency_key: Option<String>,
submit_span_link: Option<crate::tracing::SpanLink>,
submitted_by: String,
) -> SubmitAttempt {
// Fast path: there is room, so no backpressure policy applies.
if can_accept_now(pool) {
let (state, pending) = create_pending_task(
pool,
closure,
key,
priority,
idempotency_key,
submit_span_link,
submitted_by,
);
enqueue_task(pool, pending);
return SubmitAttempt::Submitted {
task: state,
audits: Vec::new(),
};
}
// No room: apply the configured backpressure strategy.
match pool.backpressure.clone() {
// NOTE(review): `can_accept_now` already returns true for `Unbounded`, so
// this arm looks unreachable in practice; it mirrors the fast path.
BackpressureStrategy::Unbounded => {
let (state, pending) = create_pending_task(
pool,
closure,
key,
priority,
idempotency_key,
submit_span_link,
submitted_by,
);
enqueue_task(pool, pending);
SubmitAttempt::Submitted {
task: state,
audits: Vec::new(),
}
}
// Park the submitter: it is woken through `space_waiters` and retries.
BackpressureStrategy::Queue {
max_depth: _,
on_full: QueueOnFullPolicy::BlockSubmitter,
} => {
let (sender, receiver) = tokio::sync::oneshot::channel();
pool.space_waiters.push(sender);
SubmitAttempt::Wait(receiver)
}
// Reject the task at the front of the queue and enqueue the new one.
BackpressureStrategy::Queue {
max_depth,
on_full: QueueOnFullPolicy::DropOldest,
} => submit_with_oldest_drop(
pool,
closure,
key,
priority,
idempotency_key,
submit_span_link,
submitted_by,
"drop_oldest_queue_full",
QueueOnFullPolicy::DropOldest.as_str(),
Some(max_depth),
),
// Record the new task but reject it immediately; the queue is untouched.
BackpressureStrategy::Queue {
max_depth,
on_full: QueueOnFullPolicy::DropNewest,
} => submit_with_newest_drop(
pool,
closure,
key,
priority,
idempotency_key,
submit_span_link,
submitted_by,
"drop_newest_queue_full",
QueueOnFullPolicy::DropNewest.as_str(),
Some(max_depth),
),
// Surface the full queue to the submitter as an error.
BackpressureStrategy::Queue {
on_full: QueueOnFullPolicy::FailSubmitter,
..
} => SubmitAttempt::Fail(policy_error(
Code::PoolBackpressureFull,
format!(
"pool.submit: pool '{}' queue is full under fail_submitter backpressure",
pool.name
),
)),
// Fail-fast pools never queue: no immediate capacity is an error.
BackpressureStrategy::FailFast => SubmitAttempt::Fail(policy_error(
Code::PoolFailFastFull,
format!(
"pool.submit: pool '{}' has no immediate capacity under fail_fast backpressure",
pool.name
),
)),
// Ring buffer: same eviction as drop-oldest, reported under its own reason/policy.
BackpressureStrategy::RingBuffer { capacity } => submit_with_oldest_drop(
pool,
closure,
key,
priority,
idempotency_key,
submit_span_link,
submitted_by,
"ring_buffer_drop_oldest",
"ring_buffer",
Some(capacity),
),
}
}
/// Reports whether a new task can be enqueued right now without applying the
/// backpressure policy.
///
/// True when a concurrency slot is free and nothing is queued; otherwise the
/// answer depends on the strategy: always for `Unbounded`, never for
/// `FailFast`, and while the queue is below its limit for `Queue` and
/// `RingBuffer`.
fn can_accept_now(pool: &PoolEntry) -> bool {
    let has_idle_slot = pool.active.len() < pool.max_concurrent;
    if has_idle_slot && pool.queue.is_empty() {
        return true;
    }
    let queued = pool.queue.len();
    match &pool.backpressure {
        BackpressureStrategy::Unbounded => true,
        BackpressureStrategy::FailFast => false,
        BackpressureStrategy::Queue { max_depth, .. } => queued < *max_depth,
        BackpressureStrategy::RingBuffer { capacity } => queued < *capacity,
    }
}
/// Creates and enqueues a new task, first rejecting the task at the front of
/// the queue (if any) to make room.
///
/// `reason` and `policy` are recorded on the rejected task and in its drop
/// audit; the audit also carries the queue depth observed before the new task
/// was created, `max_depth`, and the id of the replacing task.
#[allow(clippy::too_many_arguments)]
fn submit_with_oldest_drop(
    pool: &mut PoolEntry,
    closure: Rc<VmClosure>,
    key: Option<String>,
    priority: i64,
    idempotency_key: Option<String>,
    submit_span_link: Option<crate::tracing::SpanLink>,
    submitted_by: String,
    reason: &str,
    policy: &str,
    max_depth: Option<usize>,
) -> SubmitAttempt {
    let depth_before = pool.queue.len();
    let (state, pending) = create_pending_task(
        pool,
        closure,
        key,
        priority,
        idempotency_key,
        submit_span_link,
        submitted_by,
    );
    let replacement_id = state.borrow().id.clone();
    let audits = match pool.queue.pop_front() {
        Some(evicted) => vec![reject_pending_task(
            pool,
            evicted,
            Some(replacement_id.as_str()),
            reason,
            policy,
            depth_before,
            max_depth,
        )],
        None => Vec::new(),
    };
    enqueue_task(pool, pending);
    SubmitAttempt::Submitted {
        task: state,
        audits,
    }
}
/// Records the incoming task and immediately rejects it, leaving the queue
/// untouched.
///
/// The task state is still created (and therefore tracked by the pool), then
/// marked rejected with `reason` and `policy`, persisted, and reported through
/// a single drop audit. The attempt counts as `Submitted` so the caller gets
/// the rejected task back.
#[allow(clippy::too_many_arguments)]
fn submit_with_newest_drop(
    pool: &mut PoolEntry,
    closure: Rc<VmClosure>,
    key: Option<String>,
    priority: i64,
    idempotency_key: Option<String>,
    submit_span_link: Option<crate::tracing::SpanLink>,
    submitted_by: String,
    reason: &str,
    policy: &str,
    max_depth: Option<usize>,
) -> SubmitAttempt {
    let depth_at_submit = pool.queue.len();
    // The pending half is deliberately never enqueued.
    let (state, _never_queued) = create_pending_task(
        pool,
        closure,
        key,
        priority,
        idempotency_key,
        submit_span_link,
        submitted_by,
    );
    let rejected_id = state.borrow().id.clone();
    wake_task_waiters(reject_task_state(&state, reason, policy));
    persist_task_if_durable(pool, &state.borrow());
    let audits = vec![pool_drop_audit(
        pool,
        &rejected_id,
        None,
        reason,
        policy,
        depth_at_submit,
        max_depth,
    )];
    SubmitAttempt::Submitted {
        task: state,
        audits,
    }
}
/// Allocates a new queued task for `pool` and returns its shared state
/// together with the `PendingTask` that can be placed on the queue.
///
/// Side effects on `pool`: the submit counter is incremented (its new value is
/// the task's `seq`), the task is added to `tasks`, the idempotency index is
/// updated when a key is given, and the task is persisted if the pool has a
/// store. The task itself is not enqueued here.
fn create_pending_task(
    pool: &mut PoolEntry,
    closure: Rc<VmClosure>,
    key: Option<String>,
    priority: i64,
    idempotency_key: Option<String>,
    submit_span_link: Option<crate::tracing::SpanLink>,
    submitted_by: String,
) -> (Rc<RefCell<TaskState>>, PendingTask) {
    pool.submit_counter += 1;
    let seq = pool.submit_counter;
    let task_id = next_task_id(pool);
    let now_ms = now_ms_for_pool();
    let record = TaskState {
        id: task_id.clone(),
        pool_id: pool.id.clone(),
        pool_name: pool.name.clone(),
        key: key.clone(),
        priority,
        status: TaskStatus::Queued,
        submitted_at: uuid::Uuid::now_v7().to_string(),
        started_at: None,
        finished_at: None,
        result: None,
        error: None,
        rejection_reason: None,
        rejection_policy: None,
        idempotency_key: idempotency_key.clone(),
        heartbeat_at_ms: now_ms,
        submitted_at_ms: now_ms,
        submit_span_link,
        submitted_by,
        waiters: Vec::new(),
    };
    let state = Rc::new(RefCell::new(record));
    if let Some(idem) = idempotency_key {
        pool.idempotency_index.insert(idem, task_id.clone());
    }
    pool.tasks.insert(task_id.clone(), Rc::clone(&state));
    persist_task_if_durable(pool, &state.borrow());
    let pending = PendingTask {
        task_id,
        closure,
        state: Rc::clone(&state),
        priority,
        key,
        seq,
    };
    (state, pending)
}
/// Current time in milliseconds as reported by `crate::clock_mock::now_ms`;
/// used for pool heartbeat and submission timestamps.
fn now_ms_for_pool() -> i64 {
crate::clock_mock::now_ms()
}
/// Appends the current state of a task to the pool's store, if it has one.
///
/// Persistence is best-effort: an append error is discarded and never
/// reported to the caller.
fn persist_task_if_durable(pool: &PoolEntry, state: &TaskState) {
    if let Some(store) = pool.store.as_ref() {
        let record = PoolRecord::Task {
            task: persisted_task_from_state(state),
        };
        // Deliberately ignore the result; see the function docs.
        let _ = store.borrow().append(&record);
    }
}
/// Builds the persisted metadata record for `pool` by copying its id, name,
/// scope (as text), scope id, concurrency limit, creation timestamp and
/// submit counter.
fn persisted_meta_from_entry(pool: &PoolEntry) -> PersistedPoolMeta {
    let scope = pool.scope.as_str().to_string();
    let (id, name) = (pool.id.clone(), pool.name.clone());
    PersistedPoolMeta {
        id,
        name,
        scope,
        scope_id: pool.scope_id.clone(),
        max_concurrent: pool.max_concurrent,
        created_at: pool.created_at.clone(),
        submit_counter: pool.submit_counter,
    }
}
/// Converts live task state into its persisted form.
///
/// The result value is stored only as its display text. `seq` is always
/// written as 0.
fn persisted_task_from_state(state: &TaskState) -> PersistedTask {
    let status = state.status.as_str().to_string();
    let result_display = state.result.as_ref().map(|value| value.display());
    PersistedTask {
        id: state.id.clone(),
        pool_id: state.pool_id.clone(),
        pool_name: state.pool_name.clone(),
        key: state.key.clone(),
        priority: state.priority,
        status,
        submitted_at: state.submitted_at.clone(),
        started_at: state.started_at.clone(),
        finished_at: state.finished_at.clone(),
        error: state.error.clone(),
        rejection_reason: state.rejection_reason.clone(),
        rejection_policy: state.rejection_policy.clone(),
        idempotency_key: state.idempotency_key.clone(),
        result_display,
        heartbeat_at_ms: state.heartbeat_at_ms,
        // NOTE(review): `TaskState` carries no sequence number, so none can be
        // written here; confirm that readers do not rely on this field.
        seq: 0,
    }
}
/// Restores the tasks of a persisted pool into `entry` and compacts the store.
///
/// Every persisted task is converted with `task_state_from_persisted`; the
/// resulting states replace the pool's `tasks` map and `idempotency_index`.
/// The store is then compacted with the pool metadata and the converted tasks.
///
/// # Errors
///
/// Propagates the error from compacting the store.
fn rehydrate_persisted_state(
    entry: &Rc<RefCell<PoolEntry>>,
    store: &Rc<RefCell<PoolDurableStore>>,
    persisted: PersistedPoolState,
    stale_after_ms: i64,
) -> Result<(), VmError> {
    let reload_time = now_ms_for_pool();
    let mut by_id: BTreeMap<String, Rc<RefCell<TaskState>>> = BTreeMap::new();
    let mut by_idempotency_key: HashMap<String, String> = HashMap::new();
    let mut compacted: Vec<PersistedTask> = Vec::new();

    let mut pool = entry.borrow_mut();
    for (_, stored) in persisted.tasks {
        let live = task_state_from_persisted(&pool, &stored, reload_time, stale_after_ms);
        let snapshot = persisted_task_from_state(&live.borrow());
        if let Some(idem) = snapshot.idempotency_key.clone() {
            by_idempotency_key.insert(idem, snapshot.id.clone());
        }
        by_id.insert(snapshot.id.clone(), live);
        compacted.push(snapshot);
    }
    pool.tasks = by_id;
    pool.idempotency_index = by_idempotency_key;
    // Release the mutable borrow before the entry is borrowed again below.
    drop(pool);

    let meta = persisted_meta_from_entry(&entry.borrow());
    store.borrow().compact(&meta, &compacted)?;
    Ok(())
}
/// Rebuilds in-memory task state from a persisted task record.
///
/// Every rebuilt task ends up in a terminal status: persisted `completed`,
/// `failed` and `rejected` are kept, anything else becomes `Failed`. A task
/// without a persisted `finished_at` gets a fresh timestamp. The pool id and
/// name come from `pool`, not from the record.
fn task_state_from_persisted(
pool: &PoolEntry,
persisted: &PersistedTask,
now: i64,
stale_after_ms: i64,
) -> Rc<RefCell<TaskState>> {
let status = match persisted.status.as_str() {
// NOTE(review): both branches of the staleness check yield `Failed`, so `now`
// and `stale_after_ms` currently have no effect on the result. Confirm whether
// a task with a recent heartbeat was meant to be treated differently.
"queued" | "running" => {
if now.saturating_sub(persisted.heartbeat_at_ms) >= stale_after_ms {
TaskStatus::Failed
} else {
TaskStatus::Failed
}
}
"completed" => TaskStatus::Completed,
"failed" => TaskStatus::Failed,
"rejected" => TaskStatus::Rejected,
// Unknown status strings are treated as failures.
_ => TaskStatus::Failed,
};
let finished_at = persisted
.finished_at
.clone()
.or_else(|| Some(uuid::Uuid::now_v7().to_string()));
// A task that became `Failed` only because of the reload gets a synthetic error
// message and no rejection details; otherwise the persisted values are kept.
let (error, rejection_reason, rejection_policy) = match status {
TaskStatus::Failed if persisted.status != "failed" => (
Some(format!(
"pool: task {} reloaded as stale after process restart",
persisted.id
)),
None,
None,
),
_ => (
persisted.error.clone(),
persisted.rejection_reason.clone(),
persisted.rejection_policy.clone(),
),
};
// Only the display text of the result was persisted, so it comes back as a string.
let result = persisted
.result_display
.as_ref()
.map(|text| VmValue::String(Rc::from(text.as_str())));
Rc::new(RefCell::new(TaskState {
id: persisted.id.clone(),
pool_id: pool.id.clone(),
pool_name: pool.name.clone(),
key: persisted.key.clone(),
priority: persisted.priority,
status,
submitted_at: persisted.submitted_at.clone(),
started_at: persisted.started_at.clone(),
finished_at,
result,
error,
rejection_reason,
rejection_policy,
idempotency_key: persisted.idempotency_key.clone(),
heartbeat_at_ms: persisted.heartbeat_at_ms,
// The last heartbeat is reused as the submission time in milliseconds.
submitted_at_ms: persisted.heartbeat_at_ms,
submit_span_link: None,
submitted_by: "reloaded".to_string(),
waiters: Vec::new(),
}))
}
/// Appends `pending` to the back of the pool's queue.
fn enqueue_task(pool: &mut PoolEntry, pending: PendingTask) {
pool.queue.push_back(pending);
}
/// Starts queued tasks until the pool reaches its concurrency limit or the
/// queue has nothing left to hand out.
///
/// If at least one task left the queue, submitters waiting for queue space are
/// woken afterwards.
fn dispatch_ready(pool: &Rc<RefCell<PoolEntry>>) {
    let mut dispatched_any = false;
    loop {
        // Hold the mutable borrow only while picking the task; `spawn_task`
        // borrows the pool again.
        let candidate = {
            let mut guard = pool.borrow_mut();
            let picked = if guard.active.len() < guard.max_concurrent {
                pop_next_task(&mut guard)
            } else {
                None
            };
            picked
        };
        match candidate {
            Some(pending) => {
                dispatched_any = true;
                spawn_task(pool.clone(), pending);
            }
            None => break,
        }
    }
    if dispatched_any {
        wake_space_waiters(pool);
    }
}
fn reject_pending_task(
pool: &PoolEntry,
pending: PendingTask,
replacement_task_id: Option<&str>,
reason: &str,
policy: &str,
queue_depth: usize,
max_depth: Option<usize>,
) -> PoolDropAudit {
let task_id = pending.task_id.clone();
let waiters = reject_task_state(&pending.state, reason, policy);
wake_task_waiters(waiters);
persist_task_if_durable(pool, &pending.state.borrow());
pool_drop_audit(
pool,
&task_id,
replacement_task_id,
reason,
policy,
queue_depth,
max_depth,
)
}
/// Marks a task as rejected and hands back the waiters that must be notified.
///
/// Sets the status, finish timestamp and heartbeat, stores `reason` as both
/// the error and the rejection reason, and records `policy`. The task's waiter
/// list is emptied and returned; the caller is responsible for waking them.
fn reject_task_state(
    state: &Rc<RefCell<TaskState>>,
    reason: &str,
    policy: &str,
) -> Vec<tokio::sync::oneshot::Sender<()>> {
    let mut task = state.borrow_mut();
    task.status = TaskStatus::Rejected;
    task.finished_at = Some(uuid::Uuid::now_v7().to_string());
    task.heartbeat_at_ms = now_ms_for_pool();
    let reason_text = reason.to_string();
    task.error = Some(reason_text.clone());
    task.rejection_reason = Some(reason_text);
    task.rejection_policy = Some(policy.to_string());
    std::mem::take(&mut task.waiters)
}
/// Notifies every waiter in the list. A waiter whose receiver is already gone
/// is skipped silently.
fn wake_task_waiters(waiters: Vec<tokio::sync::oneshot::Sender<()>>) {
    waiters.into_iter().for_each(|waiter| {
        let _ = waiter.send(());
    });
}
/// Wakes every submitter currently waiting for queue space in `pool`.
///
/// The waiter list is taken out of the pool first, so the pool is no longer
/// borrowed while the notifications are sent.
fn wake_space_waiters(pool: &Rc<RefCell<PoolEntry>>) {
    let drained = std::mem::take(&mut pool.borrow_mut().space_waiters);
    drained.into_iter().for_each(|waiter| {
        let _ = waiter.send(());
    });
}
/// Builds the audit record for a dropped task, stamping it with a freshly
/// generated UUIDv7 string as `occurred_at`.
fn pool_drop_audit(
    pool: &PoolEntry,
    task_id: &str,
    replacement_task_id: Option<&str>,
    reason: &str,
    policy: &str,
    queue_depth: usize,
    max_depth: Option<usize>,
) -> PoolDropAudit {
    let replacement_task_id = replacement_task_id.map(|id| id.to_string());
    PoolDropAudit {
        pool_id: pool.id.clone(),
        pool_name: pool.name.clone(),
        task_id: task_id.to_string(),
        replacement_task_id,
        reason: reason.to_string(),
        policy: policy.to_string(),
        queue_depth,
        max_depth,
        occurred_at: uuid::Uuid::now_v7().to_string(),
    }
}
/// Appends a `pool_drop` event describing `audit` to the pool audit topic.
///
/// The event carries `schema` and `policy` headers. A failed append is
/// ignored.
async fn emit_pool_drop(audit: PoolDropAudit) {
    let topic = Topic::new(POOL_AUDIT_TOPIC).expect("static pool audit topic is valid");
    let headers = BTreeMap::from([
        ("schema".to_string(), "harn.pool_drop.v1".to_string()),
        ("policy".to_string(), audit.policy.clone()),
    ]);
    let payload = json!({
        "pool_id": audit.pool_id,
        "pool": audit.pool_name,
        "task_id": audit.task_id,
        "replacement_task_id": audit.replacement_task_id,
        "reason": audit.reason,
        "policy": audit.policy,
        "queue_depth": audit.queue_depth,
        "max_depth": audit.max_depth,
        "occurred_at": audit.occurred_at,
    });
    let log = ensure_pool_event_log();
    let event = LogEvent::new("pool_drop", payload).with_headers(headers);
    let _ = log.append(&topic, event).await;
}
/// Builds the submit receipt for `task`: the pool's id and name plus the
/// task's id, submission timestamp, priority, key, idempotency key and
/// submitter.
fn pool_submit_receipt(pool: &PoolEntry, task: &TaskState) -> PoolSubmitReceipt {
    let (pool_id, pool_name) = (pool.id.clone(), pool.name.clone());
    PoolSubmitReceipt {
        pool_id,
        pool_name,
        task_id: task.id.clone(),
        submitted_at: task.submitted_at.clone(),
        priority: task.priority,
        key: task.key.clone(),
        idempotency_key: task.idempotency_key.clone(),
        submitted_by: task.submitted_by.clone(),
    }
}
/// Appends a `pool_submit` event built from `receipt` to the pool audit topic.
///
/// The event carries a `schema` header. A failed append is ignored.
async fn emit_pool_submit_receipt(receipt: PoolSubmitReceipt) {
    let topic = Topic::new(POOL_AUDIT_TOPIC).expect("static pool audit topic is valid");
    let headers = BTreeMap::from([(
        "schema".to_string(),
        "harn.pool_submit.v1".to_string(),
    )]);
    let payload = json!({
        "pool_id": receipt.pool_id,
        "pool": receipt.pool_name,
        "task_id": receipt.task_id,
        "submitted_at": receipt.submitted_at,
        "priority": receipt.priority,
        "key": receipt.key,
        "idempotency_key": receipt.idempotency_key,
        "submitted_by": receipt.submitted_by,
    });
    let log = ensure_pool_event_log();
    let event = LogEvent::new("pool_submit", payload).with_headers(headers);
    let _ = log.append(&topic, event).await;
}
/// Builds the dequeue receipt for `task`.
///
/// `queued_for_ms` is the pool clock's current time minus the task's
/// submission time (saturating). `dequeued_at` is the task's `started_at`, or
/// a freshly generated UUIDv7 string when the task has none.
fn pool_dequeue_receipt(
    pool: &PoolEntry,
    task: &TaskState,
    slot_index: usize,
) -> PoolDequeueReceipt {
    let queued_for_ms = now_ms_for_pool().saturating_sub(task.submitted_at_ms);
    let dequeued_at = match &task.started_at {
        Some(started) => started.clone(),
        None => uuid::Uuid::now_v7().to_string(),
    };
    PoolDequeueReceipt {
        pool_id: pool.id.clone(),
        pool_name: pool.name.clone(),
        task_id: task.id.clone(),
        dequeued_at,
        queued_for_ms,
        slot_index,
    }
}
/// Appends a `pool_dequeue` event built from `receipt` to the pool audit
/// topic.
///
/// The event carries a `schema` header. A failed append is ignored.
async fn emit_pool_dequeue_receipt(receipt: PoolDequeueReceipt) {
    let topic = Topic::new(POOL_AUDIT_TOPIC).expect("static pool audit topic is valid");
    let headers = BTreeMap::from([(
        "schema".to_string(),
        "harn.pool_dequeue.v1".to_string(),
    )]);
    let payload = json!({
        "pool_id": receipt.pool_id,
        "pool": receipt.pool_name,
        "task_id": receipt.task_id,
        "dequeued_at": receipt.dequeued_at,
        "queued_for_ms": receipt.queued_for_ms,
        "slot_index": receipt.slot_index,
    });
    let log = ensure_pool_event_log();
    let event = LogEvent::new("pool_dequeue", payload).with_headers(headers);
    let _ = log.append(&topic, event).await;
}
/// Returns the active event log, installing an in-memory log for the current
/// thread (with `POOL_EVENT_LOG_QUEUE_DEPTH`) when none is active.
fn ensure_pool_event_log() -> Arc<crate::event_log::AnyEventLog> {
    match active_event_log() {
        Some(log) => log,
        None => install_memory_for_current_thread(POOL_EVENT_LOG_QUEUE_DEPTH),
    }
}
/// Wraps a policy violation in a runtime error whose text is the diagnostic
/// code, a colon and a space, then `message`.
fn policy_error(code: Code, message: String) -> VmError {
    let code_text = code.as_str();
    VmError::Runtime(format!("{code_text}: {message}"))
}
/// Removes and returns the next task to run according to the pool's queue
/// strategy, or `None` when the queue is empty.
///
/// FIFO takes from the front, LIFO from the back, priority uses
/// `priority_queue_index`, and fair round-robin defers to
/// `pop_fair_round_robin`.
fn pop_next_task(pool: &mut PoolEntry) -> Option<PendingTask> {
    match &pool.queue_strategy {
        QueueStrategy::FairRoundRobin { .. } => pop_fair_round_robin(pool),
        QueueStrategy::Priority => {
            let ranked = pool
                .queue
                .iter()
                .map(|pending| (pending.priority, pending.seq));
            match priority_queue_index(ranked) {
                Some(position) => pool.queue.remove(position),
                None => None,
            }
        }
        QueueStrategy::Lifo => pool.queue.pop_back(),
        QueueStrategy::Fifo => pool.queue.pop_front(),
    }
}
/// Picks the position of the entry that should be dispatched next.
///
/// Each item is a `(priority, seq)` pair. The winner has the highest priority;
/// among equal priorities the smallest `seq` wins. Returns `None` for an empty
/// iterator.
fn priority_queue_index<I>(existing: I) -> Option<usize>
where
    I: Iterator<Item = (i64, u64)>,
{
    // Reversing `seq` turns "smallest seq" into "largest key", so one
    // maximum search covers both criteria.
    let winner = existing
        .enumerate()
        .max_by_key(|&(_, (priority, seq))| (priority, std::cmp::Reverse(seq)));
    match winner {
        Some((position, _)) => Some(position),
        None => None,
    }
}
/// Removes and returns the next task under fair round-robin scheduling.
///
/// The distinct fairness keys of the queued tasks are collected in queue
/// order. The key that follows `round_robin_after` in that list (wrapping
/// around) is served next; when there is no previous key, or it is no longer
/// queued, the first key is served. The earliest queued task with the chosen
/// key is removed and the key is remembered for the next call. Returns `None`
/// for an empty queue.
fn pop_fair_round_robin(pool: &mut PoolEntry) -> Option<PendingTask> {
    let mut distinct: Vec<String> = Vec::new();
    for pending in pool.queue.iter() {
        let candidate = fair_key(pending);
        if distinct.iter().all(|seen| *seen != candidate) {
            distinct.push(candidate);
        }
    }
    // No keys means the queue is empty.
    let first = distinct.first()?;
    let chosen = pool
        .round_robin_after
        .as_deref()
        .and_then(|previous| distinct.iter().position(|key| key == previous))
        .map(|at| &distinct[(at + 1) % distinct.len()])
        .unwrap_or(first)
        .clone();
    let slot = pool
        .queue
        .iter()
        .position(|pending| fair_key(pending) == chosen)?;
    pool.round_robin_after = Some(chosen);
    pool.queue.remove(slot)
}
/// Fairness key of a pending task: its `key`, or the empty string when the
/// task has none.
fn fair_key(pending: &PendingTask) -> String {
    match &pending.key {
        Some(key) => key.clone(),
        None => String::new(),
    }
}
/// Moves a dequeued task into the running state and starts executing its closure.
///
/// Records a `PoolDequeue` span linked to the submit span, marks the task as
/// running, registers it in the pool's active set, persists it, and emits a
/// dequeue receipt on a local task. The closure then runs on a child VM in a
/// separate local task; if no child VM is available the task is finalized
/// immediately as failed.
fn spawn_task(pool: Rc<RefCell<PoolEntry>>, pending: PendingTask) {
let PendingTask {
task_id,
closure,
state,
..
} = pending;
// Link the dequeue span to the span of the original submission, if any.
let submit_link = state.borrow().submit_span_link.clone();
let span_links: Vec<crate::tracing::SpanLink> = submit_link
.into_iter()
.map(|link| {
link.with_attributes(BTreeMap::from([(
"harn.link.kind".to_string(),
"pool_submit".to_string(),
)]))
})
.collect();
let (pool_id_for_span, pool_name_for_span) = {
let pool_ref = pool.borrow();
(pool_ref.id.clone(), pool_ref.name.clone())
};
let mut dequeue_span = PoolSpanGuard::start_detached(
crate::tracing::SpanKind::PoolDequeue,
format!("pool.dequeue {pool_name_for_span}"),
span_links,
);
dequeue_span.set_metadata("pool", serde_json::json!(pool_name_for_span));
dequeue_span.set_metadata("pool_id", serde_json::json!(pool_id_for_span));
dequeue_span.set_metadata("task_id", serde_json::json!(task_id));
// The state borrow is scoped so it is released before the pool is borrowed below.
{
let mut state_ref = state.borrow_mut();
state_ref.status = TaskStatus::Running;
state_ref.started_at = Some(uuid::Uuid::now_v7().to_string());
state_ref.heartbeat_at_ms = now_ms_for_pool();
}
let dequeue_receipt = {
let mut pool_ref = pool.borrow_mut();
pool_ref.active.insert(task_id.clone(), state.clone());
// The slot index is the number of active tasks after insertion, minus one.
let slot_index = pool_ref.active.len().saturating_sub(1);
let receipt = pool_dequeue_receipt(&pool_ref, &state.borrow(), slot_index);
persist_task_if_durable(&pool_ref, &state.borrow());
receipt
};
dequeue_span.set_metadata(
"queued_for_ms",
serde_json::json!(dequeue_receipt.queued_for_ms),
);
dequeue_span.set_metadata("slot_index", serde_json::json!(dequeue_receipt.slot_index));
// The receipt is emitted on its own local task; its join handle is not kept.
tokio::task::spawn_local(emit_pool_dequeue_receipt(dequeue_receipt));
// Without a VM to run on, the task cannot execute: fail it right away.
let Some(mut child_vm) = crate::vm::clone_async_builtin_child_vm() else {
dequeue_span.end();
finalize_task(
&pool,
&state,
Err("pool: no VM execution context".to_string()),
);
return;
};
dequeue_span.end();
// Run the closure with no arguments; errors are stored as their display text.
tokio::task::spawn_local(async move {
let outcome = child_vm
.call_closure(&closure, &[])
.await
.map_err(|error| error.to_string());
finalize_task(&pool, &state, outcome);
});
}
/// Records the outcome of a task and lets the pool make progress.
///
/// The task becomes `Completed` with its result, or `Failed` with its error
/// text. It is then removed from the pool's active set and persisted, its
/// waiters are woken, queued tasks are dispatched, and submitters waiting for
/// space are woken.
fn finalize_task(
    pool: &Rc<RefCell<PoolEntry>>,
    state: &Rc<RefCell<TaskState>>,
    outcome: Result<VmValue, String>,
) {
    let (task_id, waiters) = {
        let mut task = state.borrow_mut();
        task.finished_at = Some(uuid::Uuid::now_v7().to_string());
        task.heartbeat_at_ms = now_ms_for_pool();
        match outcome {
            Err(message) => {
                task.status = TaskStatus::Failed;
                task.error = Some(message);
            }
            Ok(value) => {
                task.status = TaskStatus::Completed;
                task.result = Some(value);
            }
        }
        let id = task.id.clone();
        let pending_waiters = std::mem::take(&mut task.waiters);
        (id, pending_waiters)
    };

    let mut pool_ref = pool.borrow_mut();
    pool_ref.active.remove(&task_id);
    persist_task_if_durable(&pool_ref, &state.borrow());
    // Release the pool before dispatching, which borrows it again.
    drop(pool_ref);

    wake_task_waiters(waiters);
    dispatch_ready(pool);
    wake_space_waiters(pool);
}
/// Async builtin that waits for one task handle or a list of task handles.
///
/// For a list, the handles are awaited one after another in list order and a
/// list of task snapshots is returned; the first error aborts the wait. For
/// any other value, the single task's snapshot is returned.
async fn pool_wait_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    let Some(target) = args.first() else {
        return Err(VmError::Runtime(
            "pool_wait: task handle is required".to_string(),
        ));
    };
    if let VmValue::List(items) = target {
        let mut snapshots = Vec::with_capacity(items.len());
        for handle in items.iter() {
            let snapshot = wait_single_task(handle).await?;
            snapshots.push(snapshot);
        }
        return Ok(VmValue::List(Rc::new(snapshots)));
    }
    wait_single_task(target).await
}
/// Waits until the task referenced by `value` is terminal and returns its
/// snapshot.
///
/// A task that is already terminal is returned immediately. Otherwise a
/// one-shot waiter is registered on the task and awaited.
///
/// # Errors
///
/// Fails when the handle is invalid, the pool cannot be found, or the pool
/// does not track the task.
async fn wait_single_task(value: &VmValue) -> Result<VmValue, VmError> {
    let (pool_id, task_id) = task_handle_from_value(value, "pool_wait")?;
    let entry = lookup_pool(&pool_id)?;
    let tracked = entry.borrow().tasks.get(&task_id).cloned();
    let Some(state) = tracked else {
        return Err(VmError::Runtime(format!(
            "pool_wait: task not found: {task_id}"
        )));
    };
    // The mutable borrow must end before awaiting, hence the inner scope.
    let receiver = {
        let mut task = state.borrow_mut();
        if task.status.is_terminal() {
            return Ok(task_snapshot_value(&task));
        }
        let (sender, receiver) = tokio::sync::oneshot::channel();
        task.waiters.push(sender);
        receiver
    };
    // A dropped sender also ends the wait; the snapshot is taken either way.
    let _ = receiver.await;
    let snapshot = task_snapshot_value(&state.borrow());
    Ok(snapshot)
}
/// Synchronous pool builtins: each entry binds a `__pool_*` name to its
/// handler together with its signature, arity and doc string.
const POOL_SYNC_PRIMITIVES: &[SyncBuiltin] = &[
SyncBuiltin::new("__pool_create", pool_create_sync)
.signature("__pool_create(options?)")
.arity(VmBuiltinArity::Range { min: 0, max: 1 })
.doc("Create a named agent pool and register it in the local pool registry."),
SyncBuiltin::new("__pool_get", pool_get_sync)
.signature("__pool_get(name_or_id)")
.arity(VmBuiltinArity::Exact(1))
.doc("Look up a pool by name or id; returns nil when missing."),
SyncBuiltin::new("__pool_list", pool_list_sync)
.signature("__pool_list()")
.arity(VmBuiltinArity::Exact(0))
.doc("List every pool registered in the local pool registry."),
SyncBuiltin::new("__pool_size", pool_size_sync)
.signature("__pool_size(pool)")
.arity(VmBuiltinArity::Exact(1))
.doc("Return active + queued task count for a pool."),
SyncBuiltin::new("__pool_snapshot", pool_snapshot_sync)
.signature("__pool_snapshot(pool)")
.arity(VmBuiltinArity::Exact(1))
.doc("Return the full pool snapshot for inspection."),
// Backed by `pool_reload_sync`, which clears the in-process registries.
SyncBuiltin::new("__pool_simulate_restart", pool_reload_sync)
.signature("__pool_simulate_restart()")
.arity(VmBuiltinArity::Exact(0))
.doc("Drop the in-process pool registry; pipeline-scope pools reload from disk on next pool_create."),
];
/// Asynchronous pool builtins (`__pool_submit` and `__pool_wait`) with their
/// signatures, arities and doc strings.
const POOL_ASYNC_PRIMITIVES: &[AsyncBuiltin] = &[
async_builtin!("__pool_submit", pool_submit_builtin)
.signature("__pool_submit(pool, closure, options?)")
.arity(VmBuiltinArity::Range { min: 2, max: 3 })
.doc("Submit a closure to a pool; spawns when a slot is free, otherwise queues."),
async_builtin!("__pool_wait", pool_wait_builtin)
.signature("__pool_wait(handle_or_handles)")
.arity(VmBuiltinArity::Exact(1))
.doc("Block until one or more pool task handles reach a terminal state."),
];
/// The `pool` builtin group: the synchronous and asynchronous pool primitives
/// bundled under one category.
const POOL_PRIMITIVES: BuiltinGroup<'static> = BuiltinGroup::new()
.category("pool")
.sync(POOL_SYNC_PRIMITIVES)
.async_(POOL_ASYNC_PRIMITIVES);
/// Registers the `POOL_PRIMITIVES` builtin group on `vm`.
pub(crate) fn register_pool_builtins(vm: &mut Vm) {
register_builtin_group(vm, POOL_PRIMITIVES);
}
/// Clears the pool registry (`POOLS`) and the name-to-id registry
/// (`POOL_NAMES`).
pub fn reset_pool_state() {
POOLS.with(|pools| pools.borrow_mut().clear());
POOL_NAMES.with(|names| names.borrow_mut().clear());
}
/// Returns a JSON description of every task that has not finished yet, across
/// all registered pools.
///
/// Pools are visited in ascending pool-id order. For each pool, queued tasks
/// that are not terminal come first (in queue order), followed by tracked
/// tasks whose status is `Running`. Task age is measured against
/// `crate::stdlib::clock::now_wall_ms`.
pub(crate) fn snapshot_pending_tasks() -> Vec<serde_json::Value> {
    let now_ms = crate::stdlib::clock::now_wall_ms();
    POOLS.with(|pools| {
        let registry = pools.borrow();
        let mut ordered: Vec<(&String, &Rc<RefCell<PoolEntry>>)> = registry.iter().collect();
        ordered.sort_by(|(left_id, _), (right_id, _)| left_id.cmp(right_id));

        let mut snapshots = Vec::new();
        for (_, entry) in ordered {
            let pool = entry.borrow();
            for pending in pool.queue.iter() {
                let task = pending.state.borrow();
                if !task.status.is_terminal() {
                    snapshots.push(pending_task_snapshot_json(&pool, &task, now_ms));
                }
            }
            for state in pool.tasks.values() {
                let task = state.borrow();
                if task.status == TaskStatus::Running {
                    snapshots.push(pending_task_snapshot_json(&pool, &task, now_ms));
                }
            }
        }
        snapshots
    })
}
/// Renders one unfinished task as a JSON object.
///
/// `age_ms` is `now_ms` minus the task's submission time, saturating and
/// never negative. The task id is emitted under both `id` and `task_id`.
fn pending_task_snapshot_json(
    pool: &PoolEntry,
    task: &TaskState,
    now_ms: i64,
) -> serde_json::Value {
    let submitted_at_ms = task.submitted_at_ms;
    let elapsed = now_ms.saturating_sub(submitted_at_ms);
    let age_ms = elapsed.max(0);
    let task_id = task.id.clone();
    serde_json::json!({
        "id": task_id.clone(),
        "task_id": task_id,
        "pool_id": pool.id.clone(),
        "pool_name": pool.name.clone(),
        "status": task.status.as_str(),
        "priority": task.priority,
        "key": task.key.clone(),
        "idempotency_key": task.idempotency_key.clone(),
        "submitted_at": task.submitted_at.clone(),
        "submitted_at_ms": submitted_at_ms,
        "submitted_by": task.submitted_by.clone(),
        "started_at": task.started_at.clone(),
        "age_ms": age_ms,
    })
}
#[cfg(test)]
mod tests {
    use super::priority_queue_index;

    /// Drains `items` by repeatedly asking `priority_queue_index` which entry
    /// goes next, and returns the sequence numbers in dispatch order.
    fn dispatch_all(items: &[(i64, u64)]) -> Vec<u64> {
        let mut remaining = items.to_vec();
        let order: Vec<u64> = std::iter::from_fn(|| {
            let next = priority_queue_index(remaining.iter().copied())?;
            Some(remaining.remove(next).1)
        })
        .collect();
        order
    }

    #[test]
    fn higher_priority_dequeues_first_ties_break_by_seq() {
        let submitted = [(0, 1), (5, 2), (5, 3), (10, 4)];
        let expected: Vec<u64> = vec![4, 2, 3, 1];
        assert_eq!(dispatch_all(&submitted), expected);
    }

    #[test]
    fn equal_priority_is_pure_fifo() {
        let submitted = [(0, 1), (0, 2), (0, 3)];
        let expected: Vec<u64> = vec![1, 2, 3];
        assert_eq!(dispatch_all(&submitted), expected);
    }

    #[test]
    fn empty_priority_queue_has_no_next_task() {
        let nothing = std::iter::empty();
        assert_eq!(priority_queue_index(nothing), None);
    }
}