1use crate::types::{AgentOutput, TokenAccounting};
46use serde::{Deserialize, Serialize};
47use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
48
/// Spend and concurrency caps for one coordination run.
///
/// Every limit is optional: `None` means that dimension is not limited, and a
/// field missing from serialized input deserializes as `None`. A token or cost
/// limit counts as reached once recorded spend is greater than or equal to it;
/// from then on new agent starts are refused.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct BudgetLimits {
    /// Cap on cumulative input tokens.
    #[serde(default)]
    pub max_input_tokens: Option<u64>,
    /// Cap on cumulative output tokens.
    #[serde(default)]
    pub max_output_tokens: Option<u64>,
    /// Cap on cumulative input + output tokens (the sum saturates at `u64::MAX`).
    #[serde(default)]
    pub max_total_tokens: Option<u64>,
    /// Cap on cumulative cost in US dollars. Both spend and limit are rounded
    /// to whole micro-dollars before they are compared.
    #[serde(default)]
    pub max_cost_usd: Option<f64>,
    /// Maximum number of agents that may be started.
    #[serde(default)]
    pub max_agents: Option<u32>,
}
71
72impl BudgetLimits {
73 pub fn is_unbounded(&self) -> bool {
75 self.max_input_tokens.is_none()
76 && self.max_output_tokens.is_none()
77 && self.max_total_tokens.is_none()
78 && self.max_cost_usd.is_none()
79 && self.max_agents.is_none()
80 }
81}
82
/// Converts a dollar amount to whole micro-dollars (millionths of a USD),
/// rounding to the nearest integer.
///
/// Spend is tracked as integer micro-dollars so it can live in an `AtomicU64`.
///
/// * NaN, zero and negative amounts map to `0`.
/// * Amounts too large to represent, including `+∞`, saturate at `u64::MAX`
///   (Rust's float-to-integer `as` cast saturates). An infinite *limit* then
///   behaves as "no practical limit" instead of exhausting the budget
///   immediately, and an infinite *reported cost* fills a cost cap instead of
///   being silently counted as $0.
fn usd_to_micros(usd: f64) -> u64 {
    // Only NaN and non-positive values are rejected; +inf falls through and
    // saturates in the cast below.
    if usd.is_nan() || usd <= 0.0 {
        return 0;
    }
    (usd * 1_000_000.0).round() as u64
}
91
/// Converts integer micro-dollars back into a floating-point dollar amount.
fn micros_to_usd(micros: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    let amount = micros as f64;
    amount / MICROS_PER_USD
}
95
/// Point-in-time view of a [`CoordinationBudget`]'s counters and limits.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BudgetSnapshot {
    /// Cumulative input tokens recorded.
    pub input_tokens: u64,
    /// Cumulative output tokens recorded.
    pub output_tokens: u64,
    /// `input_tokens + output_tokens`, saturating at `u64::MAX`.
    pub total_tokens: u64,
    /// Cumulative cost in US dollars, converted from the internal
    /// micro-dollar counter.
    pub cost_usd: f64,
    /// Number of agents counted as started by `try_begin_agent`.
    pub agents_started: u32,
    /// The limits the budget was created with.
    pub limits: BudgetLimits,
    /// Whether a token or cost limit had been reached when the snapshot was
    /// taken. The agent cap does not affect this flag.
    pub exhausted: bool,
}
111
/// Why [`CoordinationBudget::try_begin_agent`] refused to start an agent.
#[derive(Debug, Clone, PartialEq)]
pub enum BudgetError {
    /// A token or cost limit has been reached.
    Exhausted {
        /// Human-readable description of the first limit found to be reached
        /// (checked in the order input, output, total tokens, cost).
        reason: String,
        /// Budget state captured when the request was refused.
        snapshot: BudgetSnapshot,
    },
    /// The agent cap is full: `started` (the observed count) is at least `max`.
    AgentLimit { max: u32, started: u32 },
}
123
124impl std::fmt::Display for BudgetError {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 match self {
127 BudgetError::Exhausted { reason, .. } => {
128 write!(f, "coordination budget exhausted: {}", reason)
129 }
130 BudgetError::AgentLimit { max, started } => write!(
131 f,
132 "coordination budget agent cap reached: {} of {} agents already started",
133 started, max
134 ),
135 }
136 }
137}
138
139impl std::error::Error for BudgetError {}
140
/// Shared accounting of token/cost spend and agent starts against [`BudgetLimits`].
///
/// All mutation goes through `&self` on atomic counters (relaxed ordering)
/// rather than a lock, so a single instance can be shared between threads,
/// e.g. behind an `Arc`.
///
/// Spend limits gate *new* agents only. An agent admitted while spend was
/// below a limit may push recorded spend past it; the overshoot is reported
/// on the next `try_begin_agent` call.
#[derive(Debug)]
pub struct CoordinationBudget {
    /// Limits fixed at construction; no method modifies them.
    limits: BudgetLimits,
    /// Cumulative input tokens recorded.
    spent_input: AtomicU64,
    /// Cumulative output tokens recorded.
    spent_output: AtomicU64,
    /// Cumulative cost in micro-dollars (millionths of a USD).
    spent_cost_micros: AtomicU64,
    /// Number of agents counted as started.
    agents_started: AtomicU32,
}
155
156impl CoordinationBudget {
157 pub fn new(limits: BudgetLimits) -> Self {
159 Self {
160 limits,
161 spent_input: AtomicU64::new(0),
162 spent_output: AtomicU64::new(0),
163 spent_cost_micros: AtomicU64::new(0),
164 agents_started: AtomicU32::new(0),
165 }
166 }
167
168 pub fn unbounded() -> Self {
172 Self::new(BudgetLimits::default())
173 }
174
175 pub fn is_unbounded(&self) -> bool {
177 self.limits.is_unbounded()
178 }
179
180 fn spend_exhaustion(&self) -> Option<String> {
183 let input = self.spent_input.load(Ordering::Relaxed);
184 let output = self.spent_output.load(Ordering::Relaxed);
185 let cost_micros = self.spent_cost_micros.load(Ordering::Relaxed);
186
187 if let Some(max) = self.limits.max_input_tokens {
188 if input >= max {
189 return Some(format!("input tokens {} >= limit {}", input, max));
190 }
191 }
192 if let Some(max) = self.limits.max_output_tokens {
193 if output >= max {
194 return Some(format!("output tokens {} >= limit {}", output, max));
195 }
196 }
197 if let Some(max) = self.limits.max_total_tokens {
198 let total = input.saturating_add(output);
201 if total >= max {
202 return Some(format!("total tokens {} >= limit {}", total, max));
203 }
204 }
205 if let Some(max) = self.limits.max_cost_usd {
206 let cost = micros_to_usd(cost_micros);
207 if cost_micros >= usd_to_micros(max) {
208 return Some(format!("cost ${:.4} >= limit ${:.4}", cost, max));
209 }
210 }
211 None
212 }
213
214 pub fn is_exhausted(&self) -> bool {
216 self.spend_exhaustion().is_some()
217 }
218
219 pub fn try_begin_agent(&self) -> Result<(), BudgetError> {
226 if let Some(reason) = self.spend_exhaustion() {
235 return Err(BudgetError::Exhausted {
236 reason,
237 snapshot: self.snapshot(),
238 });
239 }
240
241 match self.limits.max_agents {
242 None => {
243 self.agents_started.fetch_add(1, Ordering::Relaxed);
244 Ok(())
245 }
246 Some(max) => {
247 let mut current = self.agents_started.load(Ordering::Relaxed);
250 loop {
251 if current >= max {
252 return Err(BudgetError::AgentLimit {
253 max,
254 started: current,
255 });
256 }
257 match self.agents_started.compare_exchange_weak(
258 current,
259 current + 1,
260 Ordering::Relaxed,
261 Ordering::Relaxed,
262 ) {
263 Ok(_) => return Ok(()),
264 Err(observed) => current = observed,
265 }
266 }
267 }
268 }
269 }
270
271 pub fn record(&self, tokens: &TokenAccounting) {
273 self.spent_input
274 .fetch_add(tokens.input_tokens, Ordering::Relaxed);
275 self.spent_output
276 .fetch_add(tokens.output_tokens, Ordering::Relaxed);
277 self.spent_cost_micros
278 .fetch_add(usd_to_micros(tokens.cost_usd), Ordering::Relaxed);
279 }
280
281 pub fn record_output(&self, out: &AgentOutput) {
283 if let Some(tokens) = &out.tokens {
284 self.record(tokens);
285 }
286 }
287
288 pub fn snapshot(&self) -> BudgetSnapshot {
293 let input = self.spent_input.load(Ordering::Relaxed);
294 let output = self.spent_output.load(Ordering::Relaxed);
295 let cost = micros_to_usd(self.spent_cost_micros.load(Ordering::Relaxed));
296 BudgetSnapshot {
297 input_tokens: input,
298 output_tokens: output,
299 total_tokens: input.saturating_add(output),
300 cost_usd: cost,
301 agents_started: self.agents_started.load(Ordering::Relaxed),
302 limits: self.limits.clone(),
303 exhausted: self.spend_exhaustion().is_some(),
304 }
305 }
306}
307
308impl Default for CoordinationBudget {
309 fn default() -> Self {
310 Self::unbounded()
311 }
312}
313
314pub fn budget_skipped_output(name: &str, err: &BudgetError) -> AgentOutput {
322 let reason = err.to_string();
323 let outcome = car_ir::AgentOutcome::give_up(&reason).with_evidence(car_ir::Evidence {
324 kind: car_ir::EvidenceKind::StopReason,
325 description: reason.clone(),
326 data: Some(serde_json::json!({ "budget_skipped": true })),
327 });
328 AgentOutput {
329 name: name.to_string(),
330 answer: String::new(),
331 turns: 0,
332 tool_calls: 0,
333 duration_ms: 0.0,
334 error: Some(reason),
335 outcome: Some(outcome),
336 tokens: None,
337 }
338}
339
340pub fn is_budget_skipped(out: &AgentOutput) -> bool {
344 out.outcome.as_ref().is_some_and(|o| {
345 o.evidence.iter().any(|e| {
346 e.data
347 .as_ref()
348 .and_then(|d| d.get("budget_skipped"))
349 .and_then(|v| v.as_bool())
350 .unwrap_or(false)
351 })
352 })
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a `TokenAccounting` spend report.
    fn toks(input: u64, output: u64, cost: f64) -> TokenAccounting {
        TokenAccounting::new(input, output, cost)
    }

    #[test]
    fn unbounded_never_denies() {
        let b = CoordinationBudget::unbounded();
        assert!(b.is_unbounded());
        for _ in 0..1000 {
            assert!(b.try_begin_agent().is_ok());
            b.record(&toks(10_000, 10_000, 100.0));
        }
        assert!(!b.is_exhausted());
    }

    #[test]
    fn total_token_limit_stops_next_spawn() {
        let b = CoordinationBudget::new(BudgetLimits {
            max_total_tokens: Some(100),
            ..Default::default()
        });
        assert!(b.try_begin_agent().is_ok());
        // The running agent overshoots (60 + 50 = 110); only the *next* spawn is refused.
        b.record(&toks(60, 50, 0.0));
        match b.try_begin_agent() {
            Err(BudgetError::Exhausted { reason, snapshot }) => {
                assert!(reason.contains("total tokens"));
                assert_eq!(snapshot.total_tokens, 110);
                assert!(snapshot.exhausted);
            }
            other => panic!("expected Exhausted, got {:?}", other),
        }
    }

    #[test]
    fn input_and_output_limits_are_independent() {
        let b = CoordinationBudget::new(BudgetLimits {
            max_output_tokens: Some(50),
            ..Default::default()
        });
        assert!(b.try_begin_agent().is_ok());
        b.record(&toks(10_000, 10, 0.0));
        assert!(
            b.try_begin_agent().is_ok(),
            "input spend must not trip output cap"
        );
        b.record(&toks(0, 60, 0.0));
        assert!(matches!(
            b.try_begin_agent(),
            Err(BudgetError::Exhausted { .. })
        ));
    }

    #[test]
    fn cost_limit_enforced() {
        let b = CoordinationBudget::new(BudgetLimits {
            max_cost_usd: Some(1.0),
            ..Default::default()
        });
        assert!(b.try_begin_agent().is_ok());
        b.record(&toks(0, 0, 0.99));
        assert!(b.try_begin_agent().is_ok());
        // 0.99 + 0.02 = 1.01 crosses the $1.00 cap.
        b.record(&toks(0, 0, 0.02));
        assert!(matches!(
            b.try_begin_agent(),
            Err(BudgetError::Exhausted { .. })
        ));
    }

    #[test]
    fn agent_cap_enforced_exactly() {
        let b = CoordinationBudget::new(BudgetLimits {
            max_agents: Some(3),
            ..Default::default()
        });
        assert!(b.try_begin_agent().is_ok());
        assert!(b.try_begin_agent().is_ok());
        assert!(b.try_begin_agent().is_ok());
        match b.try_begin_agent() {
            Err(BudgetError::AgentLimit { max, started }) => {
                assert_eq!(max, 3);
                assert_eq!(started, 3);
            }
            other => panic!("expected AgentLimit, got {:?}", other),
        }
        assert_eq!(b.snapshot().agents_started, 3);
    }

    #[test]
    fn refused_spawn_is_not_counted() {
        let b = CoordinationBudget::new(BudgetLimits {
            max_total_tokens: Some(10),
            ..Default::default()
        });
        assert!(b.try_begin_agent().is_ok());
        b.record(&toks(20, 0, 0.0));
        assert!(b.try_begin_agent().is_err());
        assert_eq!(b.snapshot().agents_started, 1);
    }

    #[test]
    fn spend_limit_takes_priority_over_agent_cap() {
        let b = CoordinationBudget::new(BudgetLimits {
            max_total_tokens: Some(10),
            max_agents: Some(100),
            ..Default::default()
        });
        assert!(b.try_begin_agent().is_ok());
        b.record(&toks(20, 0, 0.0));
        assert!(matches!(
            b.try_begin_agent(),
            Err(BudgetError::Exhausted { .. })
        ));
    }

    #[test]
    fn error_display_text() {
        let cap = BudgetError::AgentLimit { max: 4, started: 4 };
        assert_eq!(
            cap.to_string(),
            "coordination budget agent cap reached: 4 of 4 agents already started"
        );

        let b = CoordinationBudget::new(BudgetLimits {
            max_input_tokens: Some(1),
            ..Default::default()
        });
        b.record(&toks(1, 0, 0.0));
        let err = b.try_begin_agent().unwrap_err();
        assert_eq!(
            err.to_string(),
            "coordination budget exhausted: input tokens 1 >= limit 1"
        );
    }

    #[test]
    fn usd_micro_conversion() {
        assert_eq!(usd_to_micros(0.99), 990_000);
        assert_eq!(usd_to_micros(0.0), 0);
        assert_eq!(usd_to_micros(-1.0), 0);
        assert_eq!(usd_to_micros(f64::NAN), 0);
        assert_eq!(micros_to_usd(1_500_000), 1.5);
    }

    #[test]
    fn skipped_output_is_detectable_without_string_matching() {
        let err = BudgetError::AgentLimit {
            max: 2,
            started: 2,
        };
        let out = budget_skipped_output("worker", &err);
        assert!(is_budget_skipped(&out));
        assert!(!out.succeeded());
        assert_eq!(
            out.outcome.as_ref().unwrap().status,
            car_ir::OutcomeStatus::GiveUp
        );

        let normal = AgentOutput {
            name: "x".into(),
            answer: "done".into(),
            turns: 1,
            tool_calls: 0,
            duration_ms: 1.0,
            error: None,
            outcome: Some(car_ir::AgentOutcome::success("ok")),
            tokens: None,
        };
        assert!(!is_budget_skipped(&normal));
    }

    #[test]
    fn huge_reported_tokens_do_not_panic() {
        let b = CoordinationBudget::new(BudgetLimits {
            max_total_tokens: Some(100),
            ..Default::default()
        });
        b.record(&toks(u64::MAX, u64::MAX, 0.0));
        assert!(b.is_exhausted());
        assert_eq!(b.snapshot().total_tokens, u64::MAX);
    }

    #[test]
    fn concurrent_agent_cap_never_over_reserves() {
        use std::sync::Arc;

        let b = Arc::new(CoordinationBudget::new(BudgetLimits {
            max_agents: Some(50),
            ..Default::default()
        }));
        let handles: Vec<_> = (0..16)
            .map(|_| {
                let b = Arc::clone(&b);
                std::thread::spawn(move || {
                    (0..100).filter(|_| b.try_begin_agent().is_ok()).count() as u32
                })
            })
            .collect();
        let granted: u32 = handles.into_iter().map(|h| h.join().unwrap()).sum();
        assert_eq!(granted, 50);
        assert_eq!(b.snapshot().agents_started, 50);
    }
}