use std::cell::Cell;
use serde_json::Value;
use crate::orchestration::{
acknowledge_partial_handoff, lifecycle_audit_log_snapshot, record_lifecycle_audit,
unsettled_state_snapshot_async, HookControl, HookEvent,
};
/// Drain-iteration budget used by `decode_drain_budget` when the caller's
/// options do not carry a positive integer budget.
pub const DRAIN_DEFAULT_BUDGET: usize = 5;
/// Upper bound that `decode_drain_budget` clamps every budget to, regardless
/// of what the caller requested.
pub const DRAIN_HARD_CAP: usize = 20;
thread_local! {
    // Number of `SettlementAgentGuard` values created via `new` that are still
    // alive on the current thread. A counter (rather than a plain boolean) keeps
    // the "active" signal correct when guards nest or are dropped out of order.
    static SETTLEMENT_AGENT_DEPTH: Cell<usize> = const { Cell::new(0) };
}

/// Reports whether at least one settlement-agent guard is currently alive on
/// the calling thread.
pub fn settlement_agent_active() -> bool {
    SETTLEMENT_AGENT_DEPTH.with(|depth| depth.get() > 0)
}

/// Scope guard that marks the current thread as running the settlement agent.
///
/// Creating a guard with [`SettlementAgentGuard::new`] increments a per-thread
/// depth counter; dropping it decrements the counter again. The thread is
/// reported as active by [`settlement_agent_active`] for as long as any guard
/// is alive, so dropping an inner (or overlapping) guard no longer clears the
/// signal while another guard is still in scope.
#[must_use = "the settlement-agent marker is cleared as soon as the guard is dropped"]
struct SettlementAgentGuard;

impl SettlementAgentGuard {
    /// Registers one more active settlement agent on the current thread.
    fn new() -> Self {
        SETTLEMENT_AGENT_DEPTH.with(|depth| depth.set(depth.get().saturating_add(1)));
        Self
    }
}

impl Drop for SettlementAgentGuard {
    fn drop(&mut self) {
        // Saturating so a guard that was not created through `new` can never
        // drive the counter below zero.
        SETTLEMENT_AGENT_DEPTH.with(|depth| depth.set(depth.get().saturating_sub(1)));
    }
}
/// Extracts the drain-iteration budget from caller-supplied options.
///
/// Accepted shapes:
/// * a bare JSON number, e.g. `7`;
/// * a JSON object carrying a `drain_budget_iterations` number.
///
/// Anything else (including zero, negative or non-integer numbers, which
/// `Value::as_u64` does not yield as a positive value) falls back to
/// [`DRAIN_DEFAULT_BUDGET`]. The result is always within
/// `1..=DRAIN_HARD_CAP`.
pub fn decode_drain_budget(options: &Value) -> usize {
    let requested = match options {
        Value::Number(number) => number.as_u64(),
        Value::Object(fields) => fields
            .get("drain_budget_iterations")
            .and_then(Value::as_u64),
        _ => None,
    };
    requested
        .filter(|&count| count > 0)
        // Cap while still in `u64` so the narrowing cast below is lossless on
        // every target; a plain `as usize` could wrap a huge request to a small
        // (or zero) value where `usize` is narrower than 64 bits.
        .map(|count| count.min(DRAIN_HARD_CAP as u64) as usize)
        .unwrap_or(DRAIN_DEFAULT_BUDGET)
        .clamp(1, DRAIN_HARD_CAP)
}
/// Per-bucket count of items handled during a single drain iteration.
#[derive(Debug, Default, Clone, Copy)]
struct IterationProgress {
    /// Suspended subagents handled.
    subagents: usize,
    /// Queued triggers handled.
    triggers: usize,
    /// Partial handoffs handled.
    handoffs: usize,
    /// In-flight LLM calls handled.
    llm_calls: usize,
    /// Pool pending tasks handled.
    pool: usize,
}

impl IterationProgress {
    /// Sum of the counts across every bucket.
    fn total(self) -> usize {
        // Destructure exhaustively so a newly added field cannot be silently
        // left out of the sum.
        let Self {
            subagents,
            triggers,
            handoffs,
            llm_calls,
            pool,
        } = self;
        subagents + triggers + handoffs + llm_calls + pool
    }
}
/// Runs the settlement agent's bounded drain loop and returns a JSON report.
///
/// # Parameters
/// * `initial_unsettled` - caller-provided view of unsettled state; only used
///   (together with a live snapshot) to decide whether there is anything to do.
/// * `return_value` - echoed back verbatim under `"return_value"`.
/// * `options` - decoded by `decode_drain_budget` into the iteration budget.
///
/// # Behaviour
/// 1. If both the live snapshot and `initial_unsettled` are empty, returns
///    immediately with `"status": "no_unsettled"`.
/// 2. Otherwise performs up to `budget` iterations. Each iteration takes a
///    fresh snapshot, stops if it is empty, runs `drain_one_iteration`, and
///    stops if that pass reported no progress.
/// 3. Takes a final snapshot. If it is non-empty the run is `"exhausted"` and a
///    `drain_unsettled_remaining` audit entry is recorded; otherwise the run is
///    `"completed"` and a `pipeline_finalized` audit entry is recorded.
///
/// The `"remaining"` field of the report always reflects the final snapshot, so
/// it agrees with `"status"`. `"audit_count"` is the growth of the lifecycle
/// audit log between entry and exit of this function.
pub async fn run_settlement_agent_loop(
    initial_unsettled: Value,
    return_value: Value,
    options: Value,
) -> Value {
    let budget = decode_drain_budget(&options);
    // Keeps the settlement-agent marker set until this function returns.
    // NOTE(review): the marker is thread-local while this future awaits several
    // times; if the executor can resume it on a different thread the marker is
    // set and cleared on different threads - confirm the runtime pins the task.
    let _guard = SettlementAgentGuard::new();
    let audit_baseline = lifecycle_audit_log_snapshot().len();

    let live = unsettled_state_snapshot_async().await;
    if live.is_empty() && unsettled_is_empty(&initial_unsettled) {
        return serde_json::json!({
            "status": "no_unsettled",
            "method": "spawn_settlement_agent",
            "iterations": 0,
            "items_processed": 0,
            "budget": budget,
            "remaining": live.to_json(),
            "return_value": return_value,
            "audit_count": 0,
        });
    }

    let mut iterations: usize = 0;
    let mut total_processed: usize = 0;
    while iterations < budget {
        iterations += 1;
        let snapshot = unsettled_state_snapshot_async().await;
        if snapshot.is_empty() {
            break;
        }
        let progress = drain_one_iteration(&snapshot).await;
        total_processed += progress.total();
        if progress.total() == 0 {
            // Nothing was handled this pass; further passes would not help.
            break;
        }
    }

    let final_snapshot = unsettled_state_snapshot_async().await;
    let exhausted = !final_snapshot.is_empty();
    // Always report the post-loop state. Reusing the last in-loop snapshot
    // would describe the state *before* the final drain pass and could show
    // leftover items next to a "completed" status.
    let remaining = final_snapshot.to_json();
    if exhausted {
        record_lifecycle_audit(
            "drain_unsettled_remaining",
            serde_json::json!({
                "iterations": iterations,
                "budget": budget,
                "items_processed": total_processed,
                "remaining": &remaining,
            }),
        );
    } else {
        record_lifecycle_audit(
            "pipeline_finalized",
            serde_json::json!({
                "reason": "drained",
                "iterations": iterations,
                "items_processed": total_processed,
            }),
        );
    }

    let audit_count = lifecycle_audit_log_snapshot()
        .len()
        .saturating_sub(audit_baseline);
    let status = if exhausted { "exhausted" } else { "completed" };
    serde_json::json!({
        "status": status,
        "method": "spawn_settlement_agent",
        "iterations": iterations,
        "items_processed": total_processed,
        "budget": budget,
        "remaining": remaining,
        "return_value": return_value,
        "audit_count": audit_count,
    })
}
/// Returns `true` when none of the known unsettled buckets in `value` holds
/// any entries.
///
/// A bucket that is absent, or whose value is not a JSON array, is treated as
/// empty.
fn unsettled_is_empty(value: &Value) -> bool {
    const BUCKETS: [&str; 5] = [
        "suspended_subagents",
        "queued_triggers",
        "partial_handoffs",
        "in_flight_llm_calls",
        "pool_pending_tasks",
    ];
    BUCKETS.iter().all(|bucket| {
        match value.get(*bucket).and_then(Value::as_array) {
            Some(entries) => entries.is_empty(),
            None => true,
        }
    })
}
async fn drain_one_iteration(
snapshot: &crate::orchestration::UnsettledStateSnapshot,
) -> IterationProgress {
let mut progress = IterationProgress::default();
if let Some(item) = snapshot.suspended_subagents.first() {
let handle = item
.get("handle")
.or_else(|| item.get("id"))
.cloned()
.unwrap_or(Value::Null);
emit_drain_decision(
"cancel",
"suspended_subagent",
handle,
item.clone(),
Some("default disposition: cancel suspended subagent at drain"),
)
.await;
progress.subagents += 1;
}
if let Some(item) = snapshot.queued_triggers.first() {
let id = item.get("id").cloned().unwrap_or(Value::Null);
emit_drain_decision(
"acknowledge",
"queued_trigger",
id,
item.clone(),
Some("default disposition: acknowledge stale queued trigger at drain"),
)
.await;
progress.triggers += 1;
}
if let Some(item) = snapshot.partial_handoffs.first() {
let envelope_id = item
.get("envelope_id")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
let item_id = item
.get("envelope_id")
.cloned()
.unwrap_or_else(|| Value::String(envelope_id.clone()));
let _ = acknowledge_partial_handoff(
&envelope_id,
serde_json::json!({"disposition": "deferred", "source": "settlement_agent"}),
);
emit_drain_decision(
"acknowledge",
"partial_handoff",
item_id,
item.clone(),
Some("default disposition: acknowledge partial handoff as deferred"),
)
.await;
progress.handoffs += 1;
}
if let Some(item) = snapshot.in_flight_llm_calls.first() {
let id = item.get("call_id").cloned().unwrap_or(Value::Null);
emit_drain_decision(
"drain",
"in_flight_llm_call",
id,
item.clone(),
Some("default disposition: record drain decision for in-flight llm call"),
)
.await;
progress.llm_calls += 1;
}
if let Some(item) = snapshot.pool_pending_tasks.first() {
let id = item
.get("task_id")
.or_else(|| item.get("id"))
.cloned()
.unwrap_or(Value::Null);
emit_drain_decision(
"defer",
"pool_pending_task",
id,
item.clone(),
Some("default disposition: defer pool pending task at drain"),
)
.await;
progress.pool += 1;
}
progress
}
/// Runs the `OnDrainDecision` lifecycle hooks for one item and, unless a hook
/// blocks it, records a `drain_decision` audit entry.
///
/// * `action` / `category` - labels stored in the decision payload.
/// * `item_id` / `item` - identifier and full summary of the item.
/// * `reason` - human-readable reason; `"default disposition"` when `None`.
///
/// Hook outcomes:
/// * `Block` - nothing is recorded.
/// * `Modify` - the hook's `"payload"` field, when present, replaces the
///   decision that gets recorded.
/// * `Allow`, `Decision`, or a hook error - the original decision is recorded.
async fn emit_drain_decision(
    action: &str,
    category: &str,
    item_id: Value,
    item: Value,
    reason: Option<&str>,
) {
    let item_summary = serde_json::json!({
        "category": category,
        "id": item_id,
        "summary": item,
    });
    let decision = serde_json::json!({
        "action": action,
        "item": &item_summary,
        "reason": reason.unwrap_or("default disposition"),
    });
    // Hooks see the action and item at the top level as well as the complete
    // decision under "payload".
    let hook_input = serde_json::json!({
        "event": HookEvent::OnDrainDecision.as_str(),
        "action": action,
        "item": &item_summary,
        "payload": &decision,
    });

    let outcome =
        super::hooks::run_lifecycle_hooks_with_control(HookEvent::OnDrainDecision, &hook_input)
            .await;
    let recorded = match outcome {
        Ok(HookControl::Block { .. }) => return,
        Ok(HookControl::Modify { payload: modified }) => match modified.get("payload") {
            Some(replacement) => replacement.clone(),
            None => decision,
        },
        Ok(HookControl::Allow) | Ok(HookControl::Decision { .. }) | Err(_) => decision,
    };
    record_lifecycle_audit("drain_decision", recorded);
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Options that are not a number or object yield the default budget.
    #[test]
    fn decode_drain_budget_uses_default_for_null() {
        let budget = decode_drain_budget(&Value::Null);
        assert_eq!(budget, DRAIN_DEFAULT_BUDGET);
    }

    /// Requests above the hard cap are clamped down to it.
    #[test]
    fn decode_drain_budget_clamps_to_hard_cap() {
        let oversized = json!(1000);
        assert_eq!(decode_drain_budget(&oversized), DRAIN_HARD_CAP);
    }

    /// The budget can be supplied via the `drain_budget_iterations` field.
    #[test]
    fn decode_drain_budget_reads_object_field() {
        let options = json!({"drain_budget_iterations": 3});
        assert_eq!(decode_drain_budget(&options), 3);
    }

    /// Zero and negative numbers are not usable budgets.
    #[test]
    fn decode_drain_budget_rejects_zero_and_negative() {
        for rejected in &[json!(0), json!(-5)] {
            assert_eq!(
                decode_drain_budget(rejected),
                DRAIN_DEFAULT_BUDGET,
                "input: {}",
                rejected
            );
        }
    }

    /// Missing buckets and empty arrays count as empty; any entry does not.
    #[test]
    fn unsettled_is_empty_handles_missing_buckets() {
        let no_buckets = json!({});
        let empty_bucket = json!({"suspended_subagents": []});
        let populated = json!({"queued_triggers": [{"id": "x"}]});

        assert!(unsettled_is_empty(&no_buckets));
        assert!(unsettled_is_empty(&empty_bucket));
        assert!(!unsettled_is_empty(&populated));
    }
}