oxirs_core/sla/
admission_controller.rs1use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13use std::time::Instant;
14use thiserror::Error;
15
16use super::class::SlaClass;
17
/// Reasons an admission request can be refused by the admission controller.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum AdmissionError {
    /// The tenant is registered, but its token bucket refused the requested cost.
    #[error("Rate limit exceeded for tenant '{tenant_id}'")]
    RateLimitExceeded {
        /// Identifier of the tenant whose request was refused.
        tenant_id: String,
    },

    /// No bucket exists for the tenant (never registered, or already deregistered).
    #[error("Tenant '{tenant_id}' is not registered with the admission controller")]
    TenantNotRegistered {
        /// Identifier that was looked up and not found.
        tenant_id: String,
    },
}
40
41struct TokenBucket {
46 tokens: f64,
47 capacity: f64,
48 refill_rate: f64, last_refill: Instant,
50 sla_class: SlaClass,
51}
52
53impl TokenBucket {
54 fn new(sla: SlaClass) -> Self {
55 let t = sla.thresholds();
56 TokenBucket {
57 tokens: t.token_bucket_capacity,
58 capacity: t.token_bucket_capacity,
59 refill_rate: t.token_refill_rate,
60 last_refill: Instant::now(),
61 sla_class: sla,
62 }
63 }
64
65 fn refill(&mut self) {
67 let elapsed_secs = self.last_refill.elapsed().as_secs_f64();
68 self.tokens = (self.tokens + elapsed_secs * self.refill_rate).min(self.capacity);
69 self.last_refill = Instant::now();
70 }
71
72 fn try_consume(&mut self, cost: f64) -> bool {
74 self.refill();
75 if self.tokens >= cost {
76 self.tokens -= cost;
77 true
78 } else {
79 false
80 }
81 }
82
83 fn sla_class(&self) -> SlaClass {
84 self.sla_class
85 }
86
87 fn available_tokens(&mut self) -> f64 {
89 self.refill();
90 self.tokens
91 }
92}
93
94#[derive(Clone)]
108pub struct AdmissionController {
109 buckets: Arc<Mutex<HashMap<String, TokenBucket>>>,
110}
111
112impl Default for AdmissionController {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118impl AdmissionController {
119 pub fn new() -> Self {
121 AdmissionController {
122 buckets: Arc::new(Mutex::new(HashMap::new())),
123 }
124 }
125
126 pub fn register_tenant(&self, tenant_id: &str, sla: SlaClass) {
130 let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
131 buckets.insert(tenant_id.to_owned(), TokenBucket::new(sla));
132 }
133
134 pub fn try_admit(&self, tenant_id: &str) -> Result<(), AdmissionError> {
138 let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
139 match buckets.get_mut(tenant_id) {
140 Some(bucket) => {
141 if bucket.try_consume(1.0) {
142 Ok(())
143 } else {
144 Err(AdmissionError::RateLimitExceeded {
145 tenant_id: tenant_id.to_owned(),
146 })
147 }
148 }
149 None => Err(AdmissionError::TenantNotRegistered {
150 tenant_id: tenant_id.to_owned(),
151 }),
152 }
153 }
154
155 pub fn try_admit_with_cost(&self, tenant_id: &str, cost: f64) -> Result<(), AdmissionError> {
157 let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
158 match buckets.get_mut(tenant_id) {
159 Some(bucket) => {
160 if bucket.try_consume(cost) {
161 Ok(())
162 } else {
163 Err(AdmissionError::RateLimitExceeded {
164 tenant_id: tenant_id.to_owned(),
165 })
166 }
167 }
168 None => Err(AdmissionError::TenantNotRegistered {
169 tenant_id: tenant_id.to_owned(),
170 }),
171 }
172 }
173
174 pub fn sla_class(&self, tenant_id: &str) -> Option<SlaClass> {
176 let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
177 buckets.get_mut(tenant_id).map(|b| b.sla_class())
178 }
179
180 pub fn available_tokens(&self, tenant_id: &str) -> Option<f64> {
184 let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
185 buckets.get_mut(tenant_id).map(|b| b.available_tokens())
186 }
187
188 pub fn tenant_count(&self) -> usize {
190 self.buckets.lock().unwrap_or_else(|e| e.into_inner()).len()
191 }
192
193 pub fn deregister_tenant(&self, tenant_id: &str) -> bool {
195 self.buckets
196 .lock()
197 .unwrap_or_else(|e| e.into_inner())
198 .remove(tenant_id)
199 .is_some()
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Ten back-to-back admissions must all succeed for a Platinum tenant.
    // NOTE(review): presumes the Platinum bucket holds at least 10 tokens; the
    // thresholds are defined outside this file.
    #[test]
    fn test_admit_registered_tenant() {
        let controller = AdmissionController::new();
        controller.register_tenant("t1", SlaClass::Platinum);
        (0..10).for_each(|_| {
            assert!(controller.try_admit("t1").is_ok());
        });
    }

    /// A tenant that was never registered is refused with `TenantNotRegistered`.
    #[test]
    fn test_reject_unknown_tenant() {
        let controller = AdmissionController::new();
        let outcome = controller.try_admit("ghost");
        let err = outcome.expect_err("ghost is not registered");
        assert!(matches!(err, AdmissionError::TenantNotRegistered { .. }));
    }

    /// Ten rapid requests against a Bronze tenant yield both admissions and rejections.
    // NOTE(review): presumes the Bronze bucket holds fewer than 10 tokens and does not
    // refill meaningfully during the loop; confirm against the SLA thresholds.
    #[test]
    fn test_bronze_bucket_exhausts() {
        let controller = AdmissionController::new();
        controller.register_tenant("bronze", SlaClass::Bronze);

        const ATTEMPTS: usize = 10;
        let admitted = (0..ATTEMPTS)
            .filter(|_| controller.try_admit("bronze").is_ok())
            .count();
        let rejected = ATTEMPTS - admitted;

        assert!(admitted > 0, "should admit at least the first few");
        assert!(
            rejected > 0,
            "should eventually reject once tokens depleted"
        );
    }

    /// Deregistering removes the tenant, after which admission reports it as unknown.
    #[test]
    fn test_deregister_tenant() {
        let controller = AdmissionController::new();
        controller.register_tenant("t2", SlaClass::Gold);
        assert_eq!(controller.tenant_count(), 1);

        let removed = controller.deregister_tenant("t2");
        assert!(removed);
        assert_eq!(controller.tenant_count(), 0);

        let outcome = controller.try_admit("t2");
        let err = outcome.expect_err("t2 was deregistered, should reject");
        assert!(matches!(err, AdmissionError::TenantNotRegistered { .. }));
    }

    /// `sla_class` reports the registered class and `None` for unknown tenants.
    #[test]
    fn test_sla_class_query() {
        let controller = AdmissionController::new();
        controller.register_tenant("s", SlaClass::Silver);

        let known = controller.sla_class("s");
        let unknown = controller.sla_class("nonexistent");
        assert_eq!(known, Some(SlaClass::Silver));
        assert_eq!(unknown, None);
    }

    /// A large request is admitted once; a follow-up request is refused.
    // NOTE(review): presumes the Gold bucket holds at least 45 but fewer than 55
    // tokens and that refill between the two calls is negligible; confirm against the
    // SLA thresholds.
    #[test]
    fn test_custom_cost_admit() {
        let controller = AdmissionController::new();
        controller.register_tenant("gold", SlaClass::Gold);

        let first = controller.try_admit_with_cost("gold", 45.0);
        assert!(first.is_ok());

        let second = controller.try_admit_with_cost("gold", 10.0);
        assert!(second.is_err());
    }
}