1use std::collections::HashMap;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
49
50use parking_lot::RwLock;
51
/// A single metered resource.
///
/// Tenant quotas and the controller-wide limits each keep one token bucket per
/// dimension, keyed by this enum.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ResourceDimension {
    /// CPU time.
    Cpu,
    /// Random (non-sequential) I/O operations.
    RandomIops,
    /// Sequential I/O volume.
    SequentialBandwidth,
    /// Memory footprint.
    Memory,
    /// Open connections.
    Connections,
}
66
67impl ResourceDimension {
68 pub fn name(&self) -> &'static str {
69 match self {
70 ResourceDimension::Cpu => "cpu",
71 ResourceDimension::RandomIops => "random_iops",
72 ResourceDimension::SequentialBandwidth => "seq_bandwidth",
73 ResourceDimension::Memory => "memory",
74 ResourceDimension::Connections => "connections",
75 }
76 }
77}
78
/// Estimated resource consumption of one operation, one field per metered
/// quantity. `Default` yields an all-zero cost.
#[derive(Clone, Debug, Default)]
pub struct OperationCost {
    /// CPU cost; the name indicates microseconds.
    pub cpu_us: u64,
    /// Number of random I/O operations.
    pub random_iops: u64,
    /// Bytes transferred sequentially.
    pub sequential_bytes: u64,
    /// Bytes of memory used.
    pub memory_bytes: u64,
    /// Relative priority. The estimators in this file use 50 (scan),
    /// 75 (vector search) and 100 (point read, write). NOTE(review): nothing in
    /// this file reads it yet; presumably higher means more important.
    pub priority: u32,
}
93
94impl OperationCost {
95 pub fn zero() -> Self {
97 Self::default()
98 }
99
100 pub fn point_read() -> Self {
102 Self {
103 cpu_us: 10,
104 random_iops: 1,
105 sequential_bytes: 0,
106 memory_bytes: 4096,
107 priority: 100,
108 }
109 }
110
111 pub fn scan(rows: usize, row_bytes: usize) -> Self {
113 Self {
114 cpu_us: (rows * 5) as u64, random_iops: 0,
116 sequential_bytes: (rows * row_bytes) as u64,
117 memory_bytes: (rows * row_bytes).min(64 * 1024 * 1024) as u64, priority: 50,
119 }
120 }
121
122 pub fn vector_search(dimension: usize, ef_search: usize, candidates: usize) -> Self {
124 let distance_calcs = ef_search * candidates;
126 Self {
127 cpu_us: (distance_calcs * dimension / 100) as u64, random_iops: (ef_search / 10).max(1) as u64, sequential_bytes: 0,
130 memory_bytes: (ef_search * dimension * 4) as u64, priority: 75,
132 }
133 }
134
135 pub fn write(bytes: usize) -> Self {
137 Self {
138 cpu_us: (bytes / 100).max(10) as u64,
139 random_iops: 0,
140 sequential_bytes: bytes as u64, memory_bytes: bytes as u64,
142 priority: 100,
143 }
144 }
145
146 pub fn total_weighted_cost(&self) -> u64 {
148 self.cpu_us
149 + self.random_iops * 100
150 + self.sequential_bytes / 1024
151 + self.memory_bytes / 4096
152 }
153}
154
/// Lock-free token bucket.
///
/// Acquisitions drain `tokens`. Refills add `refill_rate` tokens per second of
/// elapsed wall-clock time, never beyond `capacity`.
#[derive(Debug)]
pub struct TokenBucket {
    /// Tokens currently available.
    tokens: AtomicI64,
    /// Ceiling that refills and releases restore up to.
    capacity: i64,
    /// Tokens added per second of elapsed time.
    refill_rate: i64,
    /// Time of the last refill, in milliseconds since the Unix epoch.
    last_refill: AtomicU64,
}
166
167impl TokenBucket {
168 pub fn new(capacity: i64, refill_rate: i64) -> Self {
170 Self {
171 tokens: AtomicI64::new(capacity),
172 capacity,
173 refill_rate,
174 last_refill: AtomicU64::new(
175 std::time::SystemTime::now()
176 .duration_since(std::time::UNIX_EPOCH)
177 .unwrap_or_default()
178 .as_millis() as u64,
179 ),
180 }
181 }
182
183 pub fn try_acquire(&self, tokens: i64) -> bool {
185 self.refill();
186
187 loop {
188 let current = self.tokens.load(Ordering::Acquire);
189 if current < tokens {
190 return false;
191 }
192 if self
193 .tokens
194 .compare_exchange_weak(
195 current,
196 current - tokens,
197 Ordering::AcqRel,
198 Ordering::Relaxed,
199 )
200 .is_ok()
201 {
202 return true;
203 }
204 }
205 }
206
207 fn refill(&self) {
209 let now = std::time::SystemTime::now()
210 .duration_since(std::time::UNIX_EPOCH)
211 .unwrap_or_default()
212 .as_millis() as u64;
213
214 let last = self.last_refill.load(Ordering::Relaxed);
215 let elapsed_ms = now.saturating_sub(last);
216
217 if elapsed_ms > 0 {
218 let tokens_to_add = (self.refill_rate * elapsed_ms as i64) / 1000;
219 if tokens_to_add > 0 {
220 if self
221 .last_refill
222 .compare_exchange(last, now, Ordering::AcqRel, Ordering::Relaxed)
223 .is_ok()
224 {
225 let current = self.tokens.load(Ordering::Relaxed);
226 let new_tokens = (current + tokens_to_add).min(self.capacity);
227 self.tokens.store(new_tokens, Ordering::Release);
228 }
229 }
230 }
231 }
232
233 pub fn release(&self, tokens: i64) {
235 let current = self.tokens.load(Ordering::Relaxed);
236 let new_tokens = (current + tokens).min(self.capacity);
237 self.tokens.store(new_tokens, Ordering::Release);
238 }
239
240 pub fn available(&self) -> i64 {
242 self.refill();
243 self.tokens.load(Ordering::Relaxed)
244 }
245
246 pub fn utilization(&self) -> f64 {
248 1.0 - (self.available() as f64 / self.capacity as f64)
249 }
250}
251
252pub struct TenantQuota {
254 pub tenant_id: String,
256 pub weight: u32,
258 buckets: HashMap<ResourceDimension, TokenBucket>,
260 deficit: AtomicI64,
262 pending_count: AtomicU64,
264 total_requests: AtomicU64,
266 rejected_requests: AtomicU64,
268}
269
270impl TenantQuota {
271 pub fn new(tenant_id: String, weight: u32) -> Self {
273 let mut buckets = HashMap::new();
274
275 buckets.insert(
277 ResourceDimension::Cpu,
278 TokenBucket::new(10_000_000, 1_000_000), );
280 buckets.insert(
281 ResourceDimension::RandomIops,
282 TokenBucket::new(10_000, 1_000), );
284 buckets.insert(
285 ResourceDimension::SequentialBandwidth,
286 TokenBucket::new(1_000_000_000, 100_000_000), );
288 buckets.insert(
289 ResourceDimension::Memory,
290 TokenBucket::new(1_000_000_000, 500_000_000), );
292 buckets.insert(
293 ResourceDimension::Connections,
294 TokenBucket::new(100, 10), );
296
297 Self {
298 tenant_id,
299 weight,
300 buckets,
301 deficit: AtomicI64::new(0),
302 pending_count: AtomicU64::new(0),
303 total_requests: AtomicU64::new(0),
304 rejected_requests: AtomicU64::new(0),
305 }
306 }
307
308 pub fn try_acquire(&self, cost: &OperationCost) -> bool {
310 let cpu_ok = self
312 .buckets
313 .get(&ResourceDimension::Cpu)
314 .map(|b| b.available() >= cost.cpu_us as i64)
315 .unwrap_or(true);
316
317 let iops_ok = self
318 .buckets
319 .get(&ResourceDimension::RandomIops)
320 .map(|b| b.available() >= cost.random_iops as i64)
321 .unwrap_or(true);
322
323 let bandwidth_ok = self
324 .buckets
325 .get(&ResourceDimension::SequentialBandwidth)
326 .map(|b| b.available() >= cost.sequential_bytes as i64)
327 .unwrap_or(true);
328
329 let memory_ok = self
330 .buckets
331 .get(&ResourceDimension::Memory)
332 .map(|b| b.available() >= cost.memory_bytes as i64)
333 .unwrap_or(true);
334
335 if cpu_ok && iops_ok && bandwidth_ok && memory_ok {
336 if let Some(b) = self.buckets.get(&ResourceDimension::Cpu) {
338 b.try_acquire(cost.cpu_us as i64);
339 }
340 if let Some(b) = self.buckets.get(&ResourceDimension::RandomIops) {
341 b.try_acquire(cost.random_iops as i64);
342 }
343 if let Some(b) = self.buckets.get(&ResourceDimension::SequentialBandwidth) {
344 b.try_acquire(cost.sequential_bytes as i64);
345 }
346 if let Some(b) = self.buckets.get(&ResourceDimension::Memory) {
347 b.try_acquire(cost.memory_bytes as i64);
348 }
349 self.total_requests.fetch_add(1, Ordering::Relaxed);
350 true
351 } else {
352 self.rejected_requests.fetch_add(1, Ordering::Relaxed);
353 false
354 }
355 }
356
357 pub fn release(&self, cost: &OperationCost) {
359 if let Some(b) = self.buckets.get(&ResourceDimension::Memory) {
360 b.release(cost.memory_bytes as i64);
361 }
362 }
363
364 pub fn utilization(&self) -> HashMap<ResourceDimension, f64> {
366 self.buckets
367 .iter()
368 .map(|(dim, bucket)| (*dim, bucket.utilization()))
369 .collect()
370 }
371}
372
373#[derive(Debug, Clone)]
375pub enum AdmissionDecision {
376 Admit { cost: OperationCost },
378 Reject {
380 reason: String,
381 retry_after_ms: Option<u64>,
382 },
383 PartialAdmit {
385 cost: OperationCost,
386 max_results: usize,
387 recall_warning: String,
388 },
389}
390
391#[derive(Debug, Clone)]
393pub struct AdmissionConfig {
394 pub global_limits: HashMap<ResourceDimension, (i64, i64)>, pub default_tenant_weight: u32,
398 pub allow_partial_results: bool,
400 pub queue_depth_warning: usize,
402 pub queue_depth_rejection: usize,
404}
405
406impl Default for AdmissionConfig {
407 fn default() -> Self {
408 let mut global_limits = HashMap::new();
409 global_limits.insert(ResourceDimension::Cpu, (100_000_000, 10_000_000));
410 global_limits.insert(ResourceDimension::RandomIops, (100_000, 10_000));
411 global_limits.insert(
412 ResourceDimension::SequentialBandwidth,
413 (10_000_000_000, 1_000_000_000),
414 );
415 global_limits.insert(ResourceDimension::Memory, (10_000_000_000, 2_000_000_000));
416 global_limits.insert(ResourceDimension::Connections, (1000, 100));
417
418 Self {
419 global_limits,
420 default_tenant_weight: 100,
421 allow_partial_results: false,
422 queue_depth_warning: 100,
423 queue_depth_rejection: 1000,
424 }
425 }
426}
427
428pub struct AdmissionController {
430 config: AdmissionConfig,
431 global_buckets: HashMap<ResourceDimension, TokenBucket>,
433 tenants: RwLock<HashMap<String, Arc<TenantQuota>>>,
435 queue_depth: AtomicU64,
437 metrics: AdmissionMetrics,
439}
440
/// Controller-wide request counters (plain atomics, updated with relaxed
/// ordering by the controller).
#[derive(Debug, Default)]
pub struct AdmissionMetrics {
    /// Every request submitted for evaluation.
    pub total_requests: AtomicU64,
    /// Requests admitted at full cost.
    pub admitted_requests: AtomicU64,
    /// Requests rejected for any reason.
    pub rejected_requests: AtomicU64,
    /// Requests admitted at reduced cost.
    pub partial_requests: AtomicU64,
    /// NOTE(review): never updated by code in this file; confirm whether
    /// something external maintains it.
    pub avg_queue_depth: AtomicU64,
}
450
451impl AdmissionController {
452 pub fn new(config: AdmissionConfig) -> Self {
454 let mut global_buckets = HashMap::new();
455 for (dim, (capacity, rate)) in &config.global_limits {
456 global_buckets.insert(*dim, TokenBucket::new(*capacity, *rate));
457 }
458
459 Self {
460 config,
461 global_buckets,
462 tenants: RwLock::new(HashMap::new()),
463 queue_depth: AtomicU64::new(0),
464 metrics: AdmissionMetrics::default(),
465 }
466 }
467
468 pub fn register_tenant(&self, tenant_id: &str, weight: u32) {
470 let mut tenants = self.tenants.write();
471 if !tenants.contains_key(tenant_id) {
472 tenants.insert(
473 tenant_id.to_string(),
474 Arc::new(TenantQuota::new(tenant_id.to_string(), weight)),
475 );
476 }
477 }
478
479 fn get_tenant(&self, tenant_id: &str) -> Arc<TenantQuota> {
481 {
482 let tenants = self.tenants.read();
483 if let Some(tenant) = tenants.get(tenant_id) {
484 return tenant.clone();
485 }
486 }
487
488 let mut tenants = self.tenants.write();
490 tenants
491 .entry(tenant_id.to_string())
492 .or_insert_with(|| {
493 Arc::new(TenantQuota::new(
494 tenant_id.to_string(),
495 self.config.default_tenant_weight,
496 ))
497 })
498 .clone()
499 }
500
501 pub fn evaluate(
503 &self,
504 tenant_id: &str,
505 cost: OperationCost,
506 allow_partial: bool,
507 ) -> AdmissionDecision {
508 self.metrics.total_requests.fetch_add(1, Ordering::Relaxed);
509
510 let depth = self.queue_depth.load(Ordering::Relaxed);
512 if depth >= self.config.queue_depth_rejection as u64 {
513 self.metrics
514 .rejected_requests
515 .fetch_add(1, Ordering::Relaxed);
516 return AdmissionDecision::Reject {
517 reason: format!("Queue depth {} exceeds limit", depth),
518 retry_after_ms: Some(100),
519 };
520 }
521
522 let global_ok = self.check_global_limits(&cost);
524 if !global_ok {
525 self.metrics
526 .rejected_requests
527 .fetch_add(1, Ordering::Relaxed);
528 return AdmissionDecision::Reject {
529 reason: "Global resource limits exceeded".to_string(),
530 retry_after_ms: Some(50),
531 };
532 }
533
534 let tenant = self.get_tenant(tenant_id);
536 if tenant.try_acquire(&cost) {
537 self.queue_depth.fetch_add(1, Ordering::Relaxed);
538 self.metrics
539 .admitted_requests
540 .fetch_add(1, Ordering::Relaxed);
541 AdmissionDecision::Admit { cost }
542 } else if allow_partial && self.config.allow_partial_results {
543 let reduced_cost = OperationCost {
545 cpu_us: cost.cpu_us / 4,
546 random_iops: cost.random_iops / 4,
547 sequential_bytes: cost.sequential_bytes / 4,
548 memory_bytes: cost.memory_bytes / 4,
549 priority: cost.priority,
550 };
551 if tenant.try_acquire(&reduced_cost) {
552 self.queue_depth.fetch_add(1, Ordering::Relaxed);
553 self.metrics
554 .partial_requests
555 .fetch_add(1, Ordering::Relaxed);
556 AdmissionDecision::PartialAdmit {
557 cost: reduced_cost,
558 max_results: 25, recall_warning: "Results limited due to load - recall may be degraded"
560 .to_string(),
561 }
562 } else {
563 self.metrics
564 .rejected_requests
565 .fetch_add(1, Ordering::Relaxed);
566 AdmissionDecision::Reject {
567 reason: format!("Tenant {} quota exceeded", tenant_id),
568 retry_after_ms: Some(100),
569 }
570 }
571 } else {
572 self.metrics
573 .rejected_requests
574 .fetch_add(1, Ordering::Relaxed);
575 AdmissionDecision::Reject {
576 reason: format!("Tenant {} quota exceeded", tenant_id),
577 retry_after_ms: Some(100),
578 }
579 }
580 }
581
582 fn check_global_limits(&self, cost: &OperationCost) -> bool {
584 let cpu_ok = self
585 .global_buckets
586 .get(&ResourceDimension::Cpu)
587 .map(|b| b.available() >= cost.cpu_us as i64)
588 .unwrap_or(true);
589
590 let iops_ok = self
591 .global_buckets
592 .get(&ResourceDimension::RandomIops)
593 .map(|b| b.available() >= cost.random_iops as i64)
594 .unwrap_or(true);
595
596 cpu_ok && iops_ok
597 }
598
599 pub fn complete(&self, tenant_id: &str, cost: &OperationCost) {
601 self.queue_depth.fetch_sub(1, Ordering::Relaxed);
602 let tenant = self.get_tenant(tenant_id);
603 tenant.release(cost);
604 }
605
606 pub fn system_load(&self) -> SystemLoad {
608 let mut utilization = HashMap::new();
609 for (dim, bucket) in &self.global_buckets {
610 utilization.insert(*dim, bucket.utilization());
611 }
612
613 SystemLoad {
614 queue_depth: self.queue_depth.load(Ordering::Relaxed),
615 utilization,
616 total_requests: self.metrics.total_requests.load(Ordering::Relaxed),
617 admitted_requests: self.metrics.admitted_requests.load(Ordering::Relaxed),
618 rejected_requests: self.metrics.rejected_requests.load(Ordering::Relaxed),
619 }
620 }
621}
622
623#[derive(Debug)]
625pub struct SystemLoad {
626 pub queue_depth: u64,
627 pub utilization: HashMap<ResourceDimension, f64>,
628 pub total_requests: u64,
629 pub admitted_requests: u64,
630 pub rejected_requests: u64,
631}
632
633impl SystemLoad {
634 pub fn is_overloaded(&self) -> bool {
636 self.utilization.values().any(|&u| u > 0.9)
637 }
638
639 pub fn admission_rate(&self) -> f64 {
641 if self.total_requests == 0 {
642 1.0
643 } else {
644 self.admitted_requests as f64 / self.total_requests as f64
645 }
646 }
647}
648
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_token_bucket_basic() {
        let bucket = TokenBucket::new(100, 10);
        assert_eq!(bucket.available(), 100);

        // Take half, then fail to take more than what remains.
        assert!(bucket.try_acquire(50));
        assert_eq!(bucket.available(), 50);
        assert!(!bucket.try_acquire(60), "an over-sized request must be refused");
        assert_eq!(bucket.available(), 50, "a refused request must not consume tokens");

        bucket.release(25);
        assert_eq!(bucket.available(), 75);
    }

    #[test]
    fn test_operation_cost_estimation() {
        let read = OperationCost::point_read();
        assert!(read.random_iops > 0);
        assert_eq!(read.sequential_bytes, 0);

        let scan = OperationCost::scan(1000, 100);
        assert_eq!(scan.sequential_bytes, 1000 * 100);
        assert_eq!(scan.random_iops, 0);

        assert!(OperationCost::vector_search(128, 64, 100).cpu_us > 0);
    }

    #[test]
    fn test_admission_controller_basic() {
        let controller = AdmissionController::new(AdmissionConfig::default());
        controller.register_tenant("test", 100);

        let read_cost = OperationCost::point_read();
        let decision = controller.evaluate("test", read_cost.clone(), false);
        assert!(
            matches!(decision, AdmissionDecision::Admit { .. }),
            "unexpected decision: {:?}",
            decision
        );

        controller.complete("test", &read_cost);
    }

    #[test]
    fn test_tenant_quota_exhaustion() {
        let quota = TenantQuota::new(String::from("test"), 100);
        // Ten times the tenant's 10M-token CPU bucket.
        let huge_cost = OperationCost {
            cpu_us: 100_000_000,
            priority: 100,
            ..OperationCost::zero()
        };
        assert!(!quota.try_acquire(&huge_cost));
    }
}