Skip to main content

car_multi/
budget.rs

1//! Runtime-enforced coordination budget.
2//!
3//! [`TokenAccounting`](crate::types::TokenAccounting) is *self-reported* by an
4//! `AgentRunner` and explicitly not a trust boundary. This module adds the other
5//! half: a shared, runtime-owned ceiling that a coordination pattern checks
6//! **before spawning each agent**. Once the reported spend crosses a limit, the
7//! runtime refuses to start further agents.
8//!
9//! ## What "enforced" means here
10//!
11//! The runtime does not meter tokens itself — it sums what runners report and
12//! gates the *next* spawn on that running total. So the contract is:
13//!
14//! > Once a limit is crossed, no new agent is started.
15//!
16//! Overshoot is therefore bounded by at most one agent's (or, for a parallel
17//! batch, one batch's) worth of spend — the in-flight work that was already
18//! launched when the limit was crossed. For sequential and iterative patterns
19//! (sequential swarm, pipeline, supervisor rounds, debate's second round) this
20//! gives tight, between-agent enforcement. For a single parallel batch it acts
21//! as a pre-flight gate plus the hard `max_agents` cap. This is the honest
22//! ceiling a deterministic runtime can offer over a model it does not own.
23//!
24//! ## Deny-vs-fallback policy
25//!
26//! Patterns react to a denied spawn by one of two rules, chosen by whether the
27//! denied agent is load-bearing:
28//!
29//! - **Optional finalizers** (swarm/vote synthesizer, map_reduce reducer) fall
30//!   back to a non-agent combination (concatenation, first vote) — the caller
31//!   still gets the work already paid for.
32//! - **Load-bearing agents** (adversarial reviewer, supervisor reviewer) cause
33//!   the run to terminate with a budget-exhausted signal rather than return a
34//!   baseless pass/fail.
35//!
36//! The top-level *entry* agent a caller explicitly invokes (the main agent of
37//! `Delegator` / `SpawnSubtask`) is intentionally **not** gated — gating it
38//! would let a zero-budget config refuse the whole run before any delegation.
39//! Budgets cap that agent's spawned children, not the agent itself.
40//!
41//! Spend reported by an agent that then *fails* (runner `Err`, join panic) is
42//! not metered — the error path carries no token payload — so the ceiling can
43//! under-count failed work. Integration sites note this where it applies.
44
45use crate::types::{AgentOutput, TokenAccounting};
46use serde::{Deserialize, Serialize};
47use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
48
/// Caps on what a single coordination run may consume.
///
/// Every field is optional; `None` means "no limit on this dimension". A
/// [`BudgetLimits`] with all fields `None` is unbounded and imposes no overhead.
///
/// Spend caps are compared with `>=` against the running total, so a cap of
/// `0` is already crossed before anything has been recorded. Every field is
/// marked `#[serde(default)]`, so a key missing from the serialized form
/// deserializes to `None`.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct BudgetLimits {
    /// Cap on cumulative reported input (prompt) tokens.
    #[serde(default)]
    pub max_input_tokens: Option<u64>,
    /// Cap on cumulative reported output (completion) tokens.
    #[serde(default)]
    pub max_output_tokens: Option<u64>,
    /// Cap on cumulative reported input + output tokens.
    #[serde(default)]
    pub max_total_tokens: Option<u64>,
    /// Cap on cumulative reported cost in US dollars. Internally compared in
    /// whole micro-dollars, so the cap is rounded to the nearest micro.
    #[serde(default)]
    pub max_cost_usd: Option<f64>,
    /// Hard cap on the number of agents started, regardless of token spend.
    /// A spawn is refused once the started count is `>=` this value.
    #[serde(default)]
    pub max_agents: Option<u32>,
}
71
72impl BudgetLimits {
73    /// True when no dimension is constrained — the budget is a no-op.
74    pub fn is_unbounded(&self) -> bool {
75        self.max_input_tokens.is_none()
76            && self.max_output_tokens.is_none()
77            && self.max_total_tokens.is_none()
78            && self.max_cost_usd.is_none()
79            && self.max_agents.is_none()
80    }
81}
82
/// Converts a dollar amount to whole micro-dollars (`$1.00` == `1_000_000`).
///
/// Cost is tracked as an integer so the running total can live in an atomic.
/// Anything that is not a finite, strictly positive amount (NaN, ±infinity,
/// zero, negatives) converts to `0`. Finite amounts too large for a `u64`
/// saturate at `u64::MAX`, because float-to-integer `as` casts saturate.
fn usd_to_micros(usd: f64) -> u64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    match usd {
        amount if amount.is_finite() && amount > 0.0 => {
            (amount * MICROS_PER_USD).round() as u64
        }
        _ => 0,
    }
}
91
/// Converts whole micro-dollars back to a dollar amount for reporting
/// (`1_000_000` micros == `$1.00`).
fn micros_to_usd(micros: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    let as_float = micros as f64;
    as_float / MICROS_PER_USD
}
95
/// A point-in-time read of a [`CoordinationBudget`]'s spend and limits.
///
/// The counters are read independently of each other, so under concurrent
/// recording the fields may not all describe the same instant.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BudgetSnapshot {
    /// Cumulative reported input tokens.
    pub input_tokens: u64,
    /// Cumulative reported output tokens.
    pub output_tokens: u64,
    /// `input_tokens + output_tokens`, saturating at `u64::MAX`.
    pub total_tokens: u64,
    /// Cumulative reported cost in US dollars (converted from the internal
    /// micro-dollar counter).
    pub cost_usd: f64,
    /// Number of agent slots reserved so far.
    pub agents_started: u32,
    /// A copy of the limits the budget was created with.
    pub limits: BudgetLimits,
    /// True when a *spend* limit (tokens/cost) is already crossed. This does not
    /// reflect the `max_agents` cap, which is checked atomically at spawn time —
    /// a budget limited only by `max_agents` can deny the next spawn while this
    /// flag is still `false`.
    pub exhausted: bool,
}
111
/// Why a coordination budget refused to start an agent.
#[derive(Debug, Clone, PartialEq)]
pub enum BudgetError {
    /// A token or cost limit was already crossed before this agent could start.
    Exhausted {
        /// Human-readable description of the limit that was crossed, e.g.
        /// `"total tokens 110 >= limit 100"`.
        reason: String,
        /// Spend and limits as read when the denial was produced.
        snapshot: BudgetSnapshot,
    },
    /// Starting another agent would exceed the `max_agents` cap.
    ///
    /// `max` is the configured cap; `started` is the number of slots already
    /// reserved when the spawn was refused.
    AgentLimit { max: u32, started: u32 },
}
123
124impl std::fmt::Display for BudgetError {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        match self {
127            BudgetError::Exhausted { reason, .. } => {
128                write!(f, "coordination budget exhausted: {}", reason)
129            }
130            BudgetError::AgentLimit { max, started } => write!(
131                f,
132                "coordination budget agent cap reached: {} of {} agents already started",
133                started, max
134            ),
135        }
136    }
137}
138
// Marker impl: `BudgetError` uses the default `Error` methods (no `source`),
// relying on its `Debug` and `Display` impls for presentation.
impl std::error::Error for BudgetError {}
140
/// A shared, runtime-owned spend ceiling for one coordination run.
///
/// Patterns hold an `Arc<CoordinationBudget>` (via [`SharedInfra`](crate::SharedInfra))
/// and call [`try_begin_agent`](Self::try_begin_agent) before each spawn and
/// [`record_output`](Self::record_output) after each agent returns. All
/// accounting is lock-free.
#[derive(Debug)]
pub struct CoordinationBudget {
    /// The caps this budget enforces; fixed at construction.
    limits: BudgetLimits,
    /// Running total of reported input tokens.
    spent_input: AtomicU64,
    /// Running total of reported output tokens.
    spent_output: AtomicU64,
    /// Running total of reported cost, in micro-dollars (`$1.00` == `1_000_000`).
    spent_cost_micros: AtomicU64,
    /// Number of agent slots reserved through `try_begin_agent`.
    agents_started: AtomicU32,
}
155
156impl CoordinationBudget {
157    /// A budget with the given limits.
158    pub fn new(limits: BudgetLimits) -> Self {
159        Self {
160            limits,
161            spent_input: AtomicU64::new(0),
162            spent_output: AtomicU64::new(0),
163            spent_cost_micros: AtomicU64::new(0),
164            agents_started: AtomicU32::new(0),
165        }
166    }
167
168    /// A budget that never denies anything. Used as the default in
169    /// [`SharedInfra::new`](crate::SharedInfra::new) so the budget code path is
170    /// always present but free when no limits are set.
171    pub fn unbounded() -> Self {
172        Self::new(BudgetLimits::default())
173    }
174
175    /// True when this budget imposes no limits.
176    pub fn is_unbounded(&self) -> bool {
177        self.limits.is_unbounded()
178    }
179
180    /// The reason a spend limit is crossed, if any (ignores the agent-count cap,
181    /// which is checked atomically at spawn time).
182    fn spend_exhaustion(&self) -> Option<String> {
183        let input = self.spent_input.load(Ordering::Relaxed);
184        let output = self.spent_output.load(Ordering::Relaxed);
185        let cost_micros = self.spent_cost_micros.load(Ordering::Relaxed);
186
187        if let Some(max) = self.limits.max_input_tokens {
188            if input >= max {
189                return Some(format!("input tokens {} >= limit {}", input, max));
190            }
191        }
192        if let Some(max) = self.limits.max_output_tokens {
193            if output >= max {
194                return Some(format!("output tokens {} >= limit {}", output, max));
195            }
196        }
197        if let Some(max) = self.limits.max_total_tokens {
198            // Saturating: counters are summed from self-reported (untrusted)
199            // tokens, so a plain `+` could overflow and panic in debug builds.
200            let total = input.saturating_add(output);
201            if total >= max {
202                return Some(format!("total tokens {} >= limit {}", total, max));
203            }
204        }
205        if let Some(max) = self.limits.max_cost_usd {
206            let cost = micros_to_usd(cost_micros);
207            if cost_micros >= usd_to_micros(max) {
208                return Some(format!("cost ${:.4} >= limit ${:.4}", cost, max));
209            }
210        }
211        None
212    }
213
214    /// True when a spend limit is already crossed.
215    pub fn is_exhausted(&self) -> bool {
216        self.spend_exhaustion().is_some()
217    }
218
219    /// Attempt to start one more agent.
220    ///
221    /// Returns `Ok(())` and atomically reserves an agent slot when within budget;
222    /// returns the relevant [`BudgetError`] otherwise without reserving. Spend
223    /// limits are checked first (so a crossed token/cost ceiling reports
224    /// `Exhausted`, not `AgentLimit`).
225    pub fn try_begin_agent(&self) -> Result<(), BudgetError> {
226        // The spend check and the agent-cap CAS below are intentionally not one
227        // atomic step. Between them, concurrent `record()` calls from in-flight
228        // agents can push spend over a limit, so two concurrent callers can both
229        // pass this check and both reserve a slot. That is exactly the documented
230        // "overshoot bounded by in-flight work" contract — do not wrap it in a
231        // lock. (In a parallel batch all `try_begin_agent` calls run before any
232        // `record`, so spend is still zero here; in sequential patterns this is
233        // single-threaded.)
234        if let Some(reason) = self.spend_exhaustion() {
235            return Err(BudgetError::Exhausted {
236                reason,
237                snapshot: self.snapshot(),
238            });
239        }
240
241        match self.limits.max_agents {
242            None => {
243                self.agents_started.fetch_add(1, Ordering::Relaxed);
244                Ok(())
245            }
246            Some(max) => {
247                // CAS loop: only reserve a slot if strictly below the cap, so
248                // concurrent spawns can never over-reserve.
249                let mut current = self.agents_started.load(Ordering::Relaxed);
250                loop {
251                    if current >= max {
252                        return Err(BudgetError::AgentLimit {
253                            max,
254                            started: current,
255                        });
256                    }
257                    match self.agents_started.compare_exchange_weak(
258                        current,
259                        current + 1,
260                        Ordering::Relaxed,
261                        Ordering::Relaxed,
262                    ) {
263                        Ok(_) => return Ok(()),
264                        Err(observed) => current = observed,
265                    }
266                }
267            }
268        }
269    }
270
271    /// Add reported spend to the running totals.
272    pub fn record(&self, tokens: &TokenAccounting) {
273        self.spent_input
274            .fetch_add(tokens.input_tokens, Ordering::Relaxed);
275        self.spent_output
276            .fetch_add(tokens.output_tokens, Ordering::Relaxed);
277        self.spent_cost_micros
278            .fetch_add(usd_to_micros(tokens.cost_usd), Ordering::Relaxed);
279    }
280
281    /// Record an agent's reported spend, if it carried any.
282    pub fn record_output(&self, out: &AgentOutput) {
283        if let Some(tokens) = &out.tokens {
284            self.record(tokens);
285        }
286    }
287
288    /// A read of current spend and limits. The four counters are loaded
289    /// independently (Relaxed), so a snapshot taken during concurrent `record()`
290    /// calls may tear across counters. That is fine for observability — each
291    /// counter is a monotonic sum and the gate tolerates in-flight overshoot.
292    pub fn snapshot(&self) -> BudgetSnapshot {
293        let input = self.spent_input.load(Ordering::Relaxed);
294        let output = self.spent_output.load(Ordering::Relaxed);
295        let cost = micros_to_usd(self.spent_cost_micros.load(Ordering::Relaxed));
296        BudgetSnapshot {
297            input_tokens: input,
298            output_tokens: output,
299            total_tokens: input.saturating_add(output),
300            cost_usd: cost,
301            agents_started: self.agents_started.load(Ordering::Relaxed),
302            limits: self.limits.clone(),
303            exhausted: self.spend_exhaustion().is_some(),
304        }
305    }
306}
307
308impl Default for CoordinationBudget {
309    fn default() -> Self {
310        Self::unbounded()
311    }
312}
313
314/// Build the placeholder [`AgentOutput`] recorded for an agent the budget
315/// refused to start, so result shapes stay intact and the skip is observable.
316///
317/// The output carries a structured [`car_ir::AgentOutcome`] with status
318/// `GiveUp` and a `StopReason` evidence tagged `{"budget_skipped": true}`, so
319/// callers can detect a budget skip deterministically via the outcome rather
320/// than by string-matching the `error` text.
321pub fn budget_skipped_output(name: &str, err: &BudgetError) -> AgentOutput {
322    let reason = err.to_string();
323    let outcome = car_ir::AgentOutcome::give_up(&reason).with_evidence(car_ir::Evidence {
324        kind: car_ir::EvidenceKind::StopReason,
325        description: reason.clone(),
326        data: Some(serde_json::json!({ "budget_skipped": true })),
327    });
328    AgentOutput {
329        name: name.to_string(),
330        answer: String::new(),
331        turns: 0,
332        tool_calls: 0,
333        duration_ms: 0.0,
334        error: Some(reason),
335        outcome: Some(outcome),
336        tokens: None,
337    }
338}
339
340/// True when `out` was produced by [`budget_skipped_output`] — i.e. the budget
341/// refused to start this agent. Lets callers branch on budget skips without
342/// inspecting error strings.
343pub fn is_budget_skipped(out: &AgentOutput) -> bool {
344    out.outcome.as_ref().is_some_and(|o| {
345        o.evidence.iter().any(|e| {
346            e.data
347                .as_ref()
348                .and_then(|d| d.get("budget_skipped"))
349                .and_then(|v| v.as_bool())
350                .unwrap_or(false)
351        })
352    })
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a reported-spend record.
    fn toks(input: u64, output: u64, cost: f64) -> TokenAccounting {
        TokenAccounting::new(input, output, cost)
    }

    /// With no limits set, neither spawns nor heavy spend are ever refused.
    #[test]
    fn unbounded_never_denies() {
        let budget = CoordinationBudget::unbounded();
        assert!(budget.is_unbounded());
        let report = toks(10_000, 10_000, 100.0);
        for _ in 0..1000 {
            assert!(budget.try_begin_agent().is_ok());
            budget.record(&report);
        }
        assert!(!budget.is_exhausted());
    }

    /// Crossing the total-token cap refuses the *next* spawn and explains why.
    #[test]
    fn total_token_limit_stops_next_spawn() {
        let limits = BudgetLimits {
            max_total_tokens: Some(100),
            ..Default::default()
        };
        let budget = CoordinationBudget::new(limits);
        assert!(budget.try_begin_agent().is_ok());
        budget.record(&toks(60, 50, 0.0)); // 110 >= 100

        let denial = budget.try_begin_agent();
        let Err(BudgetError::Exhausted { reason, snapshot }) = &denial else {
            panic!("expected Exhausted, got {:?}", denial);
        };
        assert!(reason.contains("total tokens"));
        assert_eq!(snapshot.total_tokens, 110);
        assert!(snapshot.exhausted);
    }

    /// An output-only cap ignores input spend entirely.
    #[test]
    fn input_and_output_limits_are_independent() {
        let limits = BudgetLimits {
            max_output_tokens: Some(50),
            ..Default::default()
        };
        let budget = CoordinationBudget::new(limits);
        assert!(budget.try_begin_agent().is_ok());

        // Input is huge, output is still under the cap.
        budget.record(&toks(10_000, 10, 0.0));
        assert!(
            budget.try_begin_agent().is_ok(),
            "input spend must not trip output cap"
        );

        // Now output goes over the cap.
        budget.record(&toks(0, 60, 0.0));
        let denial = budget.try_begin_agent();
        assert!(matches!(denial, Err(BudgetError::Exhausted { .. })));
    }

    /// The dollar cap admits spawns below the limit and refuses above it.
    #[test]
    fn cost_limit_enforced() {
        let limits = BudgetLimits {
            max_cost_usd: Some(1.0),
            ..Default::default()
        };
        let budget = CoordinationBudget::new(limits);
        assert!(budget.try_begin_agent().is_ok());
        budget.record(&toks(0, 0, 0.99));
        assert!(budget.try_begin_agent().is_ok());
        budget.record(&toks(0, 0, 0.02)); // 1.01 >= 1.00
        let denial = budget.try_begin_agent();
        assert!(matches!(denial, Err(BudgetError::Exhausted { .. })));
    }

    /// Exactly `max_agents` spawns succeed; the next is refused without
    /// consuming a slot.
    #[test]
    fn agent_cap_enforced_exactly() {
        let limits = BudgetLimits {
            max_agents: Some(3),
            ..Default::default()
        };
        let budget = CoordinationBudget::new(limits);
        for _ in 0..3 {
            assert!(budget.try_begin_agent().is_ok());
        }
        assert_eq!(
            budget.try_begin_agent(),
            Err(BudgetError::AgentLimit { max: 3, started: 3 })
        );
        // A denied spawn must not consume a slot.
        assert_eq!(budget.snapshot().agents_started, 3);
    }

    /// When both a spend cap and the agent cap are set, a crossed spend cap is
    /// what gets reported.
    #[test]
    fn spend_limit_takes_priority_over_agent_cap() {
        let limits = BudgetLimits {
            max_total_tokens: Some(10),
            max_agents: Some(100),
            ..Default::default()
        };
        let budget = CoordinationBudget::new(limits);
        assert!(budget.try_begin_agent().is_ok());
        budget.record(&toks(20, 0, 0.0));
        let denial = budget.try_begin_agent();
        assert!(matches!(denial, Err(BudgetError::Exhausted { .. })));
    }

    /// The placeholder output is recognised through its structured outcome,
    /// and an ordinary output is not.
    #[test]
    fn skipped_output_is_detectable_without_string_matching() {
        let denial = BudgetError::AgentLimit { max: 2, started: 2 };
        let skipped = budget_skipped_output("worker", &denial);
        assert!(is_budget_skipped(&skipped));
        assert!(!skipped.succeeded());
        let status = &skipped.outcome.as_ref().unwrap().status;
        assert_eq!(*status, car_ir::OutcomeStatus::GiveUp);

        // A normal output is not mistaken for a budget skip.
        let ordinary = AgentOutput {
            name: "x".into(),
            answer: "done".into(),
            turns: 1,
            tool_calls: 0,
            duration_ms: 1.0,
            error: None,
            outcome: Some(car_ir::AgentOutcome::success("ok")),
            tokens: None,
        };
        assert!(!is_budget_skipped(&ordinary));
    }

    /// Extreme self-reported counts must saturate rather than panic or wrap.
    #[test]
    fn huge_reported_tokens_do_not_panic() {
        let limits = BudgetLimits {
            max_total_tokens: Some(100),
            ..Default::default()
        };
        let budget = CoordinationBudget::new(limits);
        // input + output near u64::MAX must saturate, not panic (debug) or wrap.
        budget.record(&toks(u64::MAX, u64::MAX, 0.0));
        assert!(budget.is_exhausted());
        assert_eq!(budget.snapshot().total_tokens, u64::MAX);
    }

    /// 16 threads race for 50 slots; exactly 50 reservations may succeed.
    #[test]
    fn concurrent_agent_cap_never_over_reserves() {
        let budget = CoordinationBudget::new(BudgetLimits {
            max_agents: Some(50),
            ..Default::default()
        });
        let granted: usize = std::thread::scope(|scope| {
            // Collect first so every worker is running before any join.
            let workers: Vec<_> = (0..16)
                .map(|_| {
                    scope.spawn(|| {
                        (0..100)
                            .filter(|_| budget.try_begin_agent().is_ok())
                            .count()
                    })
                })
                .collect();
            workers
                .into_iter()
                .map(|worker| worker.join().unwrap())
                .sum()
        });
        assert_eq!(granted, 50);
        assert_eq!(budget.snapshot().agents_started, 50);
    }
}