1use parking_lot::RwLock;
29use std::collections::HashMap;
30use std::fmt;
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33
/// Resource limits applied to a single tenant.
///
/// Every field is an optional cap: `None` means the resource is not limited.
/// Instances are usually built with [`ResourceQuota::new`] followed by the
/// `with_*` builder methods, or taken from one of the `tier_*` presets.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ResourceQuota {
    /// Cap on the number of kernels.
    pub max_kernels: Option<u64>,
    /// Cap on GPU memory, in megabytes.
    pub max_gpu_memory_mb: Option<u64>,
    /// Cap on the message rate.
    pub max_messages_per_sec: Option<u64>,
    /// Cap on the number of K2K endpoints.
    pub max_k2k_endpoints: Option<u64>,
    /// Cap on the number of pub/sub subscriptions.
    pub max_pubsub_subscriptions: Option<u64>,
    /// Cap on checkpoint storage, in megabytes.
    pub max_checkpoint_storage_mb: Option<u64>,
    /// Cap on CPU time per hour (the unit is not stated here — TODO confirm).
    pub max_cpu_time_per_hour: Option<u64>,
    /// Cap on API requests per minute.
    pub max_api_requests_per_min: Option<u64>,
}

impl ResourceQuota {
    /// Creates a quota with no limits set.
    pub fn new() -> Self {
        Self::default()
    }

    /// Alias for [`ResourceQuota::new`]: a quota where nothing is limited.
    pub fn unlimited() -> Self {
        Self::new()
    }

    /// Sets the kernel cap.
    pub fn with_max_kernels(mut self, max: u64) -> Self {
        self.max_kernels = Some(max);
        self
    }

    /// Sets the GPU memory cap, in megabytes.
    pub fn with_max_gpu_memory_mb(mut self, max: u64) -> Self {
        self.max_gpu_memory_mb = Some(max);
        self
    }

    /// Sets the message rate cap.
    pub fn with_max_messages_per_sec(mut self, max: u64) -> Self {
        self.max_messages_per_sec = Some(max);
        self
    }

    /// Sets the K2K endpoint cap.
    pub fn with_max_k2k_endpoints(mut self, max: u64) -> Self {
        self.max_k2k_endpoints = Some(max);
        self
    }

    /// Sets the pub/sub subscription cap.
    pub fn with_max_pubsub_subscriptions(mut self, max: u64) -> Self {
        self.max_pubsub_subscriptions = Some(max);
        self
    }

    /// Sets the checkpoint storage cap, in megabytes.
    pub fn with_max_checkpoint_storage_mb(mut self, max: u64) -> Self {
        self.max_checkpoint_storage_mb = Some(max);
        self
    }

    /// Sets the CPU-time-per-hour cap.
    pub fn with_max_cpu_time_per_hour(mut self, max: u64) -> Self {
        self.max_cpu_time_per_hour = Some(max);
        self
    }

    /// Sets the API-requests-per-minute cap.
    pub fn with_max_api_requests_per_min(mut self, max: u64) -> Self {
        self.max_api_requests_per_min = Some(max);
        self
    }

    /// Returns `true` when one more kernel may be allocated, i.e. `current`
    /// is strictly below the cap or no cap is set.
    pub fn check_kernel_limit(&self, current: u64) -> bool {
        self.max_kernels.map_or(true, |max| current < max)
    }

    /// Returns `true` when `current_mb + requested_mb` does not exceed the
    /// GPU memory cap, or when no cap is set.
    ///
    /// The sum is computed with overflow checking: a request whose total does
    /// not fit in a `u64` necessarily exceeds any cap and is rejected instead
    /// of panicking (debug) or wrapping around and slipping through (release).
    pub fn check_gpu_memory_limit(&self, current_mb: u64, requested_mb: u64) -> bool {
        match self.max_gpu_memory_mb {
            None => true,
            Some(max) => current_mb
                .checked_add(requested_mb)
                .map_or(false, |total| total <= max),
        }
    }

    /// Returns `true` when `current_rate` is strictly below the message rate
    /// cap, or when no cap is set.
    pub fn check_message_rate(&self, current_rate: u64) -> bool {
        self.max_messages_per_sec
            .map_or(true, |max| current_rate < max)
    }

    /// Preset for small tenants. Leaves `max_cpu_time_per_hour` unlimited.
    pub fn tier_small() -> Self {
        Self::new()
            .with_max_kernels(10)
            .with_max_gpu_memory_mb(2048)
            .with_max_messages_per_sec(1000)
            .with_max_k2k_endpoints(20)
            .with_max_pubsub_subscriptions(50)
            .with_max_checkpoint_storage_mb(1024)
            .with_max_api_requests_per_min(100)
    }

    /// Preset for medium tenants. Leaves `max_cpu_time_per_hour` unlimited.
    pub fn tier_medium() -> Self {
        Self::new()
            .with_max_kernels(50)
            .with_max_gpu_memory_mb(8192)
            .with_max_messages_per_sec(10000)
            .with_max_k2k_endpoints(100)
            .with_max_pubsub_subscriptions(200)
            .with_max_checkpoint_storage_mb(10240)
            .with_max_api_requests_per_min(1000)
    }

    /// Preset for large tenants. Leaves `max_cpu_time_per_hour` unlimited.
    pub fn tier_large() -> Self {
        Self::new()
            .with_max_kernels(200)
            .with_max_gpu_memory_mb(32768)
            .with_max_messages_per_sec(100000)
            .with_max_k2k_endpoints(500)
            .with_max_pubsub_subscriptions(1000)
            .with_max_checkpoint_storage_mb(102400)
            .with_max_api_requests_per_min(10000)
    }
}
188
189#[derive(Debug, Clone)]
195pub struct ResourceUsage {
196 pub kernels: u64,
198 pub gpu_memory_mb: u64,
200 pub messages_this_window: u64,
202 pub k2k_endpoints: u64,
204 pub pubsub_subscriptions: u64,
206 pub checkpoint_storage_mb: u64,
208 pub api_requests_this_window: u64,
210 pub window_start: Instant,
212}
213
214impl ResourceUsage {
215 pub fn new() -> Self {
217 Self {
218 kernels: 0,
219 gpu_memory_mb: 0,
220 messages_this_window: 0,
221 k2k_endpoints: 0,
222 pubsub_subscriptions: 0,
223 checkpoint_storage_mb: 0,
224 api_requests_this_window: 0,
225 window_start: Instant::now(),
226 }
227 }
228
229 pub fn reset_window(&mut self) {
231 self.messages_this_window = 0;
232 self.api_requests_this_window = 0;
233 self.window_start = Instant::now();
234 }
235
236 pub fn utilization(&self, quota: &ResourceQuota) -> QuotaUtilization {
238 QuotaUtilization {
239 kernel_pct: quota
240 .max_kernels
241 .map(|max| self.kernels as f64 / max as f64 * 100.0),
242 gpu_memory_pct: quota
243 .max_gpu_memory_mb
244 .map(|max| self.gpu_memory_mb as f64 / max as f64 * 100.0),
245 message_rate_pct: quota
246 .max_messages_per_sec
247 .map(|max| self.messages_this_window as f64 / max as f64 * 100.0),
248 }
249 }
250}
251
252impl Default for ResourceUsage {
253 fn default() -> Self {
254 Self::new()
255 }
256}
257
/// Usage expressed as a percentage of quota, per resource.
///
/// Each field is `None` when the corresponding quota is unset. Derives
/// `PartialEq` so that snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq)]
pub struct QuotaUtilization {
    /// Kernels in use as a percentage of the kernel cap.
    pub kernel_pct: Option<f64>,
    /// GPU memory in use as a percentage of the GPU memory cap.
    pub gpu_memory_pct: Option<f64>,
    /// Messages in the current window as a percentage of the message cap.
    pub message_rate_pct: Option<f64>,
}
268
/// Identity and descriptive data for one tenant.
#[derive(Debug, Clone)]
pub struct TenantContext {
    /// Identifier of the tenant; also used as the prefix in
    /// [`TenantContext::resource_name`].
    pub tenant_id: String,
    /// Optional human-readable name.
    pub display_name: Option<String>,
    /// Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
    /// Instant at which this context was constructed.
    pub created_at: Instant,
}

impl TenantContext {
    /// Creates a context for `tenant_id` with no display name and no
    /// metadata, stamped with the current instant.
    pub fn new(tenant_id: impl Into<String>) -> Self {
        let id: String = tenant_id.into();
        Self {
            tenant_id: id,
            display_name: None,
            metadata: HashMap::new(),
            created_at: Instant::now(),
        }
    }

    /// Returns the context with its display name set to `name`.
    pub fn with_display_name(self, name: impl Into<String>) -> Self {
        Self {
            display_name: Some(name.into()),
            ..self
        }
    }

    /// Returns the context with `key` mapped to `value`; an existing value
    /// for the same key is replaced.
    pub fn with_metadata(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let mut updated = self;
        let (k, v) = (key.into(), value.into());
        updated.metadata.insert(k, v);
        updated
    }

    /// Builds a tenant-scoped name of the form `<tenant_id>:<resource>`.
    pub fn resource_name(&self, resource: &str) -> String {
        [self.tenant_id.as_str(), resource].join(":")
    }
}
314
/// Failures reported by tenant operations. Each variant carries a
/// free-form detail string that is appended to the variant's message.
#[derive(Debug, Clone)]
pub enum TenantError {
    /// The referenced tenant is not registered.
    NotFound(String),
    /// The operation would exceed one of the tenant's quotas.
    QuotaExceeded(String),
    /// A tenant with the same identifier is already registered.
    AlreadyExists(String),
    /// The tenant is currently suspended.
    Suspended(String),
    /// Any other tenant-related failure.
    Other(String),
}

impl fmt::Display for TenantError {
    /// Renders the error as `<variant label>: <detail>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the fixed label for the variant, then emit once.
        let (label, detail) = match self {
            TenantError::NotFound(detail) => ("Tenant not found", detail),
            TenantError::QuotaExceeded(detail) => ("Quota exceeded", detail),
            TenantError::AlreadyExists(detail) => ("Tenant already exists", detail),
            TenantError::Suspended(detail) => ("Tenant suspended", detail),
            TenantError::Other(detail) => ("Tenant error", detail),
        };
        write!(f, "{}: {}", label, detail)
    }
}

impl std::error::Error for TenantError {}

/// Shorthand for results whose error type is [`TenantError`].
pub type TenantResult<T> = Result<T, TenantError>;
350
351struct TenantEntry {
353 _context: TenantContext,
355 quota: ResourceQuota,
357 usage: RwLock<ResourceUsage>,
359 active: std::sync::atomic::AtomicBool,
361 _registered_at: Instant,
363}
364
365pub struct TenantRegistry {
367 tenants: RwLock<HashMap<String, Arc<TenantEntry>>>,
369 default_quota: ResourceQuota,
371 window_duration: Duration,
373}
374
375impl TenantRegistry {
376 pub fn new() -> Self {
378 Self {
379 tenants: RwLock::new(HashMap::new()),
380 default_quota: ResourceQuota::tier_small(),
381 window_duration: Duration::from_secs(60), }
383 }
384
385 pub fn with_default_quota(mut self, quota: ResourceQuota) -> Self {
387 self.default_quota = quota;
388 self
389 }
390
391 pub fn with_window_duration(mut self, duration: Duration) -> Self {
393 self.window_duration = duration;
394 self
395 }
396
397 pub fn with_tenant(self, tenant_id: impl Into<String>, quota: ResourceQuota) -> Self {
399 let tenant_id = tenant_id.into();
400 let entry = TenantEntry {
401 _context: TenantContext::new(&tenant_id),
402 quota,
403 usage: RwLock::new(ResourceUsage::new()),
404 active: std::sync::atomic::AtomicBool::new(true),
405 _registered_at: Instant::now(),
406 };
407
408 self.tenants.write().insert(tenant_id, Arc::new(entry));
409 self
410 }
411
412 pub fn register_tenant(
414 &self,
415 tenant_id: impl Into<String>,
416 quota: ResourceQuota,
417 ) -> TenantResult<()> {
418 let tenant_id = tenant_id.into();
419 let mut tenants = self.tenants.write();
420
421 if tenants.contains_key(&tenant_id) {
422 return Err(TenantError::AlreadyExists(tenant_id));
423 }
424
425 let entry = TenantEntry {
426 _context: TenantContext::new(&tenant_id),
427 quota,
428 usage: RwLock::new(ResourceUsage::new()),
429 active: std::sync::atomic::AtomicBool::new(true),
430 _registered_at: Instant::now(),
431 };
432
433 tenants.insert(tenant_id, Arc::new(entry));
434 Ok(())
435 }
436
437 pub fn get_quota(&self, tenant_id: &str) -> Option<ResourceQuota> {
439 self.tenants.read().get(tenant_id).map(|e| e.quota.clone())
440 }
441
442 pub fn get_usage(&self, tenant_id: &str) -> Option<ResourceUsage> {
444 self.tenants
445 .read()
446 .get(tenant_id)
447 .map(|e| e.usage.read().clone())
448 }
449
450 pub fn tenant_exists(&self, tenant_id: &str) -> bool {
452 self.tenants.read().contains_key(tenant_id)
453 }
454
455 pub fn is_tenant_active(&self, tenant_id: &str) -> bool {
457 self.tenants
458 .read()
459 .get(tenant_id)
460 .map(|e| e.active.load(std::sync::atomic::Ordering::Acquire))
461 .unwrap_or(false)
462 }
463
464 pub fn suspend_tenant(&self, tenant_id: &str) -> TenantResult<()> {
466 let tenants = self.tenants.read();
467 let entry = tenants
468 .get(tenant_id)
469 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
470 entry
471 .active
472 .store(false, std::sync::atomic::Ordering::Release);
473 Ok(())
474 }
475
476 pub fn resume_tenant(&self, tenant_id: &str) -> TenantResult<()> {
478 let tenants = self.tenants.read();
479 let entry = tenants
480 .get(tenant_id)
481 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
482 entry
483 .active
484 .store(true, std::sync::atomic::Ordering::Release);
485 Ok(())
486 }
487
488 pub fn try_allocate_kernel(&self, tenant_id: &str) -> TenantResult<()> {
490 let tenants = self.tenants.read();
491 let entry = tenants
492 .get(tenant_id)
493 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
494
495 if !entry.active.load(std::sync::atomic::Ordering::Acquire) {
496 return Err(TenantError::Suspended(tenant_id.to_string()));
497 }
498
499 let mut usage = entry.usage.write();
500 if !entry.quota.check_kernel_limit(usage.kernels) {
501 return Err(TenantError::QuotaExceeded(format!(
502 "Kernel limit reached: {}/{}",
503 usage.kernels,
504 entry.quota.max_kernels.unwrap_or(0)
505 )));
506 }
507
508 usage.kernels += 1;
509 Ok(())
510 }
511
512 pub fn release_kernel(&self, tenant_id: &str) {
514 if let Some(entry) = self.tenants.read().get(tenant_id) {
515 let mut usage = entry.usage.write();
516 usage.kernels = usage.kernels.saturating_sub(1);
517 }
518 }
519
520 pub fn try_allocate_gpu_memory(&self, tenant_id: &str, mb: u64) -> TenantResult<()> {
522 let tenants = self.tenants.read();
523 let entry = tenants
524 .get(tenant_id)
525 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
526
527 if !entry.active.load(std::sync::atomic::Ordering::Acquire) {
528 return Err(TenantError::Suspended(tenant_id.to_string()));
529 }
530
531 let mut usage = entry.usage.write();
532 if !entry.quota.check_gpu_memory_limit(usage.gpu_memory_mb, mb) {
533 return Err(TenantError::QuotaExceeded(format!(
534 "GPU memory limit reached: {}MB + {}MB > {}MB",
535 usage.gpu_memory_mb,
536 mb,
537 entry.quota.max_gpu_memory_mb.unwrap_or(0)
538 )));
539 }
540
541 usage.gpu_memory_mb += mb;
542 Ok(())
543 }
544
545 pub fn release_gpu_memory(&self, tenant_id: &str, mb: u64) {
547 if let Some(entry) = self.tenants.read().get(tenant_id) {
548 let mut usage = entry.usage.write();
549 usage.gpu_memory_mb = usage.gpu_memory_mb.saturating_sub(mb);
550 }
551 }
552
553 pub fn record_message(&self, tenant_id: &str) -> TenantResult<()> {
555 let tenants = self.tenants.read();
556 let entry = tenants
557 .get(tenant_id)
558 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
559
560 if !entry.active.load(std::sync::atomic::Ordering::Acquire) {
561 return Err(TenantError::Suspended(tenant_id.to_string()));
562 }
563
564 let mut usage = entry.usage.write();
565
566 if usage.window_start.elapsed() >= self.window_duration {
568 usage.reset_window();
569 }
570
571 if !entry.quota.check_message_rate(usage.messages_this_window) {
572 return Err(TenantError::QuotaExceeded(format!(
573 "Message rate limit reached: {}/{} per {:?}",
574 usage.messages_this_window,
575 entry.quota.max_messages_per_sec.unwrap_or(0),
576 self.window_duration
577 )));
578 }
579
580 usage.messages_this_window += 1;
581 Ok(())
582 }
583
584 pub fn get_utilization(&self, tenant_id: &str) -> Option<QuotaUtilization> {
586 self.tenants
587 .read()
588 .get(tenant_id)
589 .map(|entry| entry.usage.read().utilization(&entry.quota))
590 }
591
592 pub fn tenant_ids(&self) -> Vec<String> {
594 self.tenants.read().keys().cloned().collect()
595 }
596
597 pub fn tenant_count(&self) -> usize {
599 self.tenants.read().len()
600 }
601}
602
603impl Default for TenantRegistry {
604 fn default() -> Self {
605 Self::new()
606 }
607}
608
#[cfg(test)]
mod tests {
    use super::*;

    /// Kernel and GPU memory checks respect their caps at the boundaries.
    #[test]
    fn test_resource_quota() {
        let quota = ResourceQuota::new()
            .with_max_kernels(10)
            .with_max_gpu_memory_mb(8192);

        assert!(quota.check_kernel_limit(5));
        assert!(quota.check_kernel_limit(9));
        assert!(!quota.check_kernel_limit(10));

        assert!(quota.check_gpu_memory_limit(4096, 2048));
        assert!(!quota.check_gpu_memory_limit(8192, 1));
    }

    /// Tier presets carry the expected caps.
    #[test]
    fn test_tier_quotas() {
        let small = ResourceQuota::tier_small();
        assert_eq!(small.max_kernels, Some(10));
        assert_eq!(small.max_gpu_memory_mb, Some(2048));

        let large = ResourceQuota::tier_large();
        assert_eq!(large.max_kernels, Some(200));
        assert_eq!(large.max_gpu_memory_mb, Some(32768));
    }

    /// Builder methods populate the context and resource names are prefixed.
    #[test]
    fn test_tenant_context() {
        let ctx = TenantContext::new("tenant_a")
            .with_display_name("Tenant A")
            .with_metadata("tier", "enterprise");

        assert_eq!(ctx.tenant_id, "tenant_a");
        assert_eq!(ctx.display_name, Some("Tenant A".to_string()));
        assert_eq!(ctx.resource_name("kernel_1"), "tenant_a:kernel_1");
    }

    /// Tenants added through the builder are visible with their own quotas.
    #[test]
    fn test_tenant_registry() {
        let registry = TenantRegistry::new()
            .with_tenant("tenant_a", ResourceQuota::tier_small())
            .with_tenant("tenant_b", ResourceQuota::tier_medium());

        assert!(registry.tenant_exists("tenant_a"));
        assert!(registry.tenant_exists("tenant_b"));
        assert!(!registry.tenant_exists("tenant_c"));

        let quota_a = registry.get_quota("tenant_a").unwrap();
        assert_eq!(quota_a.max_kernels, Some(10));

        let quota_b = registry.get_quota("tenant_b").unwrap();
        assert_eq!(quota_b.max_kernels, Some(50));
    }

    /// Registering the same id twice is rejected and leaves one entry.
    #[test]
    fn test_register_tenant_rejects_duplicates() {
        let registry = TenantRegistry::new();

        assert!(registry
            .register_tenant("tenant_a", ResourceQuota::tier_small())
            .is_ok());
        assert!(matches!(
            registry.register_tenant("tenant_a", ResourceQuota::tier_large()),
            Err(TenantError::AlreadyExists(_))
        ));
        assert_eq!(registry.tenant_count(), 1);
        assert_eq!(
            registry.get_quota("tenant_a").unwrap().max_kernels,
            Some(10)
        );
    }

    /// Kernel allocation stops at the cap and resumes after a release.
    #[test]
    fn test_kernel_allocation() {
        let registry =
            TenantRegistry::new().with_tenant("tenant_a", ResourceQuota::new().with_max_kernels(2));

        assert!(registry.try_allocate_kernel("tenant_a").is_ok());
        assert!(registry.try_allocate_kernel("tenant_a").is_ok());

        assert!(registry.try_allocate_kernel("tenant_a").is_err());

        registry.release_kernel("tenant_a");

        assert!(registry.try_allocate_kernel("tenant_a").is_ok());
    }

    /// GPU memory allocation stops at the cap and resumes after a release.
    #[test]
    fn test_gpu_memory_allocation() {
        let registry = TenantRegistry::new().with_tenant(
            "tenant_a",
            ResourceQuota::new().with_max_gpu_memory_mb(1024),
        );

        assert!(registry.try_allocate_gpu_memory("tenant_a", 512).is_ok());
        assert!(registry.try_allocate_gpu_memory("tenant_a", 256).is_ok());
        assert!(registry.try_allocate_gpu_memory("tenant_a", 512).is_err());

        registry.release_gpu_memory("tenant_a", 256);
        assert!(registry.try_allocate_gpu_memory("tenant_a", 512).is_ok());
    }

    /// Messages are rejected once the window's cap has been reached.
    #[test]
    fn test_message_rate_limit() {
        // A long window guarantees no reset happens during the test.
        let registry = TenantRegistry::new()
            .with_window_duration(Duration::from_secs(3600))
            .with_tenant(
                "tenant_a",
                ResourceQuota::new().with_max_messages_per_sec(2),
            );

        assert!(registry.record_message("tenant_a").is_ok());
        assert!(registry.record_message("tenant_a").is_ok());
        assert!(matches!(
            registry.record_message("tenant_a"),
            Err(TenantError::QuotaExceeded(_))
        ));
    }

    /// Utilization is reported as a percentage of each configured cap.
    #[test]
    fn test_utilization() {
        let quota = ResourceQuota::new()
            .with_max_kernels(100)
            .with_max_gpu_memory_mb(8192);

        let mut usage = ResourceUsage::new();
        usage.kernels = 50;
        usage.gpu_memory_mb = 4096;

        let utilization = usage.utilization(&quota);
        assert_eq!(utilization.kernel_pct, Some(50.0));
        assert_eq!(utilization.gpu_memory_pct, Some(50.0));
    }

    /// Operations on an unregistered tenant fail or return `None`.
    #[test]
    fn test_unknown_tenant() {
        let registry = TenantRegistry::new();

        assert!(registry.try_allocate_kernel("unknown").is_err());
        assert!(registry.get_quota("unknown").is_none());
    }

    /// Suspension blocks every allocation path until the tenant is resumed.
    #[test]
    fn test_suspend_and_resume_tenant() {
        let registry = TenantRegistry::new()
            .with_tenant("tenant_a", ResourceQuota::new().with_max_kernels(10));

        assert!(registry.is_tenant_active("tenant_a"));
        assert!(registry.try_allocate_kernel("tenant_a").is_ok());

        registry.suspend_tenant("tenant_a").unwrap();
        assert!(!registry.is_tenant_active("tenant_a"));

        assert!(matches!(
            registry.try_allocate_kernel("tenant_a"),
            Err(TenantError::Suspended(_))
        ));
        assert!(matches!(
            registry.try_allocate_gpu_memory("tenant_a", 100),
            Err(TenantError::Suspended(_))
        ));
        assert!(matches!(
            registry.record_message("tenant_a"),
            Err(TenantError::Suspended(_))
        ));

        registry.resume_tenant("tenant_a").unwrap();
        assert!(registry.is_tenant_active("tenant_a"));
        assert!(registry.try_allocate_kernel("tenant_a").is_ok());
    }

    /// Suspending or resuming an unregistered tenant reports `NotFound`.
    #[test]
    fn test_suspend_unknown_tenant() {
        let registry = TenantRegistry::new();
        assert!(matches!(
            registry.suspend_tenant("unknown"),
            Err(TenantError::NotFound(_))
        ));
        assert!(matches!(
            registry.resume_tenant("unknown"),
            Err(TenantError::NotFound(_))
        ));
    }
}