use crate::types::{AgentOutput, TokenAccounting};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
/// Optional caps applied to a [`CoordinationBudget`]; a `None` field leaves that dimension uncapped.
///
/// Every field carries `#[serde(default)]`, so a field missing from the serialized form
/// deserializes to `None`.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct BudgetLimits {
/// Cap on cumulative input tokens; spend counts as exhausted once it reaches this value.
#[serde(default)]
pub max_input_tokens: Option<u64>,
/// Cap on cumulative output tokens; spend counts as exhausted once it reaches this value.
#[serde(default)]
pub max_output_tokens: Option<u64>,
/// Cap on the (saturating) sum of input and output tokens.
#[serde(default)]
pub max_total_tokens: Option<u64>,
/// Cap on cumulative cost in US dollars; compared against recorded spend in micro-dollars.
#[serde(default)]
pub max_cost_usd: Option<f64>,
/// Cap on how many agents may be started through `try_begin_agent`.
#[serde(default)]
pub max_agents: Option<u32>,
}
impl BudgetLimits {
    /// Returns `true` when none of the caps is set, i.e. every field is `None`.
    pub fn is_unbounded(&self) -> bool {
        // Listing every field makes the compiler flag this spot if a new cap is added.
        matches!(
            self,
            Self {
                max_input_tokens: None,
                max_output_tokens: None,
                max_total_tokens: None,
                max_cost_usd: None,
                max_agents: None,
            }
        )
    }
}
/// Converts a dollar amount to whole micro-dollars, rounding to the nearest unit.
///
/// Non-finite inputs (NaN, ±infinity) and amounts that are zero or negative all map to `0`.
/// Finite amounts too large for `u64` clamp to `u64::MAX`, because a float-to-integer
/// `as` cast saturates.
fn usd_to_micros(usd: f64) -> u64 {
    match usd {
        amount if amount.is_finite() && amount > 0.0 => (amount * 1_000_000.0).round() as u64,
        _ => 0,
    }
}
/// Converts whole micro-dollars back to a floating-point dollar amount.
fn micros_to_usd(micros: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    let as_float = micros as f64;
    as_float / MICROS_PER_USD
}
/// Plain-data view of a [`CoordinationBudget`]'s counters and limits, as produced by
/// `CoordinationBudget::snapshot`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BudgetSnapshot {
/// Cumulative input tokens recorded so far.
pub input_tokens: u64,
/// Cumulative output tokens recorded so far.
pub output_tokens: u64,
/// Saturating sum of `input_tokens` and `output_tokens`.
pub total_tokens: u64,
/// Cumulative cost in US dollars, converted back from the micro-dollar counter.
pub cost_usd: f64,
/// Number of agents admitted by `try_begin_agent`.
pub agents_started: u32,
/// Copy of the limits the budget was created with.
pub limits: BudgetLimits,
/// Whether a token or cost limit had been reached; the agent cap is not part of this flag.
pub exhausted: bool,
}
/// Reasons `CoordinationBudget::try_begin_agent` refuses to admit another agent.
#[derive(Debug, Clone, PartialEq)]
pub enum BudgetError {
/// A token or cost limit has been reached.
Exhausted {
/// Human-readable description of which limit was hit and the values involved.
reason: String,
/// Budget state captured when the refusal was produced.
snapshot: BudgetSnapshot,
},
/// The configured agent cap (`max`) was already reached; `started` is the count observed.
AgentLimit { max: u32, started: u32 },
}
impl std::fmt::Display for BudgetError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BudgetError::Exhausted { reason, .. } => {
write!(f, "coordination budget exhausted: {}", reason)
}
BudgetError::AgentLimit { max, started } => write!(
f,
"coordination budget agent cap reached: {} of {} agents already started",
started, max
),
}
}
}
// Lets `BudgetError` be used wherever a `std::error::Error` is expected; no methods are overridden.
impl std::error::Error for BudgetError {}
/// Tracks cumulative token/cost spend and started-agent count against a set of [`BudgetLimits`].
///
/// All counters are atomics, so every method takes `&self`.
#[derive(Debug)]
pub struct CoordinationBudget {
// Caps fixed at construction time.
limits: BudgetLimits,
// Cumulative input tokens recorded.
spent_input: AtomicU64,
// Cumulative output tokens recorded.
spent_output: AtomicU64,
// Cumulative cost in micro-dollars (USD * 1_000_000), kept as an integer.
spent_cost_micros: AtomicU64,
// Number of agents admitted by `try_begin_agent`.
agents_started: AtomicU32,
}
impl CoordinationBudget {
    /// Creates a budget with the given limits, zero recorded spend and no agents started.
    pub fn new(limits: BudgetLimits) -> Self {
        Self {
            limits,
            spent_input: AtomicU64::new(0),
            spent_output: AtomicU64::new(0),
            spent_cost_micros: AtomicU64::new(0),
            agents_started: AtomicU32::new(0),
        }
    }

    /// Creates a budget with no limits configured.
    pub fn unbounded() -> Self {
        Self::new(BudgetLimits::default())
    }

    /// Returns `true` when no limit of any kind is configured.
    pub fn is_unbounded(&self) -> bool {
        self.limits.is_unbounded()
    }

    /// Adds `delta` to `counter`, clamping at `u64::MAX` instead of wrapping.
    ///
    /// A plain `fetch_add` wraps on overflow, which would let an oversized report reset a
    /// counter to a small value and make an exhausted budget look healthy again.
    fn add_saturating(counter: &AtomicU64, delta: u64) {
        // The closure always returns `Some`, so the update cannot fail.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            Some(current.saturating_add(delta))
        });
    }

    /// Loads the spend counters as `(input_tokens, output_tokens, cost_micros)`.
    fn load_spend(&self) -> (u64, u64, u64) {
        (
            self.spent_input.load(Ordering::Relaxed),
            self.spent_output.load(Ordering::Relaxed),
            self.spent_cost_micros.load(Ordering::Relaxed),
        )
    }

    /// Checks the given spend values against the token and cost limits.
    ///
    /// Returns a description of the first limit reached (`spend >= limit`), checked in the
    /// order input, output, total, cost, or `None` when no limit is reached. The agent cap
    /// is not considered here.
    ///
    /// NOTE(review): the cost limit goes through `usd_to_micros`, which maps non-finite and
    /// non-positive values to 0, so such a limit reports exhaustion immediately — confirm
    /// this is the intended handling for an infinite `max_cost_usd`.
    fn exhaustion_for(&self, input: u64, output: u64, cost_micros: u64) -> Option<String> {
        if let Some(max) = self.limits.max_input_tokens {
            if input >= max {
                return Some(format!("input tokens {} >= limit {}", input, max));
            }
        }
        if let Some(max) = self.limits.max_output_tokens {
            if output >= max {
                return Some(format!("output tokens {} >= limit {}", output, max));
            }
        }
        if let Some(max) = self.limits.max_total_tokens {
            let total = input.saturating_add(output);
            if total >= max {
                return Some(format!("total tokens {} >= limit {}", total, max));
            }
        }
        if let Some(max) = self.limits.max_cost_usd {
            // Compare in integer micro-dollars to avoid float accumulation error.
            if cost_micros >= usd_to_micros(max) {
                let cost = micros_to_usd(cost_micros);
                return Some(format!("cost ${:.4} >= limit ${:.4}", cost, max));
            }
        }
        None
    }

    /// Describes the first token/cost limit reached by the current spend, if any.
    fn spend_exhaustion(&self) -> Option<String> {
        let (input, output, cost_micros) = self.load_spend();
        self.exhaustion_for(input, output, cost_micros)
    }

    /// Builds a snapshot from already-loaded spend values so that the `exhausted` flag is
    /// derived from exactly the numbers the snapshot reports.
    fn snapshot_for(&self, input: u64, output: u64, cost_micros: u64) -> BudgetSnapshot {
        BudgetSnapshot {
            input_tokens: input,
            output_tokens: output,
            total_tokens: input.saturating_add(output),
            cost_usd: micros_to_usd(cost_micros),
            agents_started: self.agents_started.load(Ordering::Relaxed),
            limits: self.limits.clone(),
            exhausted: self.exhaustion_for(input, output, cost_micros).is_some(),
        }
    }

    /// Returns `true` when a token or cost limit has been reached.
    pub fn is_exhausted(&self) -> bool {
        self.spend_exhaustion().is_some()
    }

    /// Tries to reserve a slot for one more agent.
    ///
    /// # Errors
    ///
    /// * [`BudgetError::Exhausted`] when a token or cost limit has been reached; this check
    ///   runs before the agent cap is consulted.
    /// * [`BudgetError::AgentLimit`] when `max_agents` is set and already reached.
    pub fn try_begin_agent(&self) -> Result<(), BudgetError> {
        // One load feeds both the reason and the snapshot so they cannot disagree.
        let (input, output, cost_micros) = self.load_spend();
        if let Some(reason) = self.exhaustion_for(input, output, cost_micros) {
            return Err(BudgetError::Exhausted {
                reason,
                snapshot: self.snapshot_for(input, output, cost_micros),
            });
        }
        match self.limits.max_agents {
            None => {
                // No cap: just count, clamping rather than wrapping at `u32::MAX`.
                let _ = self.agents_started.fetch_update(
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                    |started| Some(started.saturating_add(1)),
                );
                Ok(())
            }
            Some(max) => self
                .agents_started
                // Atomically increment only while below the cap. `started + 1` cannot
                // overflow because `started < max <= u32::MAX`; the closure is lazy so the
                // addition is never evaluated at the cap.
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |started| {
                    (started < max).then(|| started + 1)
                })
                .map(|_| ())
                .map_err(|started| BudgetError::AgentLimit { max, started }),
        }
    }

    /// Adds one token report to the running totals.
    ///
    /// Token counts and cost accumulate with saturation. The cost goes through
    /// `usd_to_micros`, so a non-finite or non-positive `cost_usd` contributes nothing.
    pub fn record(&self, tokens: &TokenAccounting) {
        Self::add_saturating(&self.spent_input, tokens.input_tokens);
        Self::add_saturating(&self.spent_output, tokens.output_tokens);
        Self::add_saturating(&self.spent_cost_micros, usd_to_micros(tokens.cost_usd));
    }

    /// Records the token report attached to `out`, if it has one.
    pub fn record_output(&self, out: &AgentOutput) {
        if let Some(tokens) = &out.tokens {
            self.record(tokens);
        }
    }

    /// Captures the current counters, limits and spend-exhaustion flag.
    pub fn snapshot(&self) -> BudgetSnapshot {
        let (input, output, cost_micros) = self.load_spend();
        self.snapshot_for(input, output, cost_micros)
    }
}
impl Default for CoordinationBudget {
fn default() -> Self {
Self::unbounded()
}
}
/// Builds the placeholder output for an agent named `name` that was not run because of `err`.
///
/// The result has an empty answer, zeroed counters, `error` set to the error's display text,
/// and a give-up outcome carrying a stop-reason evidence entry whose data is
/// `{"budget_skipped": true}`.
pub fn budget_skipped_output(name: &str, err: &BudgetError) -> AgentOutput {
    let reason = err.to_string();

    // Structured marker so the skip can be recognised without inspecting message text.
    let skip_marker = car_ir::Evidence {
        kind: car_ir::EvidenceKind::StopReason,
        description: reason.clone(),
        data: Some(serde_json::json!({ "budget_skipped": true })),
    };
    let outcome = car_ir::AgentOutcome::give_up(&reason).with_evidence(skip_marker);

    AgentOutput {
        name: name.to_owned(),
        answer: String::new(),
        turns: 0,
        tool_calls: 0,
        duration_ms: 0.0,
        error: Some(reason),
        outcome: Some(outcome),
        tokens: None,
        tools_used: Vec::new(),
    }
}
/// Returns `true` when `out` has an outcome with at least one evidence entry whose data
/// contains `"budget_skipped": true`.
pub fn is_budget_skipped(out: &AgentOutput) -> bool {
    let Some(outcome) = out.outcome.as_ref() else {
        return false;
    };
    outcome.evidence.iter().any(|entry| {
        let flag = entry
            .data
            .as_ref()
            .and_then(|data| data.get("budget_skipped"))
            .and_then(|value| value.as_bool());
        // A missing key and a non-boolean value both count as "not skipped".
        matches!(flag, Some(true))
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a token report.
    fn toks(input: u64, output: u64, cost: f64) -> TokenAccounting {
        TokenAccounting::new(input, output, cost)
    }

    /// Shorthand for building a budget from explicit limits.
    fn budget_with(limits: BudgetLimits) -> CoordinationBudget {
        CoordinationBudget::new(limits)
    }

    /// With no limits, neither heavy spend nor many agents is ever refused.
    #[test]
    fn unbounded_never_denies() {
        let budget = CoordinationBudget::unbounded();
        assert!(budget.is_unbounded());
        (0..1000).for_each(|_| {
            assert!(budget.try_begin_agent().is_ok());
            budget.record(&toks(10_000, 10_000, 100.0));
        });
        assert!(!budget.is_exhausted());
    }

    /// Crossing the total-token cap refuses the next agent and reports the totals.
    #[test]
    fn total_token_limit_stops_next_spawn() {
        let limits = BudgetLimits {
            max_total_tokens: Some(100),
            ..Default::default()
        };
        let budget = budget_with(limits);
        assert!(budget.try_begin_agent().is_ok());
        budget.record(&toks(60, 50, 0.0));
        let denied = budget.try_begin_agent();
        match denied {
            Err(BudgetError::Exhausted { reason, snapshot }) => {
                assert!(reason.contains("total tokens"));
                assert_eq!(snapshot.total_tokens, 110);
                assert!(snapshot.exhausted);
            }
            other => panic!("expected Exhausted, got {:?}", other),
        }
    }

    /// Input spend does not count against an output-only cap.
    #[test]
    fn input_and_output_limits_are_independent() {
        let limits = BudgetLimits {
            max_output_tokens: Some(50),
            ..Default::default()
        };
        let budget = budget_with(limits);
        assert!(budget.try_begin_agent().is_ok());
        budget.record(&toks(10_000, 10, 0.0));
        assert!(
            budget.try_begin_agent().is_ok(),
            "input spend must not trip output cap"
        );
        budget.record(&toks(0, 60, 0.0));
        let denied = budget.try_begin_agent();
        assert!(matches!(denied, Err(BudgetError::Exhausted { .. })));
    }

    /// Spend just under the cost cap is allowed; crossing it is refused.
    #[test]
    fn cost_limit_enforced() {
        let limits = BudgetLimits {
            max_cost_usd: Some(1.0),
            ..Default::default()
        };
        let budget = budget_with(limits);
        assert!(budget.try_begin_agent().is_ok());
        budget.record(&toks(0, 0, 0.99));
        assert!(budget.try_begin_agent().is_ok());
        budget.record(&toks(0, 0, 0.02));
        let denied = budget.try_begin_agent();
        assert!(matches!(denied, Err(BudgetError::Exhausted { .. })));
    }

    /// Exactly `max_agents` reservations succeed; the next one reports the cap.
    #[test]
    fn agent_cap_enforced_exactly() {
        let limits = BudgetLimits {
            max_agents: Some(3),
            ..Default::default()
        };
        let budget = budget_with(limits);
        for _ in 0..3 {
            assert!(budget.try_begin_agent().is_ok());
        }
        let denied = budget.try_begin_agent();
        match denied {
            Err(BudgetError::AgentLimit { max, started }) => {
                assert_eq!(max, 3);
                assert_eq!(started, 3);
            }
            other => panic!("expected AgentLimit, got {:?}", other),
        }
        assert_eq!(budget.snapshot().agents_started, 3);
    }

    /// When spend is exhausted, that error wins even if agent slots remain.
    #[test]
    fn spend_limit_takes_priority_over_agent_cap() {
        let limits = BudgetLimits {
            max_total_tokens: Some(10),
            max_agents: Some(100),
            ..Default::default()
        };
        let budget = budget_with(limits);
        assert!(budget.try_begin_agent().is_ok());
        budget.record(&toks(20, 0, 0.0));
        let denied = budget.try_begin_agent();
        assert!(matches!(denied, Err(BudgetError::Exhausted { .. })));
    }

    /// A skipped output is recognised through its evidence data, and a normal one is not.
    #[test]
    fn skipped_output_is_detectable_without_string_matching() {
        let cap_error = BudgetError::AgentLimit {
            max: 2,
            started: 2,
        };
        let skipped = budget_skipped_output("worker", &cap_error);
        assert!(is_budget_skipped(&skipped));
        assert!(!skipped.succeeded());
        let status = skipped.outcome.as_ref().unwrap().status;
        assert_eq!(status, car_ir::OutcomeStatus::GiveUp);

        let normal = AgentOutput {
            name: "x".into(),
            answer: "done".into(),
            turns: 1,
            tool_calls: 0,
            duration_ms: 1.0,
            error: None,
            outcome: Some(car_ir::AgentOutcome::success("ok")),
            tokens: None,
            tools_used: Vec::new(),
        };
        assert!(!is_budget_skipped(&normal));
    }

    /// Maximal token reports neither panic nor overflow the reported total.
    #[test]
    fn huge_reported_tokens_do_not_panic() {
        let limits = BudgetLimits {
            max_total_tokens: Some(100),
            ..Default::default()
        };
        let budget = budget_with(limits);
        budget.record(&toks(u64::MAX, u64::MAX, 0.0));
        assert!(budget.is_exhausted());
        assert_eq!(budget.snapshot().total_tokens, u64::MAX);
    }

    /// Many threads racing for slots are granted exactly the cap in total.
    #[test]
    fn concurrent_agent_cap_never_over_reserves() {
        use std::sync::Arc;

        let budget = Arc::new(budget_with(BudgetLimits {
            max_agents: Some(50),
            ..Default::default()
        }));

        // Collect eagerly so every thread is spawned before any is joined.
        let workers: Vec<_> = (0..16)
            .map(|_| {
                let shared = Arc::clone(&budget);
                std::thread::spawn(move || {
                    (0..100)
                        .filter(|_| shared.try_begin_agent().is_ok())
                        .map(|_| 1u32)
                        .sum::<u32>()
                })
            })
            .collect();

        let granted: u32 = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .sum();
        assert_eq!(granted, 50);
        assert_eq!(budget.snapshot().agents_started, 50);
    }
}