1use anyhow::{anyhow, Result};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use tracing::{debug, info, warn};
13use uuid::Uuid;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct MultiTenancyConfig {
18 pub enabled: bool,
20 pub isolation_mode: IsolationMode,
22 pub resource_allocation: ResourceAllocationStrategy,
24 pub default_quota: TenantQuota,
26 pub lifecycle: TenantLifecycleConfig,
28}
29
30impl Default for MultiTenancyConfig {
31 fn default() -> Self {
32 Self {
33 enabled: true,
34 isolation_mode: IsolationMode::Namespace,
35 resource_allocation: ResourceAllocationStrategy::FairShare,
36 default_quota: TenantQuota::default(),
37 lifecycle: TenantLifecycleConfig::default(),
38 }
39 }
40}
41
42#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
44pub enum IsolationMode {
45 Namespace,
47 Process,
49 Container,
51 VirtualMachine,
53}
54
55impl std::fmt::Display for IsolationMode {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 match self {
58 IsolationMode::Namespace => write!(f, "Namespace"),
59 IsolationMode::Process => write!(f, "Process"),
60 IsolationMode::Container => write!(f, "Container"),
61 IsolationMode::VirtualMachine => write!(f, "VirtualMachine"),
62 }
63 }
64}
65
66#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
68pub enum ResourceAllocationStrategy {
69 FairShare,
71 PriorityBased,
73 Guaranteed,
75 BestEffort,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct TenantQuota {
82 pub max_events_per_second: u64,
84 pub max_connections: u32,
86 pub max_topics: u32,
88 pub max_storage_bytes: u64,
90 pub max_memory_bytes: u64,
92 pub max_cpu_percent: f64,
94 pub max_bandwidth_bytes_per_sec: u64,
96}
97
98impl Default for TenantQuota {
99 fn default() -> Self {
100 Self {
101 max_events_per_second: 10000,
102 max_connections: 100,
103 max_topics: 50,
104 max_storage_bytes: 10 * 1024 * 1024 * 1024, max_memory_bytes: 1024 * 1024 * 1024, max_cpu_percent: 50.0,
107 max_bandwidth_bytes_per_sec: 100 * 1024 * 1024, }
109 }
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct TenantLifecycleConfig {
115 pub auto_provisioning: bool,
117 pub auto_deprovisioning: bool,
119 pub deprovision_grace_period_secs: u64,
121 pub auto_suspend_on_violation: bool,
123}
124
125impl Default for TenantLifecycleConfig {
126 fn default() -> Self {
127 Self {
128 auto_provisioning: true,
129 auto_deprovisioning: false,
130 deprovision_grace_period_secs: 86400, auto_suspend_on_violation: true,
132 }
133 }
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct Tenant {
139 pub tenant_id: String,
141 pub name: String,
143 pub organization: Option<String>,
145 pub status: TenantStatus,
147 pub quota: TenantQuota,
149 pub usage: ResourceUsage,
151 pub tier: TenantTier,
153 pub created_at: DateTime<Utc>,
155 pub updated_at: DateTime<Utc>,
157 pub metadata: HashMap<String, String>,
159}
160
161#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
163pub enum TenantStatus {
164 Active,
166 Suspended,
168 Provisioning,
170 Deprovisioning,
172 Archived,
174}
175
176impl std::fmt::Display for TenantStatus {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 match self {
179 TenantStatus::Active => write!(f, "Active"),
180 TenantStatus::Suspended => write!(f, "Suspended"),
181 TenantStatus::Provisioning => write!(f, "Provisioning"),
182 TenantStatus::Deprovisioning => write!(f, "Deprovisioning"),
183 TenantStatus::Archived => write!(f, "Archived"),
184 }
185 }
186}
187
188#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
190pub enum TenantTier {
191 Free,
192 Basic,
193 Professional,
194 Enterprise,
195 Custom,
196}
197
198impl std::fmt::Display for TenantTier {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 match self {
201 TenantTier::Free => write!(f, "Free"),
202 TenantTier::Basic => write!(f, "Basic"),
203 TenantTier::Professional => write!(f, "Professional"),
204 TenantTier::Enterprise => write!(f, "Enterprise"),
205 TenantTier::Custom => write!(f, "Custom"),
206 }
207 }
208}
209
210#[derive(Debug, Clone, Default, Serialize, Deserialize)]
212pub struct ResourceUsage {
213 pub events_per_second: u64,
215 pub connections: u32,
217 pub topics: u32,
219 pub storage_bytes: u64,
221 pub memory_bytes: u64,
223 pub cpu_percent: f64,
225 pub bandwidth_bytes_per_sec: u64,
227 pub updated_at: DateTime<Utc>,
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct TenantNamespace {
234 pub namespace_id: String,
236 pub tenant_id: String,
238 pub resources: NamespaceResources,
240}
241
242#[derive(Debug, Clone, Serialize, Deserialize, Default)]
244pub struct NamespaceResources {
245 pub topics: Vec<String>,
247 pub consumer_groups: Vec<String>,
249 pub connections: Vec<String>,
251}
252
253pub struct MultiTenancyManager {
255 config: MultiTenancyConfig,
256 tenants: Arc<RwLock<HashMap<String, Tenant>>>,
257 namespaces: Arc<RwLock<HashMap<String, TenantNamespace>>>,
258 metrics: Arc<RwLock<MultiTenancyMetrics>>,
259}
260
261#[derive(Debug, Clone, Default, Serialize, Deserialize)]
263pub struct MultiTenancyMetrics {
264 pub total_tenants: u64,
266 pub active_tenants: u64,
268 pub suspended_tenants: u64,
270 pub quota_violations: u64,
272 pub tenant_utilization: HashMap<String, f64>,
274}
275
276impl MultiTenancyManager {
277 pub fn new(config: MultiTenancyConfig) -> Self {
279 Self {
280 config,
281 tenants: Arc::new(RwLock::new(HashMap::new())),
282 namespaces: Arc::new(RwLock::new(HashMap::new())),
283 metrics: Arc::new(RwLock::new(MultiTenancyMetrics::default())),
284 }
285 }
286
287 pub async fn initialize(&self) -> Result<()> {
289 if !self.config.enabled {
290 info!("Multi-tenancy is disabled");
291 return Ok(());
292 }
293
294 info!(
295 "Initializing multi-tenancy system with isolation mode: {}",
296 self.config.isolation_mode
297 );
298
299 self.create_default_namespace().await?;
301
302 info!("Multi-tenancy system initialized successfully");
303 Ok(())
304 }
305
306 async fn create_default_namespace(&self) -> Result<()> {
308 let namespace = TenantNamespace {
309 namespace_id: "default".to_string(),
310 tenant_id: "default".to_string(),
311 resources: NamespaceResources::default(),
312 };
313
314 self.namespaces
315 .write()
316 .await
317 .insert("default".to_string(), namespace);
318 debug!("Default namespace created");
319 Ok(())
320 }
321
322 pub async fn create_tenant(&self, name: String, tier: TenantTier) -> Result<Tenant> {
324 info!("Creating tenant: {} (tier: {})", name, tier);
325
326 let tenant_id = Uuid::new_v4().to_string();
327 let quota = self.get_quota_for_tier(tier);
328
329 let tenant = Tenant {
330 tenant_id: tenant_id.clone(),
331 name: name.clone(),
332 organization: None,
333 status: TenantStatus::Provisioning,
334 quota,
335 usage: ResourceUsage {
336 updated_at: Utc::now(),
337 ..Default::default()
338 },
339 tier,
340 created_at: Utc::now(),
341 updated_at: Utc::now(),
342 metadata: HashMap::new(),
343 };
344
345 self.provision_tenant(&tenant).await?;
347
348 let mut active_tenant = tenant.clone();
350 active_tenant.status = TenantStatus::Active;
351
352 self.tenants
354 .write()
355 .await
356 .insert(tenant_id.clone(), active_tenant.clone());
357
358 {
360 let mut metrics = self.metrics.write().await;
361 metrics.total_tenants += 1;
362 metrics.active_tenants += 1;
363 }
364
365 info!("Tenant created successfully: {}", name);
366 Ok(active_tenant)
367 }
368
369 fn get_quota_for_tier(&self, tier: TenantTier) -> TenantQuota {
371 match tier {
372 TenantTier::Free => TenantQuota {
373 max_events_per_second: 1000,
374 max_connections: 10,
375 max_topics: 5,
376 max_storage_bytes: 1024 * 1024 * 1024, max_memory_bytes: 256 * 1024 * 1024, max_cpu_percent: 10.0,
379 max_bandwidth_bytes_per_sec: 10 * 1024 * 1024, },
381 TenantTier::Basic => TenantQuota {
382 max_events_per_second: 10000,
383 max_connections: 50,
384 max_topics: 25,
385 max_storage_bytes: 10 * 1024 * 1024 * 1024, max_memory_bytes: 512 * 1024 * 1024, max_cpu_percent: 25.0,
388 max_bandwidth_bytes_per_sec: 50 * 1024 * 1024, },
390 TenantTier::Professional => TenantQuota {
391 max_events_per_second: 50000,
392 max_connections: 200,
393 max_topics: 100,
394 max_storage_bytes: 100 * 1024 * 1024 * 1024, max_memory_bytes: 2 * 1024 * 1024 * 1024, max_cpu_percent: 50.0,
397 max_bandwidth_bytes_per_sec: 200 * 1024 * 1024, },
399 TenantTier::Enterprise => TenantQuota {
400 max_events_per_second: 500000,
401 max_connections: 1000,
402 max_topics: 500,
403 max_storage_bytes: 1024 * 1024 * 1024 * 1024, max_memory_bytes: 16 * 1024 * 1024 * 1024, max_cpu_percent: 100.0,
406 max_bandwidth_bytes_per_sec: 1024 * 1024 * 1024, },
408 TenantTier::Custom => self.config.default_quota.clone(),
409 }
410 }
411
412 async fn provision_tenant(&self, tenant: &Tenant) -> Result<()> {
414 debug!("Provisioning resources for tenant: {}", tenant.tenant_id);
415
416 let namespace = TenantNamespace {
418 namespace_id: format!("tenant-{}", tenant.tenant_id),
419 tenant_id: tenant.tenant_id.clone(),
420 resources: NamespaceResources::default(),
421 };
422
423 self.namespaces
424 .write()
425 .await
426 .insert(namespace.namespace_id.clone(), namespace);
427
428 debug!("Tenant resources provisioned: {}", tenant.tenant_id);
435 Ok(())
436 }
437
438 pub async fn check_quota(&self, tenant_id: &str, resource: ResourceType) -> Result<bool> {
440 let tenants = self.tenants.read().await;
441 let tenant = tenants
442 .get(tenant_id)
443 .ok_or_else(|| anyhow!("Tenant not found: {}", tenant_id))?;
444
445 if tenant.status != TenantStatus::Active {
446 return Err(anyhow!("Tenant is not active: {}", tenant.status));
447 }
448
449 let within_quota = match resource {
450 ResourceType::EventsPerSecond => {
451 tenant.usage.events_per_second < tenant.quota.max_events_per_second
452 }
453 ResourceType::Connections => tenant.usage.connections < tenant.quota.max_connections,
454 ResourceType::Topics => tenant.usage.topics < tenant.quota.max_topics,
455 ResourceType::Storage => tenant.usage.storage_bytes < tenant.quota.max_storage_bytes,
456 ResourceType::Memory => tenant.usage.memory_bytes < tenant.quota.max_memory_bytes,
457 ResourceType::CPU => tenant.usage.cpu_percent < tenant.quota.max_cpu_percent,
458 ResourceType::Bandwidth => {
459 tenant.usage.bandwidth_bytes_per_sec < tenant.quota.max_bandwidth_bytes_per_sec
460 }
461 };
462
463 if !within_quota {
464 warn!("Quota exceeded for tenant {}: {:?}", tenant_id, resource);
465
466 if self.config.lifecycle.auto_suspend_on_violation {
467 self.suspend_tenant(tenant_id, "Quota violation".to_string())
468 .await?;
469 }
470
471 self.metrics.write().await.quota_violations += 1;
473 }
474
475 Ok(within_quota)
476 }
477
478 pub async fn update_usage(&self, tenant_id: &str, usage: ResourceUsage) -> Result<()> {
480 let mut tenants = self.tenants.write().await;
481 if let Some(tenant) = tenants.get_mut(tenant_id) {
482 tenant.usage = usage;
483 tenant.updated_at = Utc::now();
484 debug!("Updated usage for tenant: {}", tenant_id);
485 }
486 Ok(())
487 }
488
489 pub async fn suspend_tenant(&self, tenant_id: &str, reason: String) -> Result<()> {
491 info!("Suspending tenant {}: {}", tenant_id, reason);
492
493 let mut tenants = self.tenants.write().await;
494 if let Some(tenant) = tenants.get_mut(tenant_id) {
495 tenant.status = TenantStatus::Suspended;
496 tenant.updated_at = Utc::now();
497 tenant
498 .metadata
499 .insert("suspension_reason".to_string(), reason);
500
501 drop(tenants);
503 let mut metrics = self.metrics.write().await;
504 metrics.active_tenants -= 1;
505 metrics.suspended_tenants += 1;
506 }
507
508 Ok(())
509 }
510
511 pub async fn resume_tenant(&self, tenant_id: &str) -> Result<()> {
513 info!("Resuming tenant: {}", tenant_id);
514
515 let mut tenants = self.tenants.write().await;
516 if let Some(tenant) = tenants.get_mut(tenant_id) {
517 tenant.status = TenantStatus::Active;
518 tenant.updated_at = Utc::now();
519 tenant.metadata.remove("suspension_reason");
520
521 drop(tenants);
523 let mut metrics = self.metrics.write().await;
524 metrics.active_tenants += 1;
525 metrics.suspended_tenants -= 1;
526 }
527
528 Ok(())
529 }
530
531 pub async fn delete_tenant(&self, tenant_id: &str) -> Result<()> {
533 info!("Deleting tenant: {}", tenant_id);
534
535 self.deprovision_tenant(tenant_id).await?;
537
538 let mut tenants = self.tenants.write().await;
540 if tenants.remove(tenant_id).is_some() {
541 drop(tenants);
543 let mut metrics = self.metrics.write().await;
544 metrics.total_tenants -= 1;
545 }
546
547 Ok(())
548 }
549
550 async fn deprovision_tenant(&self, tenant_id: &str) -> Result<()> {
552 debug!("Deprovisioning resources for tenant: {}", tenant_id);
553
554 let namespace_id = format!("tenant-{}", tenant_id);
556 self.namespaces.write().await.remove(&namespace_id);
557
558 debug!("Tenant resources deprovisioned: {}", tenant_id);
565 Ok(())
566 }
567
568 pub async fn get_tenant(&self, tenant_id: &str) -> Option<Tenant> {
570 self.tenants.read().await.get(tenant_id).cloned()
571 }
572
573 pub async fn list_tenants(&self) -> Vec<Tenant> {
575 self.tenants.read().await.values().cloned().collect()
576 }
577
578 pub async fn get_metrics(&self) -> MultiTenancyMetrics {
580 self.metrics.read().await.clone()
581 }
582}
583
584#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
586pub enum ResourceType {
587 EventsPerSecond,
588 Connections,
589 Topics,
590 Storage,
591 Memory,
592 CPU,
593 Bandwidth,
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager from the default configuration and initializes it.
    async fn initialized_manager() -> MultiTenancyManager {
        let manager = MultiTenancyManager::new(MultiTenancyConfig::default());
        manager.initialize().await.unwrap();
        manager
    }

    /// The default configuration is enabled and uses namespace isolation.
    #[tokio::test]
    async fn test_multi_tenancy_config_default() {
        let defaults = MultiTenancyConfig::default();
        assert!(defaults.enabled);
        assert_eq!(defaults.isolation_mode, IsolationMode::Namespace);
    }

    /// A created tenant keeps its name and tier and ends up active.
    #[tokio::test]
    async fn test_tenant_creation() {
        let manager = initialized_manager().await;

        let created = manager
            .create_tenant("Test Tenant".to_string(), TenantTier::Basic)
            .await
            .unwrap();

        assert_eq!(created.name, "Test Tenant");
        assert_eq!(created.tier, TenantTier::Basic);
        assert_eq!(created.status, TenantStatus::Active);
    }

    /// A fresh tenant with zero usage is within its connection quota.
    #[tokio::test]
    async fn test_quota_check() {
        let manager = initialized_manager().await;

        let created = manager
            .create_tenant("Test".to_string(), TenantTier::Free)
            .await
            .unwrap();

        let allowed = manager
            .check_quota(&created.tenant_id, ResourceType::Connections)
            .await
            .unwrap();
        assert!(allowed);
    }

    /// Suspending a tenant is reflected in the stored record.
    #[tokio::test]
    async fn test_tenant_suspension() {
        let manager = initialized_manager().await;

        let created = manager
            .create_tenant("Test".to_string(), TenantTier::Basic)
            .await
            .unwrap();

        manager
            .suspend_tenant(&created.tenant_id, "Testing".to_string())
            .await
            .unwrap();

        let stored = manager.get_tenant(&created.tenant_id).await.unwrap();
        assert_eq!(stored.status, TenantStatus::Suspended);
    }

    /// Enterprise limits exceed Free limits.
    #[tokio::test]
    async fn test_tier_quota() {
        let manager = MultiTenancyManager::new(MultiTenancyConfig::default());

        let free = manager.get_quota_for_tier(TenantTier::Free);
        let enterprise = manager.get_quota_for_tier(TenantTier::Enterprise);

        assert!(enterprise.max_events_per_second > free.max_events_per_second);
        assert!(enterprise.max_connections > free.max_connections);
    }
}