use parking_lot::RwLock;
use std::collections::HashMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
33
/// Resource limits that can be attached to a tenant.
///
/// Every limit is optional: `None` means the corresponding resource is not
/// capped. The default value (and [`ResourceQuota::new`]) has no limits set.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ResourceQuota {
    /// Upper bound on the number of kernels.
    pub max_kernels: Option<u64>,
    /// Upper bound on GPU memory, in megabytes.
    pub max_gpu_memory_mb: Option<u64>,
    /// Upper bound on the message rate.
    pub max_messages_per_sec: Option<u64>,
    /// Upper bound on the number of K2K endpoints.
    pub max_k2k_endpoints: Option<u64>,
    /// Upper bound on the number of pub/sub subscriptions.
    pub max_pubsub_subscriptions: Option<u64>,
    /// Upper bound on checkpoint storage, in megabytes.
    pub max_checkpoint_storage_mb: Option<u64>,
    /// Upper bound on CPU time per hour.
    // NOTE(review): the time unit is not established anywhere in this file — confirm.
    pub max_cpu_time_per_hour: Option<u64>,
    /// Upper bound on API requests per minute.
    pub max_api_requests_per_min: Option<u64>,
}

impl ResourceQuota {
    /// Creates a quota with no limits set.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a quota with no limits set (alias of [`ResourceQuota::new`]).
    pub fn unlimited() -> Self {
        Self::new()
    }

    /// Sets the kernel limit and returns the updated quota.
    pub fn with_max_kernels(mut self, max: u64) -> Self {
        self.max_kernels = Some(max);
        self
    }

    /// Sets the GPU memory limit (MB) and returns the updated quota.
    pub fn with_max_gpu_memory_mb(mut self, max: u64) -> Self {
        self.max_gpu_memory_mb = Some(max);
        self
    }

    /// Sets the message-rate limit and returns the updated quota.
    pub fn with_max_messages_per_sec(mut self, max: u64) -> Self {
        self.max_messages_per_sec = Some(max);
        self
    }

    /// Sets the K2K endpoint limit and returns the updated quota.
    pub fn with_max_k2k_endpoints(mut self, max: u64) -> Self {
        self.max_k2k_endpoints = Some(max);
        self
    }

    /// Sets the pub/sub subscription limit and returns the updated quota.
    pub fn with_max_pubsub_subscriptions(mut self, max: u64) -> Self {
        self.max_pubsub_subscriptions = Some(max);
        self
    }

    /// Sets the checkpoint storage limit (MB) and returns the updated quota.
    pub fn with_max_checkpoint_storage_mb(mut self, max: u64) -> Self {
        self.max_checkpoint_storage_mb = Some(max);
        self
    }

    /// Sets the CPU-time-per-hour limit and returns the updated quota.
    pub fn with_max_cpu_time_per_hour(mut self, max: u64) -> Self {
        self.max_cpu_time_per_hour = Some(max);
        self
    }

    /// Sets the API-requests-per-minute limit and returns the updated quota.
    pub fn with_max_api_requests_per_min(mut self, max: u64) -> Self {
        self.max_api_requests_per_min = Some(max);
        self
    }

    /// Returns `true` when one more kernel may be allocated, i.e. when
    /// `current` is strictly below the limit or no limit is set.
    pub fn check_kernel_limit(&self, current: u64) -> bool {
        match self.max_kernels {
            Some(max) => current < max,
            None => true,
        }
    }

    /// Returns `true` when `current_mb + requested_mb` fits within the GPU
    /// memory limit (inclusive), or when no limit is set.
    ///
    /// The sum is computed with overflow checking: a request whose total does
    /// not fit in a `u64` can never satisfy a finite limit and is rejected,
    /// instead of panicking (debug) or wrapping around to a small value that
    /// would wrongly pass the check (release).
    pub fn check_gpu_memory_limit(&self, current_mb: u64, requested_mb: u64) -> bool {
        match self.max_gpu_memory_mb {
            Some(max) => current_mb
                .checked_add(requested_mb)
                .map_or(false, |total| total <= max),
            None => true,
        }
    }

    /// Returns `true` when another message may be sent, i.e. when
    /// `current_rate` is strictly below the limit or no limit is set.
    pub fn check_message_rate(&self, current_rate: u64) -> bool {
        match self.max_messages_per_sec {
            Some(max) => current_rate < max,
            None => true,
        }
    }

    /// Preset limits for a small tenant. CPU time is left uncapped.
    pub fn tier_small() -> Self {
        Self::new()
            .with_max_kernels(10)
            .with_max_gpu_memory_mb(2048)
            .with_max_messages_per_sec(1000)
            .with_max_k2k_endpoints(20)
            .with_max_pubsub_subscriptions(50)
            .with_max_checkpoint_storage_mb(1024)
            .with_max_api_requests_per_min(100)
    }

    /// Preset limits for a medium tenant. CPU time is left uncapped.
    pub fn tier_medium() -> Self {
        Self::new()
            .with_max_kernels(50)
            .with_max_gpu_memory_mb(8192)
            .with_max_messages_per_sec(10000)
            .with_max_k2k_endpoints(100)
            .with_max_pubsub_subscriptions(200)
            .with_max_checkpoint_storage_mb(10240)
            .with_max_api_requests_per_min(1000)
    }

    /// Preset limits for a large tenant. CPU time is left uncapped.
    pub fn tier_large() -> Self {
        Self::new()
            .with_max_kernels(200)
            .with_max_gpu_memory_mb(32768)
            .with_max_messages_per_sec(100000)
            .with_max_k2k_endpoints(500)
            .with_max_pubsub_subscriptions(1000)
            .with_max_checkpoint_storage_mb(102400)
            .with_max_api_requests_per_min(10000)
    }
}
188
189#[derive(Debug, Clone)]
195pub struct ResourceUsage {
196 pub kernels: u64,
198 pub gpu_memory_mb: u64,
200 pub messages_this_window: u64,
202 pub k2k_endpoints: u64,
204 pub pubsub_subscriptions: u64,
206 pub checkpoint_storage_mb: u64,
208 pub api_requests_this_window: u64,
210 pub window_start: Instant,
212}
213
214impl ResourceUsage {
215 pub fn new() -> Self {
217 Self {
218 kernels: 0,
219 gpu_memory_mb: 0,
220 messages_this_window: 0,
221 k2k_endpoints: 0,
222 pubsub_subscriptions: 0,
223 checkpoint_storage_mb: 0,
224 api_requests_this_window: 0,
225 window_start: Instant::now(),
226 }
227 }
228
229 pub fn reset_window(&mut self) {
231 self.messages_this_window = 0;
232 self.api_requests_this_window = 0;
233 self.window_start = Instant::now();
234 }
235
236 pub fn utilization(&self, quota: &ResourceQuota) -> QuotaUtilization {
238 QuotaUtilization {
239 kernel_pct: quota
240 .max_kernels
241 .map(|max| self.kernels as f64 / max as f64 * 100.0),
242 gpu_memory_pct: quota
243 .max_gpu_memory_mb
244 .map(|max| self.gpu_memory_mb as f64 / max as f64 * 100.0),
245 message_rate_pct: quota
246 .max_messages_per_sec
247 .map(|max| self.messages_this_window as f64 / max as f64 * 100.0),
248 }
249 }
250}
251
252impl Default for ResourceUsage {
253 fn default() -> Self {
254 Self::new()
255 }
256}
257
258#[derive(Debug, Clone)]
260pub struct QuotaUtilization {
261 pub kernel_pct: Option<f64>,
263 pub gpu_memory_pct: Option<f64>,
265 pub message_rate_pct: Option<f64>,
267}
268
/// Identity and descriptive information about a tenant.
#[derive(Debug, Clone)]
pub struct TenantContext {
    /// Unique identifier of the tenant.
    pub tenant_id: String,
    /// Optional human-readable name.
    pub display_name: Option<String>,
    /// Free-form key/value metadata.
    pub metadata: HashMap<String, String>,
    /// Instant at which this context was created.
    pub created_at: Instant,
}

impl TenantContext {
    /// Creates a context for `tenant_id` with no display name, empty
    /// metadata, and `created_at` set to the current instant.
    pub fn new(tenant_id: impl Into<String>) -> Self {
        let tenant_id = tenant_id.into();
        Self {
            tenant_id,
            display_name: None,
            metadata: HashMap::new(),
            created_at: Instant::now(),
        }
    }

    /// Returns the context with its display name set to `name`.
    pub fn with_display_name(self, name: impl Into<String>) -> Self {
        Self {
            display_name: Some(name.into()),
            ..self
        }
    }

    /// Returns the context with `key` mapped to `value` in its metadata,
    /// replacing any previous value stored under the same key.
    pub fn with_metadata(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let mut ctx = self;
        ctx.metadata.insert(key.into(), value.into());
        ctx
    }

    /// Builds a tenant-scoped resource name of the form
    /// `<tenant_id>:<resource>`.
    pub fn resource_name(&self, resource: &str) -> String {
        [self.tenant_id.as_str(), resource].join(":")
    }
}
314
/// Errors produced by tenant management operations.
///
/// Each variant carries a free-form detail string that is appended to the
/// variant's fixed prefix when the error is displayed.
#[derive(Debug, Clone)]
pub enum TenantError {
    /// The requested tenant is not registered.
    NotFound(String),
    /// An operation would exceed one of the tenant's limits.
    QuotaExceeded(String),
    /// A tenant with the same identifier is already registered.
    AlreadyExists(String),
    /// The tenant is suspended.
    Suspended(String),
    /// Any other tenant-related failure.
    Other(String),
}

impl fmt::Display for TenantError {
    /// Writes `<prefix>: <detail>`, where the prefix depends on the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (prefix, detail) = match self {
            Self::NotFound(detail) => ("Tenant not found", detail),
            Self::QuotaExceeded(detail) => ("Quota exceeded", detail),
            Self::AlreadyExists(detail) => ("Tenant already exists", detail),
            Self::Suspended(detail) => ("Tenant suspended", detail),
            Self::Other(detail) => ("Tenant error", detail),
        };
        write!(f, "{}: {}", prefix, detail)
    }
}

impl std::error::Error for TenantError {}

/// Result alias used by tenant operations.
pub type TenantResult<T> = Result<T, TenantError>;
350
351struct TenantEntry {
353 _context: TenantContext,
355 quota: ResourceQuota,
357 usage: RwLock<ResourceUsage>,
359 active: bool,
361 _registered_at: Instant,
363}
364
365pub struct TenantRegistry {
367 tenants: RwLock<HashMap<String, Arc<TenantEntry>>>,
369 default_quota: ResourceQuota,
371 window_duration: Duration,
373}
374
375impl TenantRegistry {
376 pub fn new() -> Self {
378 Self {
379 tenants: RwLock::new(HashMap::new()),
380 default_quota: ResourceQuota::tier_small(),
381 window_duration: Duration::from_secs(60), }
383 }
384
385 pub fn with_default_quota(mut self, quota: ResourceQuota) -> Self {
387 self.default_quota = quota;
388 self
389 }
390
391 pub fn with_window_duration(mut self, duration: Duration) -> Self {
393 self.window_duration = duration;
394 self
395 }
396
397 pub fn with_tenant(self, tenant_id: impl Into<String>, quota: ResourceQuota) -> Self {
399 let tenant_id = tenant_id.into();
400 let entry = TenantEntry {
401 _context: TenantContext::new(&tenant_id),
402 quota,
403 usage: RwLock::new(ResourceUsage::new()),
404 active: true,
405 _registered_at: Instant::now(),
406 };
407
408 self.tenants.write().insert(tenant_id, Arc::new(entry));
409 self
410 }
411
412 pub fn register_tenant(
414 &self,
415 tenant_id: impl Into<String>,
416 quota: ResourceQuota,
417 ) -> TenantResult<()> {
418 let tenant_id = tenant_id.into();
419 let mut tenants = self.tenants.write();
420
421 if tenants.contains_key(&tenant_id) {
422 return Err(TenantError::AlreadyExists(tenant_id));
423 }
424
425 let entry = TenantEntry {
426 _context: TenantContext::new(&tenant_id),
427 quota,
428 usage: RwLock::new(ResourceUsage::new()),
429 active: true,
430 _registered_at: Instant::now(),
431 };
432
433 tenants.insert(tenant_id, Arc::new(entry));
434 Ok(())
435 }
436
437 pub fn get_quota(&self, tenant_id: &str) -> Option<ResourceQuota> {
439 self.tenants.read().get(tenant_id).map(|e| e.quota.clone())
440 }
441
442 pub fn get_usage(&self, tenant_id: &str) -> Option<ResourceUsage> {
444 self.tenants
445 .read()
446 .get(tenant_id)
447 .map(|e| e.usage.read().clone())
448 }
449
450 pub fn tenant_exists(&self, tenant_id: &str) -> bool {
452 self.tenants.read().contains_key(tenant_id)
453 }
454
455 pub fn is_tenant_active(&self, tenant_id: &str) -> bool {
457 self.tenants
458 .read()
459 .get(tenant_id)
460 .map(|e| e.active)
461 .unwrap_or(false)
462 }
463
464 pub fn suspend_tenant(&self, tenant_id: &str) -> TenantResult<()> {
466 let tenants = self.tenants.read();
467 let _entry = tenants
468 .get(tenant_id)
469 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
470 Ok(())
473 }
474
475 pub fn try_allocate_kernel(&self, tenant_id: &str) -> TenantResult<()> {
477 let tenants = self.tenants.read();
478 let entry = tenants
479 .get(tenant_id)
480 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
481
482 if !entry.active {
483 return Err(TenantError::Suspended(tenant_id.to_string()));
484 }
485
486 let mut usage = entry.usage.write();
487 if !entry.quota.check_kernel_limit(usage.kernels) {
488 return Err(TenantError::QuotaExceeded(format!(
489 "Kernel limit reached: {}/{}",
490 usage.kernels,
491 entry.quota.max_kernels.unwrap_or(0)
492 )));
493 }
494
495 usage.kernels += 1;
496 Ok(())
497 }
498
499 pub fn release_kernel(&self, tenant_id: &str) {
501 if let Some(entry) = self.tenants.read().get(tenant_id) {
502 let mut usage = entry.usage.write();
503 usage.kernels = usage.kernels.saturating_sub(1);
504 }
505 }
506
507 pub fn try_allocate_gpu_memory(&self, tenant_id: &str, mb: u64) -> TenantResult<()> {
509 let tenants = self.tenants.read();
510 let entry = tenants
511 .get(tenant_id)
512 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
513
514 if !entry.active {
515 return Err(TenantError::Suspended(tenant_id.to_string()));
516 }
517
518 let mut usage = entry.usage.write();
519 if !entry.quota.check_gpu_memory_limit(usage.gpu_memory_mb, mb) {
520 return Err(TenantError::QuotaExceeded(format!(
521 "GPU memory limit reached: {}MB + {}MB > {}MB",
522 usage.gpu_memory_mb,
523 mb,
524 entry.quota.max_gpu_memory_mb.unwrap_or(0)
525 )));
526 }
527
528 usage.gpu_memory_mb += mb;
529 Ok(())
530 }
531
532 pub fn release_gpu_memory(&self, tenant_id: &str, mb: u64) {
534 if let Some(entry) = self.tenants.read().get(tenant_id) {
535 let mut usage = entry.usage.write();
536 usage.gpu_memory_mb = usage.gpu_memory_mb.saturating_sub(mb);
537 }
538 }
539
540 pub fn record_message(&self, tenant_id: &str) -> TenantResult<()> {
542 let tenants = self.tenants.read();
543 let entry = tenants
544 .get(tenant_id)
545 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
546
547 if !entry.active {
548 return Err(TenantError::Suspended(tenant_id.to_string()));
549 }
550
551 let mut usage = entry.usage.write();
552
553 if usage.window_start.elapsed() >= self.window_duration {
555 usage.reset_window();
556 }
557
558 if !entry.quota.check_message_rate(usage.messages_this_window) {
559 return Err(TenantError::QuotaExceeded(format!(
560 "Message rate limit reached: {}/{} per {:?}",
561 usage.messages_this_window,
562 entry.quota.max_messages_per_sec.unwrap_or(0),
563 self.window_duration
564 )));
565 }
566
567 usage.messages_this_window += 1;
568 Ok(())
569 }
570
571 pub fn get_utilization(&self, tenant_id: &str) -> Option<QuotaUtilization> {
573 self.tenants
574 .read()
575 .get(tenant_id)
576 .map(|entry| entry.usage.read().utilization(&entry.quota))
577 }
578
579 pub fn tenant_ids(&self) -> Vec<String> {
581 self.tenants.read().keys().cloned().collect()
582 }
583
584 pub fn tenant_count(&self) -> usize {
586 self.tenants.read().len()
587 }
588}
589
590impl Default for TenantRegistry {
591 fn default() -> Self {
592 Self::new()
593 }
594}
595
/// Unit tests for quotas, tenant contexts, and the tenant registry.
#[cfg(test)]
mod tests {
    use super::*;

    /// Kernel checks are strict (`<`), GPU memory checks are inclusive (`<=`).
    #[test]
    fn test_resource_quota() {
        let quota = ResourceQuota::new()
            .with_max_kernels(10)
            .with_max_gpu_memory_mb(8192);

        assert!(quota.check_kernel_limit(5));
        assert!(quota.check_kernel_limit(9));
        assert!(!quota.check_kernel_limit(10));

        assert!(quota.check_gpu_memory_limit(4096, 2048));
        assert!(!quota.check_gpu_memory_limit(8192, 1));
    }

    /// Tier presets expose the expected limits.
    #[test]
    fn test_tier_quotas() {
        let small = ResourceQuota::tier_small();
        assert_eq!(small.max_kernels, Some(10));
        assert_eq!(small.max_gpu_memory_mb, Some(2048));

        let large = ResourceQuota::tier_large();
        assert_eq!(large.max_kernels, Some(200));
        assert_eq!(large.max_gpu_memory_mb, Some(32768));
    }

    /// Builder methods populate the context and resource names are prefixed.
    #[test]
    fn test_tenant_context() {
        let ctx = TenantContext::new("tenant_a")
            .with_display_name("Tenant A")
            .with_metadata("tier", "enterprise");

        assert_eq!(ctx.tenant_id, "tenant_a");
        assert_eq!(ctx.display_name, Some("Tenant A".to_string()));
        assert_eq!(ctx.resource_name("kernel_1"), "tenant_a:kernel_1");
    }

    /// Tenants added through the builder are visible with their own quotas.
    #[test]
    fn test_tenant_registry() {
        let registry = TenantRegistry::new()
            .with_tenant("tenant_a", ResourceQuota::tier_small())
            .with_tenant("tenant_b", ResourceQuota::tier_medium());

        assert!(registry.tenant_exists("tenant_a"));
        assert!(registry.tenant_exists("tenant_b"));
        assert!(!registry.tenant_exists("tenant_c"));

        let quota_a = registry.get_quota("tenant_a").unwrap();
        assert_eq!(quota_a.max_kernels, Some(10));

        let quota_b = registry.get_quota("tenant_b").unwrap();
        assert_eq!(quota_b.max_kernels, Some(50));
    }

    /// Registering the same id twice is rejected and does not add a tenant.
    #[test]
    fn test_register_tenant_duplicate() {
        let registry = TenantRegistry::new();

        assert!(registry
            .register_tenant("tenant_a", ResourceQuota::tier_small())
            .is_ok());
        assert!(matches!(
            registry.register_tenant("tenant_a", ResourceQuota::tier_large()),
            Err(TenantError::AlreadyExists(_))
        ));
        assert_eq!(registry.tenant_count(), 1);
        assert_eq!(
            registry.get_quota("tenant_a").unwrap().max_kernels,
            Some(10)
        );
    }

    /// Kernel slots are exhausted at the limit and freed by a release.
    #[test]
    fn test_kernel_allocation() {
        let registry =
            TenantRegistry::new().with_tenant("tenant_a", ResourceQuota::new().with_max_kernels(2));

        assert!(registry.try_allocate_kernel("tenant_a").is_ok());
        assert!(registry.try_allocate_kernel("tenant_a").is_ok());

        // Limit of two reached.
        assert!(registry.try_allocate_kernel("tenant_a").is_err());

        registry.release_kernel("tenant_a");

        // One slot is free again.
        assert!(registry.try_allocate_kernel("tenant_a").is_ok());
    }

    /// GPU memory is tracked cumulatively and freed by a release.
    #[test]
    fn test_gpu_memory_allocation() {
        let registry = TenantRegistry::new().with_tenant(
            "tenant_a",
            ResourceQuota::new().with_max_gpu_memory_mb(1024),
        );

        assert!(registry.try_allocate_gpu_memory("tenant_a", 512).is_ok());
        assert!(registry.try_allocate_gpu_memory("tenant_a", 256).is_ok());
        // 768 + 512 exceeds 1024.
        assert!(registry.try_allocate_gpu_memory("tenant_a", 512).is_err());

        registry.release_gpu_memory("tenant_a", 256);
        // 512 + 512 fits exactly.
        assert!(registry.try_allocate_gpu_memory("tenant_a", 512).is_ok());
    }

    /// Messages within one window are capped by the message limit.
    #[test]
    fn test_message_rate_limit() {
        let registry = TenantRegistry::new().with_tenant(
            "tenant_a",
            ResourceQuota::new().with_max_messages_per_sec(2),
        );

        assert!(registry.record_message("tenant_a").is_ok());
        assert!(registry.record_message("tenant_a").is_ok());
        assert!(matches!(
            registry.record_message("tenant_a"),
            Err(TenantError::QuotaExceeded(_))
        ));
    }

    /// Utilization is reported as a percentage of each configured limit.
    #[test]
    fn test_utilization() {
        let quota = ResourceQuota::new()
            .with_max_kernels(100)
            .with_max_gpu_memory_mb(8192);

        let mut usage = ResourceUsage::new();
        usage.kernels = 50;
        usage.gpu_memory_mb = 4096;

        let utilization = usage.utilization(&quota);
        assert_eq!(utilization.kernel_pct, Some(50.0));
        assert_eq!(utilization.gpu_memory_pct, Some(50.0));
    }

    /// Operations on an unregistered tenant fail or return `None`.
    #[test]
    fn test_unknown_tenant() {
        let registry = TenantRegistry::new();

        assert!(registry.try_allocate_kernel("unknown").is_err());
        assert!(registry.get_quota("unknown").is_none());
    }
}