1#![forbid(unsafe_code)]
9
10use std::collections::HashMap;
11
12use serde::{Deserialize, Serialize};
13
14use shigoto_types::{JobId, JobKindId, JobScope};
15
16#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
17pub struct BudgetSpec {
18 pub max_concurrent: u32,
19 pub max_failures_per_minute: u32,
20 pub queue_depth: u32,
21}
22
23impl BudgetSpec {
24 #[must_use]
27 pub fn max_concurrent(n: u32) -> Self {
28 Self {
29 max_concurrent: n,
30 max_failures_per_minute: u32::MAX,
31 queue_depth: u32::MAX,
32 }
33 }
34}
35
36#[derive(thiserror::Error, Debug, PartialEq, Eq)]
37pub enum BudgetError {
38 #[error("global budget exhausted")]
39 GlobalExhausted,
40 #[error("kind budget exhausted ({0})")]
41 KindExhausted(String),
42 #[error("scope budget exhausted")]
43 ScopeExhausted,
44}
45
46#[derive(Debug, Default)]
49pub struct BudgetTree {
50 pub global: Option<BudgetSpec>,
52 pub by_kind: HashMap<JobKindId, BudgetSpec>,
54 pub by_scope: HashMap<JobScope, BudgetSpec>,
56
57 running_global: u32,
59 running_by_kind: HashMap<JobKindId, u32>,
60 running_by_scope: HashMap<JobScope, u32>,
61}
62
63impl BudgetTree {
64 #[must_use]
65 pub fn new() -> Self {
66 Self::default()
67 }
68
69 pub fn try_allocate(&mut self, id: &JobId) -> Result<(), BudgetError> {
73 if let Some(spec) = &self.global {
75 if self.running_global >= spec.max_concurrent {
76 return Err(BudgetError::GlobalExhausted);
77 }
78 }
79 if let Some(spec) = self.by_kind.get(&id.kind) {
80 let running = self.running_by_kind.get(&id.kind).copied().unwrap_or(0);
81 if running >= spec.max_concurrent {
82 return Err(BudgetError::KindExhausted(id.kind.0.clone()));
83 }
84 }
85 if let Some(spec) = self.by_scope.get(&id.scope) {
86 let running = self.running_by_scope.get(&id.scope).copied().unwrap_or(0);
87 if running >= spec.max_concurrent {
88 return Err(BudgetError::ScopeExhausted);
89 }
90 }
91
92 self.running_global = self.running_global.saturating_add(1);
94 *self.running_by_kind.entry(id.kind.clone()).or_insert(0) += 1;
95 *self.running_by_scope.entry(id.scope.clone()).or_insert(0) += 1;
96 Ok(())
97 }
98
99 pub fn release(&mut self, id: &JobId) {
102 self.running_global = self.running_global.saturating_sub(1);
103 if let Some(c) = self.running_by_kind.get_mut(&id.kind) {
104 *c = c.saturating_sub(1);
105 }
106 if let Some(c) = self.running_by_scope.get_mut(&id.scope) {
107 *c = c.saturating_sub(1);
108 }
109 }
110
111 #[must_use]
113 pub fn running_global(&self) -> u32 {
114 self.running_global
115 }
116
117 #[must_use]
119 pub fn running_kind(&self, kind: &JobKindId) -> u32 {
120 self.running_by_kind.get(kind).copied().unwrap_or(0)
121 }
122
123 #[must_use]
125 pub fn running_scope(&self, scope: &JobScope) -> u32 {
126 self.running_by_scope.get(scope).copied().unwrap_or(0)
127 }
128}
129
130#[cfg(test)]
131mod tests {
132 use super::*;
133 use shigoto_types::{JobKindId, JobScope, JobSubject};
134
135 fn mk_id(kind: &str, scope: JobScope, subject: &str) -> JobId {
136 JobId {
137 scope,
138 kind: JobKindId::new(kind),
139 subject: JobSubject::Pinned(subject.into()),
140 }
141 }
142
143 #[test]
144 fn empty_budget_allocates_freely() {
145 let mut b = BudgetTree::new();
146 let id = mk_id("k", JobScope::Global, "x");
147 assert!(b.try_allocate(&id).is_ok());
148 assert_eq!(b.running_global(), 1);
149 }
150
151 #[test]
152 fn global_limit_blocks_third_allocation() {
153 let mut b = BudgetTree::new();
154 b.global = Some(BudgetSpec::max_concurrent(2));
155 let a = mk_id("k", JobScope::Global, "a");
156 let bb = mk_id("k", JobScope::Global, "b");
157 let c = mk_id("k", JobScope::Global, "c");
158 assert!(b.try_allocate(&a).is_ok());
159 assert!(b.try_allocate(&bb).is_ok());
160 assert_eq!(b.try_allocate(&c), Err(BudgetError::GlobalExhausted));
161 assert_eq!(b.running_global(), 2);
162 }
163
164 #[test]
165 fn kind_limit_isolates_kinds() {
166 let mut b = BudgetTree::new();
167 b.by_kind
168 .insert(JobKindId::new("hot"), BudgetSpec::max_concurrent(1));
169 let h1 = mk_id("hot", JobScope::Global, "x");
170 let h2 = mk_id("hot", JobScope::Global, "y");
171 let cold = mk_id("cold", JobScope::Global, "z");
172 assert!(b.try_allocate(&h1).is_ok());
173 assert_eq!(
174 b.try_allocate(&h2),
175 Err(BudgetError::KindExhausted("hot".into()))
176 );
177 assert!(b.try_allocate(&cold).is_ok());
179 }
180
181 #[test]
182 fn scope_limit_isolates_scopes() {
183 let mut b = BudgetTree::new();
184 let ws = JobScope::Workspace("pleme-io".into());
185 b.by_scope.insert(ws.clone(), BudgetSpec::max_concurrent(1));
186 let a = mk_id("k", ws.clone(), "a");
187 let bb = mk_id("k", ws.clone(), "b");
188 let other = mk_id("k", JobScope::Global, "c");
189 assert!(b.try_allocate(&a).is_ok());
190 assert_eq!(b.try_allocate(&bb), Err(BudgetError::ScopeExhausted));
191 assert!(b.try_allocate(&other).is_ok());
193 }
194
195 #[test]
196 fn min_intersection_narrowest_wins() {
197 let mut b = BudgetTree::new();
199 b.global = Some(BudgetSpec::max_concurrent(10));
200 b.by_kind
201 .insert(JobKindId::new("k"), BudgetSpec::max_concurrent(1));
202 let ws = JobScope::Workspace("pleme-io".into());
203 b.by_scope.insert(ws.clone(), BudgetSpec::max_concurrent(5));
204
205 let a = mk_id("k", ws.clone(), "a");
206 let bb = mk_id("k", ws.clone(), "b");
207 assert!(b.try_allocate(&a).is_ok());
208 assert_eq!(
210 b.try_allocate(&bb),
211 Err(BudgetError::KindExhausted("k".into()))
212 );
213 }
214
215 #[test]
216 fn release_decrements_all_three_counters() {
217 let mut b = BudgetTree::new();
218 b.global = Some(BudgetSpec::max_concurrent(1));
219 let id = mk_id("k", JobScope::Global, "x");
220 assert!(b.try_allocate(&id).is_ok());
221 assert_eq!(b.try_allocate(&id), Err(BudgetError::GlobalExhausted));
223 b.release(&id);
224 assert!(b.try_allocate(&id).is_ok());
226 }
227
228 #[test]
229 fn release_is_saturating_no_underflow() {
230 let mut b = BudgetTree::new();
231 let id = mk_id("k", JobScope::Global, "x");
232 b.release(&id);
234 b.release(&id);
235 assert_eq!(b.running_global(), 0);
236 }
237
238 #[test]
239 fn failed_allocation_does_not_mutate_counters() {
240 let mut b = BudgetTree::new();
242 b.by_kind
243 .insert(JobKindId::new("k"), BudgetSpec::max_concurrent(0));
244
245 let id = mk_id("k", JobScope::Global, "x");
246 assert!(b.try_allocate(&id).is_err());
247 assert_eq!(b.running_global(), 0);
250 }
251}