// ringkernel_core/tenancy.rs

//! Multi-tenancy support for RingKernel.
//!
//! This module provides tenant isolation, resource quotas, and tenant-aware
//! operations for multi-tenant deployments.
//!
//! # Example
//!
//! ```rust,ignore
//! use ringkernel_core::tenancy::{TenantContext, TenantRegistry, ResourceQuota};
//!
//! let registry = TenantRegistry::new()
//!     .with_tenant("tenant_a", ResourceQuota::new()
//!         .with_max_kernels(100)
//!         .with_max_gpu_memory_mb(8192)
//!         .with_max_messages_per_sec(10000))
//!     .with_tenant("tenant_b", ResourceQuota::new()
//!         .with_max_kernels(50)
//!         .with_max_gpu_memory_mb(4096));
//!
//! let ctx = TenantContext::new("tenant_a");
//! if let Some(quota) = registry.get_quota(&ctx.tenant_id) {
//!     if quota.check_kernel_limit(current_kernels) {
//!         // Launch kernel
//!     }
//! }
//! ```

use parking_lot::RwLock;
use std::collections::HashMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

// ============================================================================
// RESOURCE QUOTA
// ============================================================================

/// Resource quotas for a tenant.
///
/// Each limit is optional: `None` leaves the resource unlimited and
/// `Some(max)` caps it at `max`. Build a quota fluently from
/// [`ResourceQuota::new`] and the `with_*` setters, or start from one of the
/// predefined tiers ([`ResourceQuota::tier_small`] and friends).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ResourceQuota {
    /// Maximum number of concurrent kernels.
    pub max_kernels: Option<u64>,
    /// Maximum GPU memory in megabytes.
    pub max_gpu_memory_mb: Option<u64>,
    /// Maximum messages per second.
    pub max_messages_per_sec: Option<u64>,
    /// Maximum K2K endpoints.
    pub max_k2k_endpoints: Option<u64>,
    /// Maximum PubSub subscriptions.
    pub max_pubsub_subscriptions: Option<u64>,
    /// Maximum checkpoint storage in megabytes.
    pub max_checkpoint_storage_mb: Option<u64>,
    /// Maximum CPU time per hour (seconds).
    pub max_cpu_time_per_hour: Option<u64>,
    /// Maximum API requests per minute.
    pub max_api_requests_per_min: Option<u64>,
}

impl ResourceQuota {
    /// Create a new resource quota with no limits (every field is `None`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Create an unlimited quota. Equivalent to [`ResourceQuota::new`].
    pub fn unlimited() -> Self {
        Self::new()
    }

    /// Set maximum kernels.
    pub fn with_max_kernels(mut self, max: u64) -> Self {
        self.max_kernels = Some(max);
        self
    }

    /// Set maximum GPU memory (MB).
    pub fn with_max_gpu_memory_mb(mut self, max: u64) -> Self {
        self.max_gpu_memory_mb = Some(max);
        self
    }

    /// Set maximum messages per second.
    pub fn with_max_messages_per_sec(mut self, max: u64) -> Self {
        self.max_messages_per_sec = Some(max);
        self
    }

    /// Set maximum K2K endpoints.
    pub fn with_max_k2k_endpoints(mut self, max: u64) -> Self {
        self.max_k2k_endpoints = Some(max);
        self
    }

    /// Set maximum PubSub subscriptions.
    pub fn with_max_pubsub_subscriptions(mut self, max: u64) -> Self {
        self.max_pubsub_subscriptions = Some(max);
        self
    }

    /// Set maximum checkpoint storage (MB).
    pub fn with_max_checkpoint_storage_mb(mut self, max: u64) -> Self {
        self.max_checkpoint_storage_mb = Some(max);
        self
    }

    /// Set maximum CPU time per hour (seconds).
    pub fn with_max_cpu_time_per_hour(mut self, max: u64) -> Self {
        self.max_cpu_time_per_hour = Some(max);
        self
    }

    /// Set maximum API requests per minute.
    pub fn with_max_api_requests_per_min(mut self, max: u64) -> Self {
        self.max_api_requests_per_min = Some(max);
        self
    }

    /// Check if kernel limit allows another kernel.
    ///
    /// Returns `true` when no kernel limit is set or when `current` is
    /// strictly below the limit.
    pub fn check_kernel_limit(&self, current: u64) -> bool {
        self.max_kernels.map_or(true, |max| current < max)
    }

    /// Check if GPU memory limit allows allocation.
    ///
    /// Returns `true` when no GPU memory limit is set or when
    /// `current_mb + requested_mb` does not exceed the limit. A sum that does
    /// not fit in `u64` is treated as exceeding any finite limit, so an
    /// oversized request can neither panic nor wrap around and slip under the
    /// quota.
    pub fn check_gpu_memory_limit(&self, current_mb: u64, requested_mb: u64) -> bool {
        match self.max_gpu_memory_mb {
            None => true,
            Some(max) => current_mb
                .checked_add(requested_mb)
                .map_or(false, |total| total <= max),
        }
    }

    /// Check if message rate limit allows another message.
    ///
    /// Returns `true` when no message limit is set or when `current_rate` is
    /// strictly below the limit.
    pub fn check_message_rate(&self, current_rate: u64) -> bool {
        self.max_messages_per_sec
            .map_or(true, |max| current_rate < max)
    }

    /// Create a standard small tier quota.
    ///
    /// CPU time is left unlimited in this tier.
    pub fn tier_small() -> Self {
        Self::new()
            .with_max_kernels(10)
            .with_max_gpu_memory_mb(2048)
            .with_max_messages_per_sec(1000)
            .with_max_k2k_endpoints(20)
            .with_max_pubsub_subscriptions(50)
            .with_max_checkpoint_storage_mb(1024)
            .with_max_api_requests_per_min(100)
    }

    /// Create a standard medium tier quota.
    ///
    /// CPU time is left unlimited in this tier.
    pub fn tier_medium() -> Self {
        Self::new()
            .with_max_kernels(50)
            .with_max_gpu_memory_mb(8192)
            .with_max_messages_per_sec(10000)
            .with_max_k2k_endpoints(100)
            .with_max_pubsub_subscriptions(200)
            .with_max_checkpoint_storage_mb(10240)
            .with_max_api_requests_per_min(1000)
    }

    /// Create a standard large tier quota.
    ///
    /// CPU time is left unlimited in this tier.
    pub fn tier_large() -> Self {
        Self::new()
            .with_max_kernels(200)
            .with_max_gpu_memory_mb(32768)
            .with_max_messages_per_sec(100000)
            .with_max_k2k_endpoints(500)
            .with_max_pubsub_subscriptions(1000)
            .with_max_checkpoint_storage_mb(102400)
            .with_max_api_requests_per_min(10000)
    }
}

// ============================================================================
// RESOURCE USAGE
// ============================================================================

193/// Current resource usage for a tenant.
194#[derive(Debug, Clone)]
195pub struct ResourceUsage {
196    /// Current kernel count.
197    pub kernels: u64,
198    /// Current GPU memory usage (MB).
199    pub gpu_memory_mb: u64,
200    /// Messages sent in current window.
201    pub messages_this_window: u64,
202    /// Current K2K endpoint count.
203    pub k2k_endpoints: u64,
204    /// Current PubSub subscription count.
205    pub pubsub_subscriptions: u64,
206    /// Current checkpoint storage (MB).
207    pub checkpoint_storage_mb: u64,
208    /// API requests in current window.
209    pub api_requests_this_window: u64,
210    /// Window start time.
211    pub window_start: Instant,
212}
213
214impl ResourceUsage {
215    /// Create new resource usage tracking.
216    pub fn new() -> Self {
217        Self {
218            kernels: 0,
219            gpu_memory_mb: 0,
220            messages_this_window: 0,
221            k2k_endpoints: 0,
222            pubsub_subscriptions: 0,
223            checkpoint_storage_mb: 0,
224            api_requests_this_window: 0,
225            window_start: Instant::now(),
226        }
227    }
228
229    /// Reset windowed counters (messages, API requests).
230    pub fn reset_window(&mut self) {
231        self.messages_this_window = 0;
232        self.api_requests_this_window = 0;
233        self.window_start = Instant::now();
234    }
235
236    /// Calculate utilization against quota.
237    pub fn utilization(&self, quota: &ResourceQuota) -> QuotaUtilization {
238        QuotaUtilization {
239            kernel_pct: quota
240                .max_kernels
241                .map(|max| self.kernels as f64 / max as f64 * 100.0),
242            gpu_memory_pct: quota
243                .max_gpu_memory_mb
244                .map(|max| self.gpu_memory_mb as f64 / max as f64 * 100.0),
245            message_rate_pct: quota
246                .max_messages_per_sec
247                .map(|max| self.messages_this_window as f64 / max as f64 * 100.0),
248        }
249    }
250}
251
252impl Default for ResourceUsage {
253    fn default() -> Self {
254        Self::new()
255    }
256}
257
258/// Quota utilization percentages.
259#[derive(Debug, Clone)]
260pub struct QuotaUtilization {
261    /// Kernel utilization percentage.
262    pub kernel_pct: Option<f64>,
263    /// GPU memory utilization percentage.
264    pub gpu_memory_pct: Option<f64>,
265    /// Message rate utilization percentage.
266    pub message_rate_pct: Option<f64>,
267}

// ============================================================================
// TENANT CONTEXT
// ============================================================================

/// Identity and descriptive data carried along with tenant-scoped operations.
#[derive(Debug, Clone)]
pub struct TenantContext {
    /// Identifier of the tenant.
    pub tenant_id: String,
    /// Optional human-readable name for the tenant.
    pub display_name: Option<String>,
    /// Free-form key/value metadata attached to the tenant.
    pub metadata: HashMap<String, String>,
    /// Instant at which this context was constructed.
    pub created_at: Instant,
}

impl TenantContext {
    /// Build a context for `tenant_id` with no display name and no metadata.
    pub fn new(tenant_id: impl Into<String>) -> Self {
        let tenant_id = tenant_id.into();
        Self {
            tenant_id,
            display_name: None,
            metadata: HashMap::new(),
            created_at: Instant::now(),
        }
    }

    /// Attach a display name, replacing any previous one.
    pub fn with_display_name(self, name: impl Into<String>) -> Self {
        Self {
            display_name: Some(name.into()),
            ..self
        }
    }

    /// Attach a metadata entry; an existing entry under the same key is
    /// overwritten.
    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (key, value) = (key.into(), value.into());
        self.metadata.insert(key, value);
        self
    }

    /// Qualify `resource` with this tenant's id, as `"<tenant_id>:<resource>"`.
    pub fn resource_name(&self, resource: &str) -> String {
        [self.tenant_id.as_str(), resource].join(":")
    }
}

// ============================================================================
// TENANT REGISTRY
// ============================================================================

/// Failures that tenant operations can report.
///
/// Every variant carries a free-form detail string that is appended to the
/// variant's fixed prefix when the error is displayed.
#[derive(Debug, Clone)]
pub enum TenantError {
    /// No tenant is registered under the given id.
    NotFound(String),
    /// The operation would exceed one of the tenant's quotas.
    QuotaExceeded(String),
    /// A tenant with the given id is already registered.
    AlreadyExists(String),
    /// The tenant is suspended.
    Suspended(String),
    /// Any other failure.
    Other(String),
}

impl fmt::Display for TenantError {
    /// Writes the variant's prefix followed by its detail string.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        use TenantError::*;

        match self {
            NotFound(detail) => write!(out, "Tenant not found: {}", detail),
            QuotaExceeded(detail) => write!(out, "Quota exceeded: {}", detail),
            AlreadyExists(detail) => write!(out, "Tenant already exists: {}", detail),
            Suspended(detail) => write!(out, "Tenant suspended: {}", detail),
            Other(detail) => write!(out, "Tenant error: {}", detail),
        }
    }
}

impl std::error::Error for TenantError {}

/// Shorthand for a `Result` whose error type is [`TenantError`].
pub type TenantResult<T> = Result<T, TenantError>;

351/// Internal tenant entry.
352struct TenantEntry {
353    /// Tenant context.
354    _context: TenantContext,
355    /// Resource quota.
356    quota: ResourceQuota,
357    /// Resource usage (with interior mutability).
358    usage: RwLock<ResourceUsage>,
359    /// Whether tenant is active.
360    active: bool,
361    /// When tenant was registered.
362    _registered_at: Instant,
363}
364
365/// Registry for managing tenants.
366pub struct TenantRegistry {
367    /// Registered tenants.
368    tenants: RwLock<HashMap<String, Arc<TenantEntry>>>,
369    /// Default quota for new tenants.
370    default_quota: ResourceQuota,
371    /// Window duration for rate limiting.
372    window_duration: Duration,
373}
374
375impl TenantRegistry {
376    /// Create a new tenant registry.
377    pub fn new() -> Self {
378        Self {
379            tenants: RwLock::new(HashMap::new()),
380            default_quota: ResourceQuota::tier_small(),
381            window_duration: Duration::from_secs(60), // 1 minute windows
382        }
383    }
384
385    /// Set default quota for new tenants.
386    pub fn with_default_quota(mut self, quota: ResourceQuota) -> Self {
387        self.default_quota = quota;
388        self
389    }
390
391    /// Set rate limit window duration.
392    pub fn with_window_duration(mut self, duration: Duration) -> Self {
393        self.window_duration = duration;
394        self
395    }
396
397    /// Register a tenant with quota.
398    pub fn with_tenant(self, tenant_id: impl Into<String>, quota: ResourceQuota) -> Self {
399        let tenant_id = tenant_id.into();
400        let entry = TenantEntry {
401            _context: TenantContext::new(&tenant_id),
402            quota,
403            usage: RwLock::new(ResourceUsage::new()),
404            active: true,
405            _registered_at: Instant::now(),
406        };
407
408        self.tenants.write().insert(tenant_id, Arc::new(entry));
409        self
410    }
411
412    /// Register a new tenant.
413    pub fn register_tenant(
414        &self,
415        tenant_id: impl Into<String>,
416        quota: ResourceQuota,
417    ) -> TenantResult<()> {
418        let tenant_id = tenant_id.into();
419        let mut tenants = self.tenants.write();
420
421        if tenants.contains_key(&tenant_id) {
422            return Err(TenantError::AlreadyExists(tenant_id));
423        }
424
425        let entry = TenantEntry {
426            _context: TenantContext::new(&tenant_id),
427            quota,
428            usage: RwLock::new(ResourceUsage::new()),
429            active: true,
430            _registered_at: Instant::now(),
431        };
432
433        tenants.insert(tenant_id, Arc::new(entry));
434        Ok(())
435    }
436
437    /// Get a tenant's quota.
438    pub fn get_quota(&self, tenant_id: &str) -> Option<ResourceQuota> {
439        self.tenants.read().get(tenant_id).map(|e| e.quota.clone())
440    }
441
442    /// Get a tenant's current usage.
443    pub fn get_usage(&self, tenant_id: &str) -> Option<ResourceUsage> {
444        self.tenants
445            .read()
446            .get(tenant_id)
447            .map(|e| e.usage.read().clone())
448    }
449
450    /// Check if tenant exists.
451    pub fn tenant_exists(&self, tenant_id: &str) -> bool {
452        self.tenants.read().contains_key(tenant_id)
453    }
454
455    /// Check if tenant is active.
456    pub fn is_tenant_active(&self, tenant_id: &str) -> bool {
457        self.tenants
458            .read()
459            .get(tenant_id)
460            .map(|e| e.active)
461            .unwrap_or(false)
462    }
463
464    /// Suspend a tenant.
465    pub fn suspend_tenant(&self, tenant_id: &str) -> TenantResult<()> {
466        let tenants = self.tenants.read();
467        let _entry = tenants
468            .get(tenant_id)
469            .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
470        // Note: Would need interior mutability for active flag
471        // For now, this is a placeholder
472        Ok(())
473    }
474
475    /// Check and increment kernel count.
476    pub fn try_allocate_kernel(&self, tenant_id: &str) -> TenantResult<()> {
477        let tenants = self.tenants.read();
478        let entry = tenants
479            .get(tenant_id)
480            .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
481
482        if !entry.active {
483            return Err(TenantError::Suspended(tenant_id.to_string()));
484        }
485
486        let mut usage = entry.usage.write();
487        if !entry.quota.check_kernel_limit(usage.kernels) {
488            return Err(TenantError::QuotaExceeded(format!(
489                "Kernel limit reached: {}/{}",
490                usage.kernels,
491                entry.quota.max_kernels.unwrap_or(0)
492            )));
493        }
494
495        usage.kernels += 1;
496        Ok(())
497    }
498
499    /// Release a kernel allocation.
500    pub fn release_kernel(&self, tenant_id: &str) {
501        if let Some(entry) = self.tenants.read().get(tenant_id) {
502            let mut usage = entry.usage.write();
503            usage.kernels = usage.kernels.saturating_sub(1);
504        }
505    }
506
507    /// Check and increment GPU memory.
508    pub fn try_allocate_gpu_memory(&self, tenant_id: &str, mb: u64) -> TenantResult<()> {
509        let tenants = self.tenants.read();
510        let entry = tenants
511            .get(tenant_id)
512            .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
513
514        if !entry.active {
515            return Err(TenantError::Suspended(tenant_id.to_string()));
516        }
517
518        let mut usage = entry.usage.write();
519        if !entry.quota.check_gpu_memory_limit(usage.gpu_memory_mb, mb) {
520            return Err(TenantError::QuotaExceeded(format!(
521                "GPU memory limit reached: {}MB + {}MB > {}MB",
522                usage.gpu_memory_mb,
523                mb,
524                entry.quota.max_gpu_memory_mb.unwrap_or(0)
525            )));
526        }
527
528        usage.gpu_memory_mb += mb;
529        Ok(())
530    }
531
532    /// Release GPU memory allocation.
533    pub fn release_gpu_memory(&self, tenant_id: &str, mb: u64) {
534        if let Some(entry) = self.tenants.read().get(tenant_id) {
535            let mut usage = entry.usage.write();
536            usage.gpu_memory_mb = usage.gpu_memory_mb.saturating_sub(mb);
537        }
538    }
539
540    /// Record a message sent.
541    pub fn record_message(&self, tenant_id: &str) -> TenantResult<()> {
542        let tenants = self.tenants.read();
543        let entry = tenants
544            .get(tenant_id)
545            .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
546
547        if !entry.active {
548            return Err(TenantError::Suspended(tenant_id.to_string()));
549        }
550
551        let mut usage = entry.usage.write();
552
553        // Reset window if needed
554        if usage.window_start.elapsed() >= self.window_duration {
555            usage.reset_window();
556        }
557
558        if !entry.quota.check_message_rate(usage.messages_this_window) {
559            return Err(TenantError::QuotaExceeded(format!(
560                "Message rate limit reached: {}/{} per {:?}",
561                usage.messages_this_window,
562                entry.quota.max_messages_per_sec.unwrap_or(0),
563                self.window_duration
564            )));
565        }
566
567        usage.messages_this_window += 1;
568        Ok(())
569    }
570
571    /// Get utilization for a tenant.
572    pub fn get_utilization(&self, tenant_id: &str) -> Option<QuotaUtilization> {
573        self.tenants
574            .read()
575            .get(tenant_id)
576            .map(|entry| entry.usage.read().utilization(&entry.quota))
577    }
578
579    /// Get all tenant IDs.
580    pub fn tenant_ids(&self) -> Vec<String> {
581        self.tenants.read().keys().cloned().collect()
582    }
583
584    /// Get tenant count.
585    pub fn tenant_count(&self) -> usize {
586        self.tenants.read().len()
587    }
588}
589
590impl Default for TenantRegistry {
591    fn default() -> Self {
592        Self::new()
593    }
594}

// ============================================================================
// TESTS
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    /// Registry holding a single tenant, `tenant_a`, with the given quota.
    fn registry_for_tenant_a(quota: ResourceQuota) -> TenantRegistry {
        TenantRegistry::new().with_tenant("tenant_a", quota)
    }

    /// Kernel limit is exclusive; GPU memory limit is inclusive.
    #[test]
    fn test_resource_quota() {
        let limits = ResourceQuota::new()
            .with_max_kernels(10)
            .with_max_gpu_memory_mb(8192);

        for below_limit in [5, 9] {
            assert!(limits.check_kernel_limit(below_limit));
        }
        assert!(!limits.check_kernel_limit(10));

        assert!(limits.check_gpu_memory_limit(4096, 2048));
        assert!(!limits.check_gpu_memory_limit(8192, 1));
    }

    /// Spot-check the predefined tiers.
    #[test]
    fn test_tier_quotas() {
        let tier = ResourceQuota::tier_small();
        assert_eq!(tier.max_kernels, Some(10));
        assert_eq!(tier.max_gpu_memory_mb, Some(2048));

        let tier = ResourceQuota::tier_large();
        assert_eq!(tier.max_kernels, Some(200));
        assert_eq!(tier.max_gpu_memory_mb, Some(32768));
    }

    /// Builder setters and resource-name prefixing.
    #[test]
    fn test_tenant_context() {
        let context = TenantContext::new("tenant_a")
            .with_display_name("Tenant A")
            .with_metadata("tier", "enterprise");

        assert_eq!(context.tenant_id, "tenant_a");
        assert_eq!(context.display_name, Some("Tenant A".to_string()));
        assert_eq!(context.resource_name("kernel_1"), "tenant_a:kernel_1");
    }

    /// Tenants added through the builder are visible with their own quotas.
    #[test]
    fn test_tenant_registry() {
        let registry = registry_for_tenant_a(ResourceQuota::tier_small())
            .with_tenant("tenant_b", ResourceQuota::tier_medium());

        assert!(registry.tenant_exists("tenant_a"));
        assert!(registry.tenant_exists("tenant_b"));
        assert!(!registry.tenant_exists("tenant_c"));

        let kernels_a = registry.get_quota("tenant_a").unwrap().max_kernels;
        assert_eq!(kernels_a, Some(10));

        let kernels_b = registry.get_quota("tenant_b").unwrap().max_kernels;
        assert_eq!(kernels_b, Some(50));
    }

    /// Allocation stops at the kernel limit and resumes after a release.
    #[test]
    fn test_kernel_allocation() {
        let registry = registry_for_tenant_a(ResourceQuota::new().with_max_kernels(2));

        // Both slots can be taken.
        assert!(registry.try_allocate_kernel("tenant_a").is_ok());
        assert!(registry.try_allocate_kernel("tenant_a").is_ok());

        // No third slot.
        assert!(registry.try_allocate_kernel("tenant_a").is_err());

        // Freeing one slot makes room again.
        registry.release_kernel("tenant_a");
        assert!(registry.try_allocate_kernel("tenant_a").is_ok());
    }

    /// Allocation stops at the GPU memory limit and resumes after a release.
    #[test]
    fn test_gpu_memory_allocation() {
        let registry = registry_for_tenant_a(ResourceQuota::new().with_max_gpu_memory_mb(1024));

        assert!(registry.try_allocate_gpu_memory("tenant_a", 512).is_ok());
        assert!(registry.try_allocate_gpu_memory("tenant_a", 256).is_ok());
        // 768 + 512 would exceed 1024.
        assert!(registry.try_allocate_gpu_memory("tenant_a", 512).is_err());

        // After giving 256 back, 512 + 512 fits exactly.
        registry.release_gpu_memory("tenant_a", 256);
        assert!(registry.try_allocate_gpu_memory("tenant_a", 512).is_ok());
    }

    /// Half of each limit in use reports 50%.
    #[test]
    fn test_utilization() {
        let limits = ResourceQuota::new()
            .with_max_kernels(100)
            .with_max_gpu_memory_mb(8192);

        let mut current = ResourceUsage::new();
        current.kernels = 50;
        current.gpu_memory_mb = 4096;

        let report = current.utilization(&limits);
        assert_eq!(report.kernel_pct, Some(50.0));
        assert_eq!(report.gpu_memory_pct, Some(50.0));
    }

    /// Operations on an unregistered tenant fail or return nothing.
    #[test]
    fn test_unknown_tenant() {
        let empty = TenantRegistry::new();

        assert!(empty.try_allocate_kernel("unknown").is_err());
        assert!(empty.get_quota("unknown").is_none());
    }
}