1use std::collections::HashMap;
47use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
48use std::sync::Arc;
49use std::time::{Duration, Instant};
50
51use parking_lot::{Mutex, RwLock};
52
/// Resource axes along which operations are metered and limited.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResourceDimension {
    /// CPU time.
    Cpu,
    /// Random I/O operations.
    RandomIops,
    /// Sequential I/O throughput.
    SequentialBandwidth,
    /// Memory footprint.
    Memory,
    /// Connection slots.
    Connections,
}

impl ResourceDimension {
    /// Returns the short snake_case label of this dimension.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Cpu => "cpu",
            Self::RandomIops => "random_iops",
            Self::SequentialBandwidth => "seq_bandwidth",
            Self::Memory => "memory",
            Self::Connections => "connections",
        }
    }
}
79
/// Estimated resource footprint of a single operation, used for admission decisions.
#[derive(Debug, Clone, Default)]
pub struct OperationCost {
    /// CPU time in microseconds.
    pub cpu_us: u64,
    /// Number of random I/O operations.
    pub random_iops: u64,
    /// Bytes transferred sequentially.
    pub sequential_bytes: u64,
    /// Memory footprint in bytes.
    pub memory_bytes: u64,
    /// Priority hint carried with the cost. The presets below assign 100 to point reads
    /// and writes, 75 to vector search, and 50 to scans.
    pub priority: u32,
}

impl OperationCost {
    /// Cap on the memory charged to a single scan (64 MiB), regardless of bytes read.
    const MAX_SCAN_MEMORY_BYTES: u64 = 64 * 1024 * 1024;

    /// A cost that charges nothing on any dimension.
    pub fn zero() -> Self {
        Self::default()
    }

    /// Fixed cost of a single-key lookup: 10 µs CPU, one random I/O, 4 KiB of memory.
    pub fn point_read() -> Self {
        Self {
            cpu_us: 10,
            random_iops: 1,
            sequential_bytes: 0,
            memory_bytes: 4096,
            priority: 100,
        }
    }

    /// Cost of scanning `rows` rows of `row_bytes` bytes each.
    ///
    /// Charges 5 µs of CPU per row and every byte as sequential I/O. Memory is the total
    /// byte count, capped at 64 MiB. Products that do not fit in `u64` saturate instead
    /// of panicking (debug) or wrapping (release).
    pub fn scan(rows: usize, row_bytes: usize) -> Self {
        // usize -> u64 is lossless on every supported (<= 64-bit) target.
        let rows = rows as u64;
        let total_bytes = rows.saturating_mul(row_bytes as u64);
        Self {
            cpu_us: rows.saturating_mul(5),
            random_iops: 0,
            sequential_bytes: total_bytes,
            memory_bytes: total_bytes.min(Self::MAX_SCAN_MEMORY_BYTES),
            priority: 50,
        }
    }

    /// Cost of an approximate nearest-neighbour search.
    ///
    /// CPU is `ef_search * candidates * dimension / 100` µs. Random I/O is
    /// `ef_search / 10`, with a minimum of 1. Memory is `ef_search * dimension * 4` bytes
    /// (4 bytes per component, presumably `f32` — TODO confirm). All products saturate on
    /// overflow.
    pub fn vector_search(dimension: usize, ef_search: usize, candidates: usize) -> Self {
        // usize -> u64 is lossless on every supported (<= 64-bit) target.
        let (dimension, ef_search, candidates) =
            (dimension as u64, ef_search as u64, candidates as u64);
        let distance_calcs = ef_search.saturating_mul(candidates);
        Self {
            cpu_us: distance_calcs.saturating_mul(dimension) / 100,
            random_iops: (ef_search / 10).max(1),
            sequential_bytes: 0,
            memory_bytes: ef_search.saturating_mul(dimension).saturating_mul(4),
            priority: 75,
        }
    }

    /// Cost of writing `bytes` bytes.
    ///
    /// CPU is 1 µs per 100 bytes (at least 10 µs). The bytes are charged both as
    /// sequential I/O and as memory.
    pub fn write(bytes: usize) -> Self {
        // usize -> u64 is lossless on every supported (<= 64-bit) target.
        let bytes = bytes as u64;
        Self {
            cpu_us: (bytes / 100).max(10),
            random_iops: 0,
            sequential_bytes: bytes,
            memory_bytes: bytes,
            priority: 100,
        }
    }

    /// Collapses the cost into a single scalar.
    ///
    /// The scalar is `cpu_us + 100 * random_iops + sequential_bytes / 1024 +
    /// memory_bytes / 4096`. It saturates at `u64::MAX` instead of overflowing.
    pub fn total_weighted_cost(&self) -> u64 {
        self.cpu_us
            .saturating_add(self.random_iops.saturating_mul(100))
            .saturating_add(self.sequential_bytes / 1024)
            .saturating_add(self.memory_bytes / 4096)
    }
}
152
/// Lock-free token bucket that rate-limits one resource dimension.
///
/// The bucket starts full and regains `refill_rate` tokens per second. It never holds
/// more than `capacity`. Refill is lazy: it is applied whenever tokens are acquired or
/// inspected. Time is measured from a monotonic [`Instant`], so wall-clock adjustments
/// can neither stall nor burst the refill.
pub struct TokenBucket {
    /// Tokens currently available.
    tokens: AtomicI64,
    /// Maximum number of tokens the bucket can hold.
    capacity: i64,
    /// Tokens regained per second; values `<= 0` disable refill.
    refill_rate: i64,
    /// Time, in ms since `origin`, up to which elapsed time has already been converted
    /// into tokens.
    last_refill: AtomicU64,
    /// Monotonic reference point for `last_refill`.
    origin: Instant,
}

impl TokenBucket {
    /// Creates a full bucket with `capacity` tokens that refills at `refill_rate`
    /// tokens per second.
    pub fn new(capacity: i64, refill_rate: i64) -> Self {
        Self {
            tokens: AtomicI64::new(capacity),
            capacity,
            refill_rate,
            last_refill: AtomicU64::new(0),
            origin: Instant::now(),
        }
    }

    /// Takes `tokens` from the bucket if at least that many are available.
    ///
    /// Returns `false`, leaving the bucket untouched, when fewer than `tokens` remain.
    pub fn try_acquire(&self, tokens: i64) -> bool {
        self.refill();
        self.tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                if current >= tokens {
                    current.checked_sub(tokens)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Returns `tokens` to the bucket, capping the balance at `capacity`.
    ///
    /// The update is atomic, so concurrent acquires and releases are never lost.
    pub fn release(&self, tokens: i64) {
        self.add_tokens(tokens);
    }

    /// Returns the number of tokens available after applying any pending refill.
    pub fn available(&self) -> i64 {
        self.refill();
        self.tokens.load(Ordering::Acquire)
    }

    /// Fraction of capacity in use: `0.0` when full, `1.0` when empty.
    pub fn utilization(&self) -> f64 {
        1.0 - (self.available() as f64 / self.capacity as f64)
    }

    /// Milliseconds elapsed since the bucket was created, saturating at `u64::MAX`.
    fn now_ms(&self) -> u64 {
        self.origin.elapsed().as_millis().min(u64::MAX as u128) as u64
    }

    /// Atomically adds `amount` tokens, clamping the result to `capacity`.
    fn add_tokens(&self, amount: i64) {
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_add(amount).min(self.capacity))
            });
    }

    /// Converts the time elapsed since the last refill into tokens.
    fn refill(&self) {
        if self.refill_rate <= 0 {
            return;
        }
        let last = self.last_refill.load(Ordering::Acquire);
        let elapsed_ms = self.now_ms().saturating_sub(last);
        if elapsed_ms == 0 {
            return;
        }

        // u128 keeps `rate * elapsed` exact even after an arbitrarily long idle period.
        let rate = self.refill_rate as u128;
        let earned = rate * u128::from(elapsed_ms) / 1000;
        if earned == 0 {
            return;
        }
        // Advance the clock only by the time those whole tokens represent (rounded up),
        // so the fractional remainder carries into the next refill instead of being
        // dropped. Because earned <= rate * elapsed_ms / 1000, this is <= elapsed_ms.
        let spent_ms = ((earned * 1000 + rate - 1) / rate) as u64;

        // Only the thread that wins the CAS credits tokens, so an interval is never
        // counted twice.
        if self
            .last_refill
            .compare_exchange(last, last + spent_ms, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            self.add_tokens(earned.min(i64::MAX as u128) as i64);
        }
    }
}
249
250pub struct TenantQuota {
252 pub tenant_id: String,
254 pub weight: u32,
256 buckets: HashMap<ResourceDimension, TokenBucket>,
258 deficit: AtomicI64,
260 pending_count: AtomicU64,
262 total_requests: AtomicU64,
264 rejected_requests: AtomicU64,
266}
267
268impl TenantQuota {
269 pub fn new(tenant_id: String, weight: u32) -> Self {
271 let mut buckets = HashMap::new();
272
273 buckets.insert(
275 ResourceDimension::Cpu,
276 TokenBucket::new(10_000_000, 1_000_000), );
278 buckets.insert(
279 ResourceDimension::RandomIops,
280 TokenBucket::new(10_000, 1_000), );
282 buckets.insert(
283 ResourceDimension::SequentialBandwidth,
284 TokenBucket::new(1_000_000_000, 100_000_000), );
286 buckets.insert(
287 ResourceDimension::Memory,
288 TokenBucket::new(1_000_000_000, 500_000_000), );
290 buckets.insert(
291 ResourceDimension::Connections,
292 TokenBucket::new(100, 10), );
294
295 Self {
296 tenant_id,
297 weight,
298 buckets,
299 deficit: AtomicI64::new(0),
300 pending_count: AtomicU64::new(0),
301 total_requests: AtomicU64::new(0),
302 rejected_requests: AtomicU64::new(0),
303 }
304 }
305
306 pub fn try_acquire(&self, cost: &OperationCost) -> bool {
308 let cpu_ok = self
310 .buckets
311 .get(&ResourceDimension::Cpu)
312 .map(|b| b.available() >= cost.cpu_us as i64)
313 .unwrap_or(true);
314
315 let iops_ok = self
316 .buckets
317 .get(&ResourceDimension::RandomIops)
318 .map(|b| b.available() >= cost.random_iops as i64)
319 .unwrap_or(true);
320
321 let bandwidth_ok = self
322 .buckets
323 .get(&ResourceDimension::SequentialBandwidth)
324 .map(|b| b.available() >= cost.sequential_bytes as i64)
325 .unwrap_or(true);
326
327 let memory_ok = self
328 .buckets
329 .get(&ResourceDimension::Memory)
330 .map(|b| b.available() >= cost.memory_bytes as i64)
331 .unwrap_or(true);
332
333 if cpu_ok && iops_ok && bandwidth_ok && memory_ok {
334 if let Some(b) = self.buckets.get(&ResourceDimension::Cpu) {
336 b.try_acquire(cost.cpu_us as i64);
337 }
338 if let Some(b) = self.buckets.get(&ResourceDimension::RandomIops) {
339 b.try_acquire(cost.random_iops as i64);
340 }
341 if let Some(b) = self.buckets.get(&ResourceDimension::SequentialBandwidth) {
342 b.try_acquire(cost.sequential_bytes as i64);
343 }
344 if let Some(b) = self.buckets.get(&ResourceDimension::Memory) {
345 b.try_acquire(cost.memory_bytes as i64);
346 }
347 self.total_requests.fetch_add(1, Ordering::Relaxed);
348 true
349 } else {
350 self.rejected_requests.fetch_add(1, Ordering::Relaxed);
351 false
352 }
353 }
354
355 pub fn release(&self, cost: &OperationCost) {
357 if let Some(b) = self.buckets.get(&ResourceDimension::Memory) {
358 b.release(cost.memory_bytes as i64);
359 }
360 }
361
362 pub fn utilization(&self) -> HashMap<ResourceDimension, f64> {
364 self.buckets
365 .iter()
366 .map(|(dim, bucket)| (*dim, bucket.utilization()))
367 .collect()
368 }
369}
370
371#[derive(Debug, Clone)]
373pub enum AdmissionDecision {
374 Admit { cost: OperationCost },
376 Reject {
378 reason: String,
379 retry_after_ms: Option<u64>,
380 },
381 PartialAdmit {
383 cost: OperationCost,
384 max_results: usize,
385 recall_warning: String,
386 },
387}
388
389#[derive(Debug, Clone)]
391pub struct AdmissionConfig {
392 pub global_limits: HashMap<ResourceDimension, (i64, i64)>, pub default_tenant_weight: u32,
396 pub allow_partial_results: bool,
398 pub queue_depth_warning: usize,
400 pub queue_depth_rejection: usize,
402}
403
404impl Default for AdmissionConfig {
405 fn default() -> Self {
406 let mut global_limits = HashMap::new();
407 global_limits.insert(ResourceDimension::Cpu, (100_000_000, 10_000_000));
408 global_limits.insert(ResourceDimension::RandomIops, (100_000, 10_000));
409 global_limits.insert(ResourceDimension::SequentialBandwidth, (10_000_000_000, 1_000_000_000));
410 global_limits.insert(ResourceDimension::Memory, (10_000_000_000, 2_000_000_000));
411 global_limits.insert(ResourceDimension::Connections, (1000, 100));
412
413 Self {
414 global_limits,
415 default_tenant_weight: 100,
416 allow_partial_results: false,
417 queue_depth_warning: 100,
418 queue_depth_rejection: 1000,
419 }
420 }
421}
422
423pub struct AdmissionController {
425 config: AdmissionConfig,
426 global_buckets: HashMap<ResourceDimension, TokenBucket>,
428 tenants: RwLock<HashMap<String, Arc<TenantQuota>>>,
430 queue_depth: AtomicU64,
432 metrics: AdmissionMetrics,
434}
435
/// Counters describing admission outcomes.
///
/// `Debug` is derived so the metrics can be logged or inspected like other public types.
#[derive(Debug, Default)]
pub struct AdmissionMetrics {
    /// Every evaluated request, whatever its outcome.
    pub total_requests: AtomicU64,
    /// Requests admitted at their full cost.
    pub admitted_requests: AtomicU64,
    /// Requests rejected for any reason.
    pub rejected_requests: AtomicU64,
    /// Requests admitted at a reduced cost.
    pub partial_requests: AtomicU64,
    /// Not updated by the visible code (presumably a running queue-depth average — TODO confirm).
    pub avg_queue_depth: AtomicU64,
}
445
446impl AdmissionController {
447 pub fn new(config: AdmissionConfig) -> Self {
449 let mut global_buckets = HashMap::new();
450 for (dim, (capacity, rate)) in &config.global_limits {
451 global_buckets.insert(*dim, TokenBucket::new(*capacity, *rate));
452 }
453
454 Self {
455 config,
456 global_buckets,
457 tenants: RwLock::new(HashMap::new()),
458 queue_depth: AtomicU64::new(0),
459 metrics: AdmissionMetrics::default(),
460 }
461 }
462
463 pub fn register_tenant(&self, tenant_id: &str, weight: u32) {
465 let mut tenants = self.tenants.write();
466 if !tenants.contains_key(tenant_id) {
467 tenants.insert(
468 tenant_id.to_string(),
469 Arc::new(TenantQuota::new(tenant_id.to_string(), weight)),
470 );
471 }
472 }
473
474 fn get_tenant(&self, tenant_id: &str) -> Arc<TenantQuota> {
476 {
477 let tenants = self.tenants.read();
478 if let Some(tenant) = tenants.get(tenant_id) {
479 return tenant.clone();
480 }
481 }
482
483 let mut tenants = self.tenants.write();
485 tenants
486 .entry(tenant_id.to_string())
487 .or_insert_with(|| {
488 Arc::new(TenantQuota::new(
489 tenant_id.to_string(),
490 self.config.default_tenant_weight,
491 ))
492 })
493 .clone()
494 }
495
496 pub fn evaluate(
498 &self,
499 tenant_id: &str,
500 cost: OperationCost,
501 allow_partial: bool,
502 ) -> AdmissionDecision {
503 self.metrics.total_requests.fetch_add(1, Ordering::Relaxed);
504
505 let depth = self.queue_depth.load(Ordering::Relaxed);
507 if depth >= self.config.queue_depth_rejection as u64 {
508 self.metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
509 return AdmissionDecision::Reject {
510 reason: format!("Queue depth {} exceeds limit", depth),
511 retry_after_ms: Some(100),
512 };
513 }
514
515 let global_ok = self.check_global_limits(&cost);
517 if !global_ok {
518 self.metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
519 return AdmissionDecision::Reject {
520 reason: "Global resource limits exceeded".to_string(),
521 retry_after_ms: Some(50),
522 };
523 }
524
525 let tenant = self.get_tenant(tenant_id);
527 if tenant.try_acquire(&cost) {
528 self.queue_depth.fetch_add(1, Ordering::Relaxed);
529 self.metrics.admitted_requests.fetch_add(1, Ordering::Relaxed);
530 AdmissionDecision::Admit { cost }
531 } else if allow_partial && self.config.allow_partial_results {
532 let reduced_cost = OperationCost {
534 cpu_us: cost.cpu_us / 4,
535 random_iops: cost.random_iops / 4,
536 sequential_bytes: cost.sequential_bytes / 4,
537 memory_bytes: cost.memory_bytes / 4,
538 priority: cost.priority,
539 };
540 if tenant.try_acquire(&reduced_cost) {
541 self.queue_depth.fetch_add(1, Ordering::Relaxed);
542 self.metrics.partial_requests.fetch_add(1, Ordering::Relaxed);
543 AdmissionDecision::PartialAdmit {
544 cost: reduced_cost,
545 max_results: 25, recall_warning: "Results limited due to load - recall may be degraded".to_string(),
547 }
548 } else {
549 self.metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
550 AdmissionDecision::Reject {
551 reason: format!("Tenant {} quota exceeded", tenant_id),
552 retry_after_ms: Some(100),
553 }
554 }
555 } else {
556 self.metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
557 AdmissionDecision::Reject {
558 reason: format!("Tenant {} quota exceeded", tenant_id),
559 retry_after_ms: Some(100),
560 }
561 }
562 }
563
564 fn check_global_limits(&self, cost: &OperationCost) -> bool {
566 let cpu_ok = self
567 .global_buckets
568 .get(&ResourceDimension::Cpu)
569 .map(|b| b.available() >= cost.cpu_us as i64)
570 .unwrap_or(true);
571
572 let iops_ok = self
573 .global_buckets
574 .get(&ResourceDimension::RandomIops)
575 .map(|b| b.available() >= cost.random_iops as i64)
576 .unwrap_or(true);
577
578 cpu_ok && iops_ok
579 }
580
581 pub fn complete(&self, tenant_id: &str, cost: &OperationCost) {
583 self.queue_depth.fetch_sub(1, Ordering::Relaxed);
584 let tenant = self.get_tenant(tenant_id);
585 tenant.release(cost);
586 }
587
588 pub fn system_load(&self) -> SystemLoad {
590 let mut utilization = HashMap::new();
591 for (dim, bucket) in &self.global_buckets {
592 utilization.insert(*dim, bucket.utilization());
593 }
594
595 SystemLoad {
596 queue_depth: self.queue_depth.load(Ordering::Relaxed),
597 utilization,
598 total_requests: self.metrics.total_requests.load(Ordering::Relaxed),
599 admitted_requests: self.metrics.admitted_requests.load(Ordering::Relaxed),
600 rejected_requests: self.metrics.rejected_requests.load(Ordering::Relaxed),
601 }
602 }
603}
604
605#[derive(Debug)]
607pub struct SystemLoad {
608 pub queue_depth: u64,
609 pub utilization: HashMap<ResourceDimension, f64>,
610 pub total_requests: u64,
611 pub admitted_requests: u64,
612 pub rejected_requests: u64,
613}
614
615impl SystemLoad {
616 pub fn is_overloaded(&self) -> bool {
618 self.utilization.values().any(|&u| u > 0.9)
619 }
620
621 pub fn admission_rate(&self) -> f64 {
623 if self.total_requests == 0 {
624 1.0
625 } else {
626 self.admitted_requests as f64 / self.total_requests as f64
627 }
628 }
629}
630
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_token_bucket_basic() {
        let tb = TokenBucket::new(100, 10);
        assert_eq!(tb.available(), 100, "a fresh bucket starts full");

        assert!(tb.try_acquire(50));
        assert_eq!(tb.available(), 50);

        // Asking for more than remains must fail without consuming anything.
        assert!(!tb.try_acquire(60));
        assert_eq!(tb.available(), 50);

        tb.release(25);
        assert_eq!(tb.available(), 75);
    }

    #[test]
    fn test_operation_cost_estimation() {
        let lookup = OperationCost::point_read();
        assert!(lookup.random_iops > 0);
        assert_eq!(lookup.sequential_bytes, 0);

        let range = OperationCost::scan(1000, 100);
        assert_eq!(range.sequential_bytes, 100_000);
        assert_eq!(range.random_iops, 0);

        let knn = OperationCost::vector_search(128, 64, 100);
        assert!(knn.cpu_us > 0);
    }

    #[test]
    fn test_admission_controller_basic() {
        let ctrl = AdmissionController::new(AdmissionConfig::default());
        ctrl.register_tenant("test", 100);

        let read_cost = OperationCost::point_read();
        let outcome = ctrl.evaluate("test", read_cost.clone(), false);
        assert!(
            matches!(outcome, AdmissionDecision::Admit { .. }),
            "unexpected decision: {:?}",
            outcome
        );

        ctrl.complete("test", &read_cost);
    }

    #[test]
    fn test_tenant_quota_exhaustion() {
        let tenant = TenantQuota::new("test".to_string(), 100);

        // Ten times the default per-tenant CPU capacity.
        let oversized = OperationCost {
            cpu_us: 100_000_000,
            priority: 100,
            ..OperationCost::zero()
        };

        assert!(!tenant.try_acquire(&oversized));
    }
}