Skip to main content

oxirs_core/sla/
admission_controller.rs

1//! Token-bucket admission controller — one bucket per tenant.
2//!
3//! Each tenant is registered with an [`SlaClass`]; the controller maintains a
4//! leaky-bucket that refills at the rate specified by
5//! [`super::thresholds::SlaThresholds::token_refill_rate`] and can burst up to
6//! [`super::thresholds::SlaThresholds::token_bucket_capacity`] tokens.
7//!
8//! A call to [`AdmissionController::try_admit`] deducts 1.0 token and returns
9//! `Ok(())` on success or an [`AdmissionError`] on rejection.
10
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13use std::time::Instant;
14use thiserror::Error;
15
16use super::class::SlaClass;
17
18// ─────────────────────────────────────────────────────────────────────────────
19// AdmissionError
20// ─────────────────────────────────────────────────────────────────────────────
21
22/// Errors returned by [`AdmissionController::try_admit`].
23#[derive(Debug, Error, Clone, PartialEq, Eq)]
24pub enum AdmissionError {
25    /// The tenant exhausted its token bucket and cannot accept more requests
26    /// at this time.
27    #[error("Rate limit exceeded for tenant '{tenant_id}'")]
28    RateLimitExceeded {
29        /// The tenant identifier whose bucket is empty.
30        tenant_id: String,
31    },
32
33    /// The tenant has not been registered with [`AdmissionController::register_tenant`].
34    #[error("Tenant '{tenant_id}' is not registered with the admission controller")]
35    TenantNotRegistered {
36        /// The tenant identifier that was not found in the registry.
37        tenant_id: String,
38    },
39}
40
41// ─────────────────────────────────────────────────────────────────────────────
42// TokenBucket (private)
43// ─────────────────────────────────────────────────────────────────────────────
44
45struct TokenBucket {
46    tokens: f64,
47    capacity: f64,
48    refill_rate: f64, // tokens per second
49    last_refill: Instant,
50    sla_class: SlaClass,
51}
52
53impl TokenBucket {
54    fn new(sla: SlaClass) -> Self {
55        let t = sla.thresholds();
56        TokenBucket {
57            tokens: t.token_bucket_capacity,
58            capacity: t.token_bucket_capacity,
59            refill_rate: t.token_refill_rate,
60            last_refill: Instant::now(),
61            sla_class: sla,
62        }
63    }
64
65    /// Bring the bucket up to date with elapsed wall time.
66    fn refill(&mut self) {
67        let elapsed_secs = self.last_refill.elapsed().as_secs_f64();
68        self.tokens = (self.tokens + elapsed_secs * self.refill_rate).min(self.capacity);
69        self.last_refill = Instant::now();
70    }
71
72    /// Attempt to consume `cost` tokens.  Returns `true` if admitted.
73    fn try_consume(&mut self, cost: f64) -> bool {
74        self.refill();
75        if self.tokens >= cost {
76            self.tokens -= cost;
77            true
78        } else {
79            false
80        }
81    }
82
83    fn sla_class(&self) -> SlaClass {
84        self.sla_class
85    }
86
87    /// Current token level (after lazy refill).
88    fn available_tokens(&mut self) -> f64 {
89        self.refill();
90        self.tokens
91    }
92}
93
94// ─────────────────────────────────────────────────────────────────────────────
95// AdmissionController
96// ─────────────────────────────────────────────────────────────────────────────
97
98/// Thread-safe admission controller backed by per-tenant token buckets.
99///
100/// ```rust
101/// use oxirs_core::sla::{SlaClass, AdmissionController};
102///
103/// let ctrl = AdmissionController::new();
104/// ctrl.register_tenant("premium_user", SlaClass::Platinum);
105/// assert!(ctrl.try_admit("premium_user").is_ok());
106/// ```
107#[derive(Clone)]
108pub struct AdmissionController {
109    buckets: Arc<Mutex<HashMap<String, TokenBucket>>>,
110}
111
112impl Default for AdmissionController {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
118impl AdmissionController {
119    /// Create an empty controller (no tenants registered).
120    pub fn new() -> Self {
121        AdmissionController {
122            buckets: Arc::new(Mutex::new(HashMap::new())),
123        }
124    }
125
126    /// Register (or re-register) a tenant with the given SLA class.
127    ///
128    /// Re-registering an existing tenant resets its bucket to full capacity.
129    pub fn register_tenant(&self, tenant_id: &str, sla: SlaClass) {
130        let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
131        buckets.insert(tenant_id.to_owned(), TokenBucket::new(sla));
132    }
133
134    /// Try to admit one query unit (cost = 1.0 token) for `tenant_id`.
135    ///
136    /// Returns `Ok(())` when admitted, `Err(AdmissionError)` otherwise.
137    pub fn try_admit(&self, tenant_id: &str) -> Result<(), AdmissionError> {
138        let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
139        match buckets.get_mut(tenant_id) {
140            Some(bucket) => {
141                if bucket.try_consume(1.0) {
142                    Ok(())
143                } else {
144                    Err(AdmissionError::RateLimitExceeded {
145                        tenant_id: tenant_id.to_owned(),
146                    })
147                }
148            }
149            None => Err(AdmissionError::TenantNotRegistered {
150                tenant_id: tenant_id.to_owned(),
151            }),
152        }
153    }
154
155    /// Try to admit a request with a custom token cost.
156    pub fn try_admit_with_cost(&self, tenant_id: &str, cost: f64) -> Result<(), AdmissionError> {
157        let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
158        match buckets.get_mut(tenant_id) {
159            Some(bucket) => {
160                if bucket.try_consume(cost) {
161                    Ok(())
162                } else {
163                    Err(AdmissionError::RateLimitExceeded {
164                        tenant_id: tenant_id.to_owned(),
165                    })
166                }
167            }
168            None => Err(AdmissionError::TenantNotRegistered {
169                tenant_id: tenant_id.to_owned(),
170            }),
171        }
172    }
173
174    /// Return the SLA class for a registered tenant, if any.
175    pub fn sla_class(&self, tenant_id: &str) -> Option<SlaClass> {
176        let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
177        buckets.get_mut(tenant_id).map(|b| b.sla_class())
178    }
179
180    /// Return the current available token count for `tenant_id`.
181    ///
182    /// Returns `None` when the tenant is not registered.
183    pub fn available_tokens(&self, tenant_id: &str) -> Option<f64> {
184        let mut buckets = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
185        buckets.get_mut(tenant_id).map(|b| b.available_tokens())
186    }
187
188    /// Return the number of registered tenants.
189    pub fn tenant_count(&self) -> usize {
190        self.buckets.lock().unwrap_or_else(|e| e.into_inner()).len()
191    }
192
193    /// Deregister a tenant.  Returns `true` if the tenant existed.
194    pub fn deregister_tenant(&self, tenant_id: &str) -> bool {
195        self.buckets
196            .lock()
197            .unwrap_or_else(|e| e.into_inner())
198            .remove(tenant_id)
199            .is_some()
200    }
201}
202
203#[cfg(test)]
204mod tests {
205    use super::*;
206
207    #[test]
208    fn test_admit_registered_tenant() {
209        let ctrl = AdmissionController::new();
210        ctrl.register_tenant("t1", SlaClass::Platinum);
211        // Platinum has capacity=200; first few requests trivially succeed
212        for _ in 0..10 {
213            assert!(ctrl.try_admit("t1").is_ok());
214        }
215    }
216
217    #[test]
218    fn test_reject_unknown_tenant() {
219        let ctrl = AdmissionController::new();
220        let err = ctrl
221            .try_admit("ghost")
222            .expect_err("ghost is not registered");
223        assert!(matches!(err, AdmissionError::TenantNotRegistered { .. }));
224    }
225
226    #[test]
227    fn test_bronze_bucket_exhausts() {
228        let ctrl = AdmissionController::new();
229        ctrl.register_tenant("bronze", SlaClass::Bronze);
230
231        // Bronze capacity = 5; drain it then check rejection
232        let mut admitted = 0usize;
233        let mut rejected = 0usize;
234        for _ in 0..10 {
235            if ctrl.try_admit("bronze").is_ok() {
236                admitted += 1;
237            } else {
238                rejected += 1;
239            }
240        }
241        assert!(admitted > 0, "should admit at least the first few");
242        assert!(
243            rejected > 0,
244            "should eventually reject once tokens depleted"
245        );
246    }
247
248    #[test]
249    fn test_deregister_tenant() {
250        let ctrl = AdmissionController::new();
251        ctrl.register_tenant("t2", SlaClass::Gold);
252        assert_eq!(ctrl.tenant_count(), 1);
253        assert!(ctrl.deregister_tenant("t2"));
254        assert_eq!(ctrl.tenant_count(), 0);
255        let err = ctrl
256            .try_admit("t2")
257            .expect_err("t2 was deregistered, should reject");
258        assert!(matches!(err, AdmissionError::TenantNotRegistered { .. }));
259    }
260
261    #[test]
262    fn test_sla_class_query() {
263        let ctrl = AdmissionController::new();
264        ctrl.register_tenant("s", SlaClass::Silver);
265        assert_eq!(ctrl.sla_class("s"), Some(SlaClass::Silver));
266        assert_eq!(ctrl.sla_class("nonexistent"), None);
267    }
268
269    #[test]
270    fn test_custom_cost_admit() {
271        let ctrl = AdmissionController::new();
272        ctrl.register_tenant("gold", SlaClass::Gold);
273        // Gold capacity = 50; consume in one big gulp
274        assert!(ctrl.try_admit_with_cost("gold", 45.0).is_ok());
275        // Only 5 tokens left; a cost-of-10 should fail
276        assert!(ctrl.try_admit_with_cost("gold", 10.0).is_err());
277    }
278}