Skip to main content

oxirs_stream/
multi_tenancy.rs

1//! # Multi-Tenancy Support
2//!
3//! Complete multi-tenancy implementation with resource isolation, quota management,
4//! tenant lifecycle management, and fair resource allocation for streaming workloads.
5
6use anyhow::{anyhow, Result};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use tracing::{debug, info, warn};
13use uuid::Uuid;
14
15/// Multi-tenancy configuration
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct MultiTenancyConfig {
18    /// Enable multi-tenancy
19    pub enabled: bool,
20    /// Isolation mode
21    pub isolation_mode: IsolationMode,
22    /// Resource allocation strategy
23    pub resource_allocation: ResourceAllocationStrategy,
24    /// Default tenant quota
25    pub default_quota: TenantQuota,
26    /// Tenant lifecycle configuration
27    pub lifecycle: TenantLifecycleConfig,
28}
29
30impl Default for MultiTenancyConfig {
31    fn default() -> Self {
32        Self {
33            enabled: true,
34            isolation_mode: IsolationMode::Namespace,
35            resource_allocation: ResourceAllocationStrategy::FairShare,
36            default_quota: TenantQuota::default(),
37            lifecycle: TenantLifecycleConfig::default(),
38        }
39    }
40}
41
42/// Tenant isolation modes
43#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
44pub enum IsolationMode {
45    /// Namespace-based isolation (logical)
46    Namespace,
47    /// Process-based isolation (strong)
48    Process,
49    /// Container-based isolation (very strong)
50    Container,
51    /// VM-based isolation (strongest)
52    VirtualMachine,
53}
54
55impl std::fmt::Display for IsolationMode {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        match self {
58            IsolationMode::Namespace => write!(f, "Namespace"),
59            IsolationMode::Process => write!(f, "Process"),
60            IsolationMode::Container => write!(f, "Container"),
61            IsolationMode::VirtualMachine => write!(f, "VirtualMachine"),
62        }
63    }
64}
65
66/// Resource allocation strategies
67#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
68pub enum ResourceAllocationStrategy {
69    /// Fair share allocation
70    FairShare,
71    /// Priority-based allocation
72    PriorityBased,
73    /// Guaranteed resources
74    Guaranteed,
75    /// Best-effort allocation
76    BestEffort,
77}
78
79/// Tenant quota configuration
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct TenantQuota {
82    /// Maximum events per second
83    pub max_events_per_second: u64,
84    /// Maximum concurrent connections
85    pub max_connections: u32,
86    /// Maximum topics/streams
87    pub max_topics: u32,
88    /// Maximum storage size (bytes)
89    pub max_storage_bytes: u64,
90    /// Maximum memory usage (bytes)
91    pub max_memory_bytes: u64,
92    /// Maximum CPU usage (percentage, 0-100)
93    pub max_cpu_percent: f64,
94    /// Maximum bandwidth (bytes per second)
95    pub max_bandwidth_bytes_per_sec: u64,
96}
97
98impl Default for TenantQuota {
99    fn default() -> Self {
100        Self {
101            max_events_per_second: 10000,
102            max_connections: 100,
103            max_topics: 50,
104            max_storage_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
105            max_memory_bytes: 1024 * 1024 * 1024,       // 1 GB
106            max_cpu_percent: 50.0,
107            max_bandwidth_bytes_per_sec: 100 * 1024 * 1024, // 100 MB/s
108        }
109    }
110}
111
112/// Tenant lifecycle configuration
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct TenantLifecycleConfig {
115    /// Enable automatic provisioning
116    pub auto_provisioning: bool,
117    /// Enable automatic deprovisioning
118    pub auto_deprovisioning: bool,
119    /// Grace period before deprovisioning (seconds)
120    pub deprovision_grace_period_secs: u64,
121    /// Enable tenant suspension on quota violation
122    pub auto_suspend_on_violation: bool,
123}
124
125impl Default for TenantLifecycleConfig {
126    fn default() -> Self {
127        Self {
128            auto_provisioning: true,
129            auto_deprovisioning: false,
130            deprovision_grace_period_secs: 86400, // 24 hours
131            auto_suspend_on_violation: true,
132        }
133    }
134}
135
136/// Tenant information
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct Tenant {
139    /// Tenant ID
140    pub tenant_id: String,
141    /// Tenant name
142    pub name: String,
143    /// Organization
144    pub organization: Option<String>,
145    /// Tenant status
146    pub status: TenantStatus,
147    /// Quota configuration
148    pub quota: TenantQuota,
149    /// Current resource usage
150    pub usage: ResourceUsage,
151    /// Tenant tier
152    pub tier: TenantTier,
153    /// Created at
154    pub created_at: DateTime<Utc>,
155    /// Updated at
156    pub updated_at: DateTime<Utc>,
157    /// Metadata
158    pub metadata: HashMap<String, String>,
159}
160
161/// Tenant status
162#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
163pub enum TenantStatus {
164    /// Active and operational
165    Active,
166    /// Suspended (quota violation or manual)
167    Suspended,
168    /// Pending provisioning
169    Provisioning,
170    /// Pending deprovisioning
171    Deprovisioning,
172    /// Archived
173    Archived,
174}
175
176impl std::fmt::Display for TenantStatus {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        match self {
179            TenantStatus::Active => write!(f, "Active"),
180            TenantStatus::Suspended => write!(f, "Suspended"),
181            TenantStatus::Provisioning => write!(f, "Provisioning"),
182            TenantStatus::Deprovisioning => write!(f, "Deprovisioning"),
183            TenantStatus::Archived => write!(f, "Archived"),
184        }
185    }
186}
187
188/// Tenant tier
189#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
190pub enum TenantTier {
191    Free,
192    Basic,
193    Professional,
194    Enterprise,
195    Custom,
196}
197
198impl std::fmt::Display for TenantTier {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        match self {
201            TenantTier::Free => write!(f, "Free"),
202            TenantTier::Basic => write!(f, "Basic"),
203            TenantTier::Professional => write!(f, "Professional"),
204            TenantTier::Enterprise => write!(f, "Enterprise"),
205            TenantTier::Custom => write!(f, "Custom"),
206        }
207    }
208}
209
210/// Current resource usage
211#[derive(Debug, Clone, Default, Serialize, Deserialize)]
212pub struct ResourceUsage {
213    /// Current events per second
214    pub events_per_second: u64,
215    /// Current connections
216    pub connections: u32,
217    /// Current topics
218    pub topics: u32,
219    /// Current storage usage (bytes)
220    pub storage_bytes: u64,
221    /// Current memory usage (bytes)
222    pub memory_bytes: u64,
223    /// Current CPU usage (percentage)
224    pub cpu_percent: f64,
225    /// Current bandwidth usage (bytes per second)
226    pub bandwidth_bytes_per_sec: u64,
227    /// Last updated
228    pub updated_at: DateTime<Utc>,
229}
230
231/// Tenant namespace
232#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct TenantNamespace {
234    /// Namespace ID
235    pub namespace_id: String,
236    /// Tenant ID
237    pub tenant_id: String,
238    /// Namespace resources
239    pub resources: NamespaceResources,
240}
241
242/// Namespace resources
243#[derive(Debug, Clone, Serialize, Deserialize, Default)]
244pub struct NamespaceResources {
245    /// Topics owned by this tenant
246    pub topics: Vec<String>,
247    /// Consumer groups
248    pub consumer_groups: Vec<String>,
249    /// Connections
250    pub connections: Vec<String>,
251}
252
253/// Multi-tenancy manager
254pub struct MultiTenancyManager {
255    config: MultiTenancyConfig,
256    tenants: Arc<RwLock<HashMap<String, Tenant>>>,
257    namespaces: Arc<RwLock<HashMap<String, TenantNamespace>>>,
258    metrics: Arc<RwLock<MultiTenancyMetrics>>,
259}
260
261/// Multi-tenancy metrics
262#[derive(Debug, Clone, Default, Serialize, Deserialize)]
263pub struct MultiTenancyMetrics {
264    /// Total tenants
265    pub total_tenants: u64,
266    /// Active tenants
267    pub active_tenants: u64,
268    /// Suspended tenants
269    pub suspended_tenants: u64,
270    /// Total quota violations
271    pub quota_violations: u64,
272    /// Resource utilization by tenant
273    pub tenant_utilization: HashMap<String, f64>,
274}
275
276impl MultiTenancyManager {
277    /// Create a new multi-tenancy manager
278    pub fn new(config: MultiTenancyConfig) -> Self {
279        Self {
280            config,
281            tenants: Arc::new(RwLock::new(HashMap::new())),
282            namespaces: Arc::new(RwLock::new(HashMap::new())),
283            metrics: Arc::new(RwLock::new(MultiTenancyMetrics::default())),
284        }
285    }
286
287    /// Initialize multi-tenancy system
288    pub async fn initialize(&self) -> Result<()> {
289        if !self.config.enabled {
290            info!("Multi-tenancy is disabled");
291            return Ok(());
292        }
293
294        info!(
295            "Initializing multi-tenancy system with isolation mode: {}",
296            self.config.isolation_mode
297        );
298
299        // Initialize default namespace
300        self.create_default_namespace().await?;
301
302        info!("Multi-tenancy system initialized successfully");
303        Ok(())
304    }
305
306    /// Create default namespace
307    async fn create_default_namespace(&self) -> Result<()> {
308        let namespace = TenantNamespace {
309            namespace_id: "default".to_string(),
310            tenant_id: "default".to_string(),
311            resources: NamespaceResources::default(),
312        };
313
314        self.namespaces
315            .write()
316            .await
317            .insert("default".to_string(), namespace);
318        debug!("Default namespace created");
319        Ok(())
320    }
321
322    /// Create a new tenant
323    pub async fn create_tenant(&self, name: String, tier: TenantTier) -> Result<Tenant> {
324        info!("Creating tenant: {} (tier: {})", name, tier);
325
326        let tenant_id = Uuid::new_v4().to_string();
327        let quota = self.get_quota_for_tier(tier);
328
329        let tenant = Tenant {
330            tenant_id: tenant_id.clone(),
331            name: name.clone(),
332            organization: None,
333            status: TenantStatus::Provisioning,
334            quota,
335            usage: ResourceUsage {
336                updated_at: Utc::now(),
337                ..Default::default()
338            },
339            tier,
340            created_at: Utc::now(),
341            updated_at: Utc::now(),
342            metadata: HashMap::new(),
343        };
344
345        // Provision tenant resources
346        self.provision_tenant(&tenant).await?;
347
348        // Update tenant status to active
349        let mut active_tenant = tenant.clone();
350        active_tenant.status = TenantStatus::Active;
351
352        // Store tenant
353        self.tenants
354            .write()
355            .await
356            .insert(tenant_id.clone(), active_tenant.clone());
357
358        // Update metrics
359        {
360            let mut metrics = self.metrics.write().await;
361            metrics.total_tenants += 1;
362            metrics.active_tenants += 1;
363        }
364
365        info!("Tenant created successfully: {}", name);
366        Ok(active_tenant)
367    }
368
369    /// Get quota for tenant tier
370    fn get_quota_for_tier(&self, tier: TenantTier) -> TenantQuota {
371        match tier {
372            TenantTier::Free => TenantQuota {
373                max_events_per_second: 1000,
374                max_connections: 10,
375                max_topics: 5,
376                max_storage_bytes: 1024 * 1024 * 1024, // 1 GB
377                max_memory_bytes: 256 * 1024 * 1024,   // 256 MB
378                max_cpu_percent: 10.0,
379                max_bandwidth_bytes_per_sec: 10 * 1024 * 1024, // 10 MB/s
380            },
381            TenantTier::Basic => TenantQuota {
382                max_events_per_second: 10000,
383                max_connections: 50,
384                max_topics: 25,
385                max_storage_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
386                max_memory_bytes: 512 * 1024 * 1024,        // 512 MB
387                max_cpu_percent: 25.0,
388                max_bandwidth_bytes_per_sec: 50 * 1024 * 1024, // 50 MB/s
389            },
390            TenantTier::Professional => TenantQuota {
391                max_events_per_second: 50000,
392                max_connections: 200,
393                max_topics: 100,
394                max_storage_bytes: 100 * 1024 * 1024 * 1024, // 100 GB
395                max_memory_bytes: 2 * 1024 * 1024 * 1024,    // 2 GB
396                max_cpu_percent: 50.0,
397                max_bandwidth_bytes_per_sec: 200 * 1024 * 1024, // 200 MB/s
398            },
399            TenantTier::Enterprise => TenantQuota {
400                max_events_per_second: 500000,
401                max_connections: 1000,
402                max_topics: 500,
403                max_storage_bytes: 1024 * 1024 * 1024 * 1024, // 1 TB
404                max_memory_bytes: 16 * 1024 * 1024 * 1024,    // 16 GB
405                max_cpu_percent: 100.0,
406                max_bandwidth_bytes_per_sec: 1024 * 1024 * 1024, // 1 GB/s
407            },
408            TenantTier::Custom => self.config.default_quota.clone(),
409        }
410    }
411
412    /// Provision tenant resources
413    async fn provision_tenant(&self, tenant: &Tenant) -> Result<()> {
414        debug!("Provisioning resources for tenant: {}", tenant.tenant_id);
415
416        // Create tenant namespace
417        let namespace = TenantNamespace {
418            namespace_id: format!("tenant-{}", tenant.tenant_id),
419            tenant_id: tenant.tenant_id.clone(),
420            resources: NamespaceResources::default(),
421        };
422
423        self.namespaces
424            .write()
425            .await
426            .insert(namespace.namespace_id.clone(), namespace);
427
428        // In a real implementation, this would:
429        // 1. Allocate compute resources
430        // 2. Set up network isolation
431        // 3. Configure storage
432        // 4. Initialize monitoring
433
434        debug!("Tenant resources provisioned: {}", tenant.tenant_id);
435        Ok(())
436    }
437
438    /// Check quota for tenant
439    pub async fn check_quota(&self, tenant_id: &str, resource: ResourceType) -> Result<bool> {
440        let tenants = self.tenants.read().await;
441        let tenant = tenants
442            .get(tenant_id)
443            .ok_or_else(|| anyhow!("Tenant not found: {}", tenant_id))?;
444
445        if tenant.status != TenantStatus::Active {
446            return Err(anyhow!("Tenant is not active: {}", tenant.status));
447        }
448
449        let within_quota = match resource {
450            ResourceType::EventsPerSecond => {
451                tenant.usage.events_per_second < tenant.quota.max_events_per_second
452            }
453            ResourceType::Connections => tenant.usage.connections < tenant.quota.max_connections,
454            ResourceType::Topics => tenant.usage.topics < tenant.quota.max_topics,
455            ResourceType::Storage => tenant.usage.storage_bytes < tenant.quota.max_storage_bytes,
456            ResourceType::Memory => tenant.usage.memory_bytes < tenant.quota.max_memory_bytes,
457            ResourceType::CPU => tenant.usage.cpu_percent < tenant.quota.max_cpu_percent,
458            ResourceType::Bandwidth => {
459                tenant.usage.bandwidth_bytes_per_sec < tenant.quota.max_bandwidth_bytes_per_sec
460            }
461        };
462
463        if !within_quota {
464            warn!("Quota exceeded for tenant {}: {:?}", tenant_id, resource);
465
466            if self.config.lifecycle.auto_suspend_on_violation {
467                self.suspend_tenant(tenant_id, "Quota violation".to_string())
468                    .await?;
469            }
470
471            // Update metrics
472            self.metrics.write().await.quota_violations += 1;
473        }
474
475        Ok(within_quota)
476    }
477
478    /// Update tenant resource usage
479    pub async fn update_usage(&self, tenant_id: &str, usage: ResourceUsage) -> Result<()> {
480        let mut tenants = self.tenants.write().await;
481        if let Some(tenant) = tenants.get_mut(tenant_id) {
482            tenant.usage = usage;
483            tenant.updated_at = Utc::now();
484            debug!("Updated usage for tenant: {}", tenant_id);
485        }
486        Ok(())
487    }
488
489    /// Suspend tenant
490    pub async fn suspend_tenant(&self, tenant_id: &str, reason: String) -> Result<()> {
491        info!("Suspending tenant {}: {}", tenant_id, reason);
492
493        let mut tenants = self.tenants.write().await;
494        if let Some(tenant) = tenants.get_mut(tenant_id) {
495            tenant.status = TenantStatus::Suspended;
496            tenant.updated_at = Utc::now();
497            tenant
498                .metadata
499                .insert("suspension_reason".to_string(), reason);
500
501            // Update metrics
502            drop(tenants);
503            let mut metrics = self.metrics.write().await;
504            metrics.active_tenants -= 1;
505            metrics.suspended_tenants += 1;
506        }
507
508        Ok(())
509    }
510
511    /// Resume tenant
512    pub async fn resume_tenant(&self, tenant_id: &str) -> Result<()> {
513        info!("Resuming tenant: {}", tenant_id);
514
515        let mut tenants = self.tenants.write().await;
516        if let Some(tenant) = tenants.get_mut(tenant_id) {
517            tenant.status = TenantStatus::Active;
518            tenant.updated_at = Utc::now();
519            tenant.metadata.remove("suspension_reason");
520
521            // Update metrics
522            drop(tenants);
523            let mut metrics = self.metrics.write().await;
524            metrics.active_tenants += 1;
525            metrics.suspended_tenants -= 1;
526        }
527
528        Ok(())
529    }
530
531    /// Delete tenant
532    pub async fn delete_tenant(&self, tenant_id: &str) -> Result<()> {
533        info!("Deleting tenant: {}", tenant_id);
534
535        // Deprovision resources
536        self.deprovision_tenant(tenant_id).await?;
537
538        // Remove tenant
539        let mut tenants = self.tenants.write().await;
540        if tenants.remove(tenant_id).is_some() {
541            // Update metrics
542            drop(tenants);
543            let mut metrics = self.metrics.write().await;
544            metrics.total_tenants -= 1;
545        }
546
547        Ok(())
548    }
549
550    /// Deprovision tenant resources
551    async fn deprovision_tenant(&self, tenant_id: &str) -> Result<()> {
552        debug!("Deprovisioning resources for tenant: {}", tenant_id);
553
554        // Remove namespace
555        let namespace_id = format!("tenant-{}", tenant_id);
556        self.namespaces.write().await.remove(&namespace_id);
557
558        // In a real implementation, this would:
559        // 1. Release compute resources
560        // 2. Remove network isolation
561        // 3. Clean up storage
562        // 4. Remove monitoring
563
564        debug!("Tenant resources deprovisioned: {}", tenant_id);
565        Ok(())
566    }
567
568    /// Get tenant
569    pub async fn get_tenant(&self, tenant_id: &str) -> Option<Tenant> {
570        self.tenants.read().await.get(tenant_id).cloned()
571    }
572
573    /// List all tenants
574    pub async fn list_tenants(&self) -> Vec<Tenant> {
575        self.tenants.read().await.values().cloned().collect()
576    }
577
578    /// Get metrics
579    pub async fn get_metrics(&self) -> MultiTenancyMetrics {
580        self.metrics.read().await.clone()
581    }
582}
583
584/// Resource types for quota checking
585#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
586pub enum ResourceType {
587    EventsPerSecond,
588    Connections,
589    Topics,
590    Storage,
591    Memory,
592    CPU,
593    Bandwidth,
594}
595
596#[cfg(test)]
597mod tests {
598    use super::*;
599
600    #[tokio::test]
601    async fn test_multi_tenancy_config_default() {
602        let config = MultiTenancyConfig::default();
603        assert!(config.enabled);
604        assert_eq!(config.isolation_mode, IsolationMode::Namespace);
605    }
606
607    #[tokio::test]
608    async fn test_tenant_creation() {
609        let config = MultiTenancyConfig::default();
610        let manager = MultiTenancyManager::new(config);
611        manager.initialize().await.unwrap();
612
613        let tenant = manager
614            .create_tenant("Test Tenant".to_string(), TenantTier::Basic)
615            .await
616            .unwrap();
617        assert_eq!(tenant.name, "Test Tenant");
618        assert_eq!(tenant.tier, TenantTier::Basic);
619        assert_eq!(tenant.status, TenantStatus::Active);
620    }
621
622    #[tokio::test]
623    async fn test_quota_check() {
624        let config = MultiTenancyConfig::default();
625        let manager = MultiTenancyManager::new(config);
626        manager.initialize().await.unwrap();
627
628        let tenant = manager
629            .create_tenant("Test".to_string(), TenantTier::Free)
630            .await
631            .unwrap();
632
633        // Should be within quota initially
634        let within_quota = manager
635            .check_quota(&tenant.tenant_id, ResourceType::Connections)
636            .await
637            .unwrap();
638        assert!(within_quota);
639    }
640
641    #[tokio::test]
642    async fn test_tenant_suspension() {
643        let config = MultiTenancyConfig::default();
644        let manager = MultiTenancyManager::new(config);
645        manager.initialize().await.unwrap();
646
647        let tenant = manager
648            .create_tenant("Test".to_string(), TenantTier::Basic)
649            .await
650            .unwrap();
651
652        manager
653            .suspend_tenant(&tenant.tenant_id, "Testing".to_string())
654            .await
655            .unwrap();
656
657        let suspended_tenant = manager.get_tenant(&tenant.tenant_id).await.unwrap();
658        assert_eq!(suspended_tenant.status, TenantStatus::Suspended);
659    }
660
661    #[tokio::test]
662    async fn test_tier_quota() {
663        let config = MultiTenancyConfig::default();
664        let manager = MultiTenancyManager::new(config);
665
666        let free_quota = manager.get_quota_for_tier(TenantTier::Free);
667        let enterprise_quota = manager.get_quota_for_tier(TenantTier::Enterprise);
668
669        assert!(enterprise_quota.max_events_per_second > free_quota.max_events_per_second);
670        assert!(enterprise_quota.max_connections > free_quota.max_connections);
671    }
672}