Skip to main content

actionqueue_budget/
tracker.rs

1//! Per-task budget state tracking.
2//!
3//! Tracks in-memory budget state derived from durable WAL events. The tracker
4//! is the authoritative in-memory view of consumption against allocations.
5//! The dispatch loop consults the tracker to gate dispatch and determine when
6//! to signal suspension.
7
8use std::collections::HashMap;
9
10use actionqueue_core::budget::BudgetDimension;
11use actionqueue_core::ids::TaskId;
12use tracing;
13
/// Budget bookkeeping for a single (task, dimension) pair.
#[derive(Debug, Clone)]
pub struct BudgetState {
    /// Ceiling on consumption; dispatch is blocked once it is reached.
    pub limit: u64,
    /// Running total of consumption recorded against this entry.
    pub consumed: u64,
    /// Set once `consumed` has reached or exceeded `limit`.
    pub exhausted: bool,
}

impl BudgetState {
    /// Builds a fresh entry for `limit` with nothing consumed and the
    /// exhausted flag cleared.
    fn new(limit: u64) -> Self {
        BudgetState { limit, consumed: 0, exhausted: false }
    }
}
30
/// Outcome reported by a consume operation on the tracker.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConsumeResult {
    /// The recorded total is still strictly below the limit. Also returned
    /// when no allocation exists for the requested (task, dimension) pair.
    WithinBudget,
    /// The recorded total has reached or exceeded the limit.
    Exhausted {
        /// How far the total exceeds the limit; zero when it lands exactly on it.
        overage: u64,
    },
}
42
/// In-memory per-task budget tracker.
///
/// Holds one [`BudgetState`] per (task, dimension) pair. The state is
/// reconstructed from WAL events at bootstrap via [`BudgetTracker::allocate`]
/// and [`BudgetTracker::consume`] calls. The dispatch loop updates the tracker
/// each tick as WorkerResults arrive.
#[derive(Debug, Default)]
pub struct BudgetTracker {
    /// Budget entries keyed by (task, dimension); a pair with no allocation
    /// has no entry.
    budgets: HashMap<(TaskId, BudgetDimension), BudgetState>,
}
52
53impl BudgetTracker {
54    /// Creates an empty tracker.
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    /// Registers a budget allocation for a task/dimension pair.
60    ///
61    /// If an allocation already exists for this pair, it is replaced (re-allocation).
62    pub fn allocate(&mut self, task_id: TaskId, dimension: BudgetDimension, limit: u64) {
63        tracing::debug!(%task_id, %dimension, limit, "budget allocated");
64        self.budgets.insert((task_id, dimension), BudgetState::new(limit));
65    }
66
67    /// Records consumption and returns whether the budget was exhausted by this call.
68    ///
69    /// No-ops if no allocation exists for the (task, dimension) pair.
70    pub fn consume(
71        &mut self,
72        task_id: TaskId,
73        dimension: BudgetDimension,
74        amount: u64,
75    ) -> ConsumeResult {
76        let Some(state) = self.budgets.get_mut(&(task_id, dimension)) else {
77            return ConsumeResult::WithinBudget;
78        };
79
80        tracing::debug!(%task_id, %dimension, amount, "budget consumption recorded");
81        state.consumed = state.consumed.saturating_add(amount);
82        if state.consumed >= state.limit {
83            let overage = state.consumed.saturating_sub(state.limit);
84            state.exhausted = true;
85            tracing::warn!(
86                %task_id, %dimension,
87                consumed = state.consumed, limit = state.limit,
88                "budget exhausted"
89            );
90            ConsumeResult::Exhausted { overage }
91        } else {
92            ConsumeResult::WithinBudget
93        }
94    }
95
96    /// Replenishes a budget: sets new limit and clears the exhausted flag.
97    pub fn replenish(&mut self, task_id: TaskId, dimension: BudgetDimension, new_limit: u64) {
98        if let Some(state) = self.budgets.get_mut(&(task_id, dimension)) {
99            tracing::debug!(%task_id, %dimension, new_limit, "budget replenished");
100            state.limit = new_limit;
101            state.consumed = 0;
102            state.exhausted = false;
103        }
104    }
105
106    /// Returns true if the given dimension budget is exhausted for this task.
107    pub fn is_exhausted(&self, task_id: TaskId, dimension: BudgetDimension) -> bool {
108        self.budgets.get(&(task_id, dimension)).is_some_and(|s| s.exhausted)
109    }
110
111    /// Returns true if ANY dimension budget is exhausted for this task.
112    pub fn is_any_exhausted(&self, task_id: TaskId) -> bool {
113        [BudgetDimension::Token, BudgetDimension::CostCents, BudgetDimension::TimeSecs]
114            .iter()
115            .any(|&dim| self.is_exhausted(task_id, dim))
116    }
117
118    /// Returns the remaining budget for a (task, dimension) pair.
119    ///
120    /// Returns `None` if no allocation exists.
121    pub fn remaining(&self, task_id: TaskId, dimension: BudgetDimension) -> Option<u64> {
122        self.budgets.get(&(task_id, dimension)).map(|s| s.limit.saturating_sub(s.consumed))
123    }
124
125    /// Returns consumed amount as a percentage of the limit (0-100).
126    ///
127    /// Returns `None` if no allocation exists.
128    pub fn threshold_pct(&self, task_id: TaskId, dimension: BudgetDimension) -> Option<u8> {
129        self.budgets.get(&(task_id, dimension)).map(|s| {
130            if s.limit == 0 {
131                100
132            } else {
133                ((s.consumed.min(s.limit) * 100) / s.limit) as u8
134            }
135        })
136    }
137
138    /// Returns the state for a (task, dimension) pair.
139    pub fn get(&self, task_id: TaskId, dimension: BudgetDimension) -> Option<&BudgetState> {
140        self.budgets.get(&(task_id, dimension))
141    }
142
143    /// Removes all budget state for a fully-terminal task.
144    ///
145    /// Called by the dispatch loop after a task reaches terminal state.
146    /// Safe to call because terminal tasks no longer dispatch runs that
147    /// would consume budget or check exhaustion.
148    pub fn gc_task(&mut self, task_id: TaskId) {
149        self.budgets.retain(|&(tid, _), _| tid != task_id);
150    }
151}
152
#[cfg(test)]
mod tests {
    use actionqueue_core::budget::BudgetDimension;
    use actionqueue_core::ids::TaskId;

    use super::{BudgetTracker, ConsumeResult};

    /// Builds a tracker holding a single allocation for a freshly created task.
    fn tracker_with(dimension: BudgetDimension, limit: u64) -> (BudgetTracker, TaskId) {
        let mut budgets = BudgetTracker::new();
        let id = TaskId::new();
        budgets.allocate(id, dimension, limit);
        (budgets, id)
    }

    #[test]
    fn allocate_and_consume_within_budget() {
        let (mut budgets, id) = tracker_with(BudgetDimension::Token, 1000);
        let outcome = budgets.consume(id, BudgetDimension::Token, 500);
        assert_eq!(outcome, ConsumeResult::WithinBudget);
        assert!(!budgets.is_exhausted(id, BudgetDimension::Token));
        assert_eq!(budgets.remaining(id, BudgetDimension::Token), Some(500));
    }

    #[test]
    fn consume_exhausts_budget() {
        let (mut budgets, id) = tracker_with(BudgetDimension::Token, 500);
        let outcome = budgets.consume(id, BudgetDimension::Token, 500);
        assert_eq!(outcome, ConsumeResult::Exhausted { overage: 0 });
        assert!(budgets.is_exhausted(id, BudgetDimension::Token));
        assert!(budgets.is_any_exhausted(id));
    }

    #[test]
    fn consume_over_limit_reports_overage() {
        let (mut budgets, id) = tracker_with(BudgetDimension::CostCents, 100);
        let outcome = budgets.consume(id, BudgetDimension::CostCents, 150);
        assert_eq!(outcome, ConsumeResult::Exhausted { overage: 50 });
    }

    #[test]
    fn replenish_clears_exhausted_flag() {
        let (mut budgets, id) = tracker_with(BudgetDimension::Token, 100);
        budgets.consume(id, BudgetDimension::Token, 100);
        assert!(budgets.is_exhausted(id, BudgetDimension::Token));

        budgets.replenish(id, BudgetDimension::Token, 200);
        assert!(!budgets.is_exhausted(id, BudgetDimension::Token));
        assert_eq!(budgets.remaining(id, BudgetDimension::Token), Some(200));
    }

    #[test]
    fn threshold_pct_returns_correct_percentage() {
        let (mut budgets, id) = tracker_with(BudgetDimension::Token, 200);
        budgets.consume(id, BudgetDimension::Token, 100);
        assert_eq!(budgets.threshold_pct(id, BudgetDimension::Token), Some(50));
    }

    #[test]
    fn no_allocation_returns_within_budget() {
        let mut budgets = BudgetTracker::new();
        let id = TaskId::new();
        let outcome = budgets.consume(id, BudgetDimension::Token, 100);
        assert_eq!(outcome, ConsumeResult::WithinBudget);
    }

    #[test]
    fn threshold_pct_zero_limit_returns_100() {
        let (budgets, id) = tracker_with(BudgetDimension::Token, 0);
        assert_eq!(budgets.threshold_pct(id, BudgetDimension::Token), Some(100));
    }

    #[test]
    fn threshold_pct_no_consumption_returns_zero() {
        let (budgets, id) = tracker_with(BudgetDimension::Token, 100);
        assert_eq!(budgets.threshold_pct(id, BudgetDimension::Token), Some(0));
    }

    #[test]
    fn threshold_pct_full_consumption_returns_100() {
        let (mut budgets, id) = tracker_with(BudgetDimension::CostCents, 200);
        budgets.consume(id, BudgetDimension::CostCents, 200);
        assert_eq!(budgets.threshold_pct(id, BudgetDimension::CostCents), Some(100));
    }

    #[test]
    fn threshold_pct_over_consumption_capped_at_100() {
        let (mut budgets, id) = tracker_with(BudgetDimension::TimeSecs, 100);
        budgets.consume(id, BudgetDimension::TimeSecs, 200);
        assert_eq!(budgets.threshold_pct(id, BudgetDimension::TimeSecs), Some(100));
    }

    #[test]
    fn threshold_pct_no_allocation_returns_none() {
        let budgets = BudgetTracker::new();
        let id = TaskId::new();
        assert_eq!(budgets.threshold_pct(id, BudgetDimension::Token), None);
    }

    #[test]
    fn gc_task_removes_all_dimensions() {
        let (mut budgets, id) = tracker_with(BudgetDimension::Token, 100);
        budgets.allocate(id, BudgetDimension::CostCents, 200);
        budgets.gc_task(id);
        assert!(budgets.get(id, BudgetDimension::Token).is_none());
        assert!(budgets.get(id, BudgetDimension::CostCents).is_none());
    }

    #[test]
    fn gc_task_does_not_affect_other_tasks() {
        let mut budgets = BudgetTracker::new();
        let removed = TaskId::new();
        let kept = TaskId::new();
        budgets.allocate(removed, BudgetDimension::Token, 100);
        budgets.allocate(kept, BudgetDimension::Token, 200);
        budgets.gc_task(removed);
        assert!(budgets.get(removed, BudgetDimension::Token).is_none());
        assert!(budgets.get(kept, BudgetDimension::Token).is_some());
    }

    #[test]
    fn gc_task_is_idempotent() {
        let (mut budgets, id) = tracker_with(BudgetDimension::Token, 100);
        budgets.gc_task(id);
        budgets.gc_task(id); // a second sweep of the same task must not panic
    }

    #[test]
    fn is_any_exhausted_checks_all_dimensions() {
        let (mut budgets, id) = tracker_with(BudgetDimension::Token, 100);
        budgets.allocate(id, BudgetDimension::CostCents, 100);
        budgets.consume(id, BudgetDimension::Token, 100);
        assert!(budgets.is_any_exhausted(id));
        assert!(!budgets.is_exhausted(id, BudgetDimension::CostCents));
    }
}