use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Configuration for [`MultiTenancyManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiTenancyConfig {
/// When `false`, `MultiTenancyManager::initialize` logs that multi-tenancy is
/// disabled and returns without creating the default namespace.
pub enabled: bool,
/// Tenant isolation mode. The code visible here only reports it in the
/// initialization log message.
pub isolation_mode: IsolationMode,
/// Resource allocation strategy.
/// NOTE(review): not read by any code visible in this file — confirm where it is applied.
pub resource_allocation: ResourceAllocationStrategy,
/// Quota handed to tenants created with `TenantTier::Custom`.
pub default_quota: TenantQuota,
/// Lifecycle switches; `auto_suspend_on_violation` is consulted by `check_quota`.
pub lifecycle: TenantLifecycleConfig,
}
impl Default for MultiTenancyConfig {
fn default() -> Self {
Self {
enabled: true,
isolation_mode: IsolationMode::Namespace,
resource_allocation: ResourceAllocationStrategy::FairShare,
default_quota: TenantQuota::default(),
lifecycle: TenantLifecycleConfig::default(),
}
}
}
/// Isolation mode selectable through `MultiTenancyConfig::isolation_mode`.
///
/// NOTE(review): the code visible in this file only logs the selected mode;
/// how each variant is enforced is not shown here.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum IsolationMode {
/// Mode chosen by `MultiTenancyConfig::default()`.
Namespace,
Process,
Container,
VirtualMachine,
}
impl std::fmt::Display for IsolationMode {
    /// Writes the variant name exactly as it is spelled in the enum.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            IsolationMode::Namespace => "Namespace",
            IsolationMode::Process => "Process",
            IsolationMode::Container => "Container",
            IsolationMode::VirtualMachine => "VirtualMachine",
        };
        f.write_str(label)
    }
}
/// Strategy selectable through `MultiTenancyConfig::resource_allocation`.
///
/// NOTE(review): no code visible in this file branches on this value, so the
/// concrete behaviour of each variant cannot be confirmed from here.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ResourceAllocationStrategy {
/// Strategy chosen by `MultiTenancyConfig::default()`.
FairShare,
PriorityBased,
Guaranteed,
BestEffort,
}
/// Upper limits on a tenant's resource consumption.
///
/// `MultiTenancyManager::check_quota` compares each limit against the matching
/// field of [`ResourceUsage`] with a strict `<`, so usage equal to the limit
/// already counts as exceeding the quota.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantQuota {
/// Limit compared against `ResourceUsage::events_per_second`.
pub max_events_per_second: u64,
/// Limit compared against `ResourceUsage::connections`.
pub max_connections: u32,
/// Limit compared against `ResourceUsage::topics`.
pub max_topics: u32,
/// Limit compared against `ResourceUsage::storage_bytes`.
pub max_storage_bytes: u64,
/// Limit compared against `ResourceUsage::memory_bytes`.
pub max_memory_bytes: u64,
/// Limit compared against `ResourceUsage::cpu_percent`; the built-in tier
/// quotas use values between 10.0 and 100.0.
pub max_cpu_percent: f64,
/// Limit compared against `ResourceUsage::bandwidth_bytes_per_sec`.
pub max_bandwidth_bytes_per_sec: u64,
}
impl Default for TenantQuota {
fn default() -> Self {
Self {
max_events_per_second: 10000,
max_connections: 100,
max_topics: 50,
max_storage_bytes: 10 * 1024 * 1024 * 1024, max_memory_bytes: 1024 * 1024 * 1024, max_cpu_percent: 50.0,
max_bandwidth_bytes_per_sec: 100 * 1024 * 1024, }
}
}
/// Lifecycle switches stored in `MultiTenancyConfig::lifecycle`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantLifecycleConfig {
/// NOTE(review): not read by any code visible in this file — confirm where it is honoured.
pub auto_provisioning: bool,
/// NOTE(review): not read by any code visible in this file — confirm where it is honoured.
pub auto_deprovisioning: bool,
/// Grace period before deprovisioning, in seconds going by the field name.
/// NOTE(review): not read by any code visible in this file.
pub deprovision_grace_period_secs: u64,
/// When `true`, `MultiTenancyManager::check_quota` suspends a tenant that
/// exceeds the checked quota.
pub auto_suspend_on_violation: bool,
}
impl Default for TenantLifecycleConfig {
fn default() -> Self {
Self {
auto_provisioning: true,
auto_deprovisioning: false,
deprovision_grace_period_secs: 86400, auto_suspend_on_violation: true,
}
}
}
/// A tenant record as stored and returned by [`MultiTenancyManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Tenant {
/// Unique identifier; `create_tenant` generates it as a UUID v4 string and
/// uses it as the key of the manager's tenant map.
pub tenant_id: String,
/// Name supplied by the caller of `create_tenant`.
pub name: String,
/// Optional organization; `create_tenant` leaves it as `None`.
pub organization: Option<String>,
/// Current lifecycle status; `check_quota` requires `TenantStatus::Active`.
pub status: TenantStatus,
/// Resource limits, derived from the tier when the tenant is created.
pub quota: TenantQuota,
/// Most recently reported usage; replaced wholesale by `update_usage`.
pub usage: ResourceUsage,
/// Tier the tenant was created with.
pub tier: TenantTier,
/// Timestamp taken when the tenant was created.
pub created_at: DateTime<Utc>,
/// Timestamp refreshed by `update_usage`, `suspend_tenant` and `resume_tenant`.
pub updated_at: DateTime<Utc>,
/// Free-form key/value data. `suspend_tenant` stores the reason under the
/// `suspension_reason` key and `resume_tenant` removes it.
pub metadata: HashMap<String, String>,
}
/// Lifecycle status of a [`Tenant`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum TenantStatus {
/// Set by `create_tenant` once provisioning succeeded and by `resume_tenant`;
/// the only status for which `check_quota` evaluates quotas.
Active,
/// Set by `suspend_tenant`.
Suspended,
/// Status of a tenant while `create_tenant` is provisioning its resources.
Provisioning,
/// NOTE(review): never assigned by code visible in this file.
Deprovisioning,
/// NOTE(review): never assigned by code visible in this file.
Archived,
}
impl std::fmt::Display for TenantStatus {
    /// Writes the variant name exactly as it is spelled in the enum.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            TenantStatus::Active => "Active",
            TenantStatus::Suspended => "Suspended",
            TenantStatus::Provisioning => "Provisioning",
            TenantStatus::Deprovisioning => "Deprovisioning",
            TenantStatus::Archived => "Archived",
        };
        f.write_str(label)
    }
}
/// Service tier of a tenant; `get_quota_for_tier` maps each tier to a quota.
///
/// `PartialOrd`/`Ord` are derived, so tiers compare in declaration order:
/// `Free < Basic < Professional < Enterprise < Custom`. Reordering the
/// variants changes that ordering.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum TenantTier {
Free,
Basic,
Professional,
Enterprise,
/// Receives `MultiTenancyConfig::default_quota` instead of a built-in quota.
Custom,
}
impl std::fmt::Display for TenantTier {
    /// Writes the variant name exactly as it is spelled in the enum.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            TenantTier::Free => "Free",
            TenantTier::Basic => "Basic",
            TenantTier::Professional => "Professional",
            TenantTier::Enterprise => "Enterprise",
            TenantTier::Custom => "Custom",
        };
        f.write_str(label)
    }
}
/// Snapshot of a tenant's resource consumption.
///
/// Each field is compared against the matching `max_*` field of
/// [`TenantQuota`] by `MultiTenancyManager::check_quota`. The values are
/// supplied by callers through `update_usage`; this file does not measure them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceUsage {
/// Compared against `TenantQuota::max_events_per_second`.
pub events_per_second: u64,
/// Compared against `TenantQuota::max_connections`.
pub connections: u32,
/// Compared against `TenantQuota::max_topics`.
pub topics: u32,
/// Compared against `TenantQuota::max_storage_bytes`.
pub storage_bytes: u64,
/// Compared against `TenantQuota::max_memory_bytes`.
pub memory_bytes: u64,
/// Compared against `TenantQuota::max_cpu_percent`.
pub cpu_percent: f64,
/// Compared against `TenantQuota::max_bandwidth_bytes_per_sec`.
pub bandwidth_bytes_per_sec: u64,
/// Timestamp of the snapshot; `create_tenant` sets it to the creation time.
pub updated_at: DateTime<Utc>,
}
/// A namespace registered with [`MultiTenancyManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantNamespace {
/// Key of the namespace in the manager's namespace map: `"default"` for the
/// namespace created by `initialize`, `"tenant-{tenant_id}"` for tenant namespaces.
pub namespace_id: String,
/// Identifier of the owning tenant (`"default"` for the default namespace).
pub tenant_id: String,
/// Resources recorded for this namespace; created empty.
pub resources: NamespaceResources,
}
/// Resource lists attached to a [`TenantNamespace`].
///
/// NOTE(review): the code visible in this file only ever creates these lists
/// empty; what the strings identify (names vs. ids) should be confirmed
/// against the code that fills them.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NamespaceResources {
pub topics: Vec<String>,
pub consumer_groups: Vec<String>,
pub connections: Vec<String>,
}
/// Registry of tenants and their namespaces, plus aggregate metrics.
///
/// Each piece of mutable state sits behind its own `Arc<RwLock<_>>`.
pub struct MultiTenancyManager {
/// Configuration supplied to `new`.
config: MultiTenancyConfig,
/// Tenants keyed by `Tenant::tenant_id`.
tenants: Arc<RwLock<HashMap<String, Tenant>>>,
/// Namespaces keyed by `TenantNamespace::namespace_id`.
namespaces: Arc<RwLock<HashMap<String, TenantNamespace>>>,
/// Aggregate counters returned by `get_metrics`.
metrics: Arc<RwLock<MultiTenancyMetrics>>,
}
/// Aggregate counters maintained by [`MultiTenancyManager`] and returned (as a
/// clone) by `get_metrics`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MultiTenancyMetrics {
/// Incremented by `create_tenant`; decremented by `delete_tenant` when a
/// tenant was actually removed.
pub total_tenants: u64,
/// Number of tenants counted as active by the manager's lifecycle methods.
pub active_tenants: u64,
/// Number of tenants counted as suspended by the manager's lifecycle methods.
pub suspended_tenants: u64,
/// Incremented by `check_quota` when a check finds the quota exceeded.
pub quota_violations: u64,
/// NOTE(review): never written by code visible in this file; presumably
/// keyed by tenant id — confirm against the code that fills it.
pub tenant_utilization: HashMap<String, f64>,
}
impl MultiTenancyManager {
pub fn new(config: MultiTenancyConfig) -> Self {
Self {
config,
tenants: Arc::new(RwLock::new(HashMap::new())),
namespaces: Arc::new(RwLock::new(HashMap::new())),
metrics: Arc::new(RwLock::new(MultiTenancyMetrics::default())),
}
}
pub async fn initialize(&self) -> Result<()> {
if !self.config.enabled {
info!("Multi-tenancy is disabled");
return Ok(());
}
info!(
"Initializing multi-tenancy system with isolation mode: {}",
self.config.isolation_mode
);
self.create_default_namespace().await?;
info!("Multi-tenancy system initialized successfully");
Ok(())
}
async fn create_default_namespace(&self) -> Result<()> {
let namespace = TenantNamespace {
namespace_id: "default".to_string(),
tenant_id: "default".to_string(),
resources: NamespaceResources::default(),
};
self.namespaces
.write()
.await
.insert("default".to_string(), namespace);
debug!("Default namespace created");
Ok(())
}
pub async fn create_tenant(&self, name: String, tier: TenantTier) -> Result<Tenant> {
info!("Creating tenant: {} (tier: {})", name, tier);
let tenant_id = Uuid::new_v4().to_string();
let quota = self.get_quota_for_tier(tier);
let tenant = Tenant {
tenant_id: tenant_id.clone(),
name: name.clone(),
organization: None,
status: TenantStatus::Provisioning,
quota,
usage: ResourceUsage {
updated_at: Utc::now(),
..Default::default()
},
tier,
created_at: Utc::now(),
updated_at: Utc::now(),
metadata: HashMap::new(),
};
self.provision_tenant(&tenant).await?;
let mut active_tenant = tenant.clone();
active_tenant.status = TenantStatus::Active;
self.tenants
.write()
.await
.insert(tenant_id.clone(), active_tenant.clone());
{
let mut metrics = self.metrics.write().await;
metrics.total_tenants += 1;
metrics.active_tenants += 1;
}
info!("Tenant created successfully: {}", name);
Ok(active_tenant)
}
fn get_quota_for_tier(&self, tier: TenantTier) -> TenantQuota {
match tier {
TenantTier::Free => TenantQuota {
max_events_per_second: 1000,
max_connections: 10,
max_topics: 5,
max_storage_bytes: 1024 * 1024 * 1024, max_memory_bytes: 256 * 1024 * 1024, max_cpu_percent: 10.0,
max_bandwidth_bytes_per_sec: 10 * 1024 * 1024, },
TenantTier::Basic => TenantQuota {
max_events_per_second: 10000,
max_connections: 50,
max_topics: 25,
max_storage_bytes: 10 * 1024 * 1024 * 1024, max_memory_bytes: 512 * 1024 * 1024, max_cpu_percent: 25.0,
max_bandwidth_bytes_per_sec: 50 * 1024 * 1024, },
TenantTier::Professional => TenantQuota {
max_events_per_second: 50000,
max_connections: 200,
max_topics: 100,
max_storage_bytes: 100 * 1024 * 1024 * 1024, max_memory_bytes: 2 * 1024 * 1024 * 1024, max_cpu_percent: 50.0,
max_bandwidth_bytes_per_sec: 200 * 1024 * 1024, },
TenantTier::Enterprise => TenantQuota {
max_events_per_second: 500000,
max_connections: 1000,
max_topics: 500,
max_storage_bytes: 1024 * 1024 * 1024 * 1024, max_memory_bytes: 16 * 1024 * 1024 * 1024, max_cpu_percent: 100.0,
max_bandwidth_bytes_per_sec: 1024 * 1024 * 1024, },
TenantTier::Custom => self.config.default_quota.clone(),
}
}
async fn provision_tenant(&self, tenant: &Tenant) -> Result<()> {
debug!("Provisioning resources for tenant: {}", tenant.tenant_id);
let namespace = TenantNamespace {
namespace_id: format!("tenant-{}", tenant.tenant_id),
tenant_id: tenant.tenant_id.clone(),
resources: NamespaceResources::default(),
};
self.namespaces
.write()
.await
.insert(namespace.namespace_id.clone(), namespace);
debug!("Tenant resources provisioned: {}", tenant.tenant_id);
Ok(())
}
pub async fn check_quota(&self, tenant_id: &str, resource: ResourceType) -> Result<bool> {
let tenants = self.tenants.read().await;
let tenant = tenants
.get(tenant_id)
.ok_or_else(|| anyhow!("Tenant not found: {}", tenant_id))?;
if tenant.status != TenantStatus::Active {
return Err(anyhow!("Tenant is not active: {}", tenant.status));
}
let within_quota = match resource {
ResourceType::EventsPerSecond => {
tenant.usage.events_per_second < tenant.quota.max_events_per_second
}
ResourceType::Connections => tenant.usage.connections < tenant.quota.max_connections,
ResourceType::Topics => tenant.usage.topics < tenant.quota.max_topics,
ResourceType::Storage => tenant.usage.storage_bytes < tenant.quota.max_storage_bytes,
ResourceType::Memory => tenant.usage.memory_bytes < tenant.quota.max_memory_bytes,
ResourceType::CPU => tenant.usage.cpu_percent < tenant.quota.max_cpu_percent,
ResourceType::Bandwidth => {
tenant.usage.bandwidth_bytes_per_sec < tenant.quota.max_bandwidth_bytes_per_sec
}
};
if !within_quota {
warn!("Quota exceeded for tenant {}: {:?}", tenant_id, resource);
if self.config.lifecycle.auto_suspend_on_violation {
self.suspend_tenant(tenant_id, "Quota violation".to_string())
.await?;
}
self.metrics.write().await.quota_violations += 1;
}
Ok(within_quota)
}
pub async fn update_usage(&self, tenant_id: &str, usage: ResourceUsage) -> Result<()> {
let mut tenants = self.tenants.write().await;
if let Some(tenant) = tenants.get_mut(tenant_id) {
tenant.usage = usage;
tenant.updated_at = Utc::now();
debug!("Updated usage for tenant: {}", tenant_id);
}
Ok(())
}
pub async fn suspend_tenant(&self, tenant_id: &str, reason: String) -> Result<()> {
info!("Suspending tenant {}: {}", tenant_id, reason);
let mut tenants = self.tenants.write().await;
if let Some(tenant) = tenants.get_mut(tenant_id) {
tenant.status = TenantStatus::Suspended;
tenant.updated_at = Utc::now();
tenant
.metadata
.insert("suspension_reason".to_string(), reason);
drop(tenants);
let mut metrics = self.metrics.write().await;
metrics.active_tenants -= 1;
metrics.suspended_tenants += 1;
}
Ok(())
}
pub async fn resume_tenant(&self, tenant_id: &str) -> Result<()> {
info!("Resuming tenant: {}", tenant_id);
let mut tenants = self.tenants.write().await;
if let Some(tenant) = tenants.get_mut(tenant_id) {
tenant.status = TenantStatus::Active;
tenant.updated_at = Utc::now();
tenant.metadata.remove("suspension_reason");
drop(tenants);
let mut metrics = self.metrics.write().await;
metrics.active_tenants += 1;
metrics.suspended_tenants -= 1;
}
Ok(())
}
pub async fn delete_tenant(&self, tenant_id: &str) -> Result<()> {
info!("Deleting tenant: {}", tenant_id);
self.deprovision_tenant(tenant_id).await?;
let mut tenants = self.tenants.write().await;
if tenants.remove(tenant_id).is_some() {
drop(tenants);
let mut metrics = self.metrics.write().await;
metrics.total_tenants -= 1;
}
Ok(())
}
async fn deprovision_tenant(&self, tenant_id: &str) -> Result<()> {
debug!("Deprovisioning resources for tenant: {}", tenant_id);
let namespace_id = format!("tenant-{}", tenant_id);
self.namespaces.write().await.remove(&namespace_id);
debug!("Tenant resources deprovisioned: {}", tenant_id);
Ok(())
}
pub async fn get_tenant(&self, tenant_id: &str) -> Option<Tenant> {
self.tenants.read().await.get(tenant_id).cloned()
}
pub async fn list_tenants(&self) -> Vec<Tenant> {
self.tenants.read().await.values().cloned().collect()
}
pub async fn get_metrics(&self) -> MultiTenancyMetrics {
self.metrics.read().await.clone()
}
}
/// Selects which usage/quota pair `MultiTenancyManager::check_quota` compares.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ResourceType {
/// `events_per_second` vs. `max_events_per_second`.
EventsPerSecond,
/// `connections` vs. `max_connections`.
Connections,
/// `topics` vs. `max_topics`.
Topics,
/// `storage_bytes` vs. `max_storage_bytes`.
Storage,
/// `memory_bytes` vs. `max_memory_bytes`.
Memory,
/// `cpu_percent` vs. `max_cpu_percent`.
CPU,
/// `bandwidth_bytes_per_sec` vs. `max_bandwidth_bytes_per_sec`.
Bandwidth,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager from the default configuration and initializes it.
    async fn initialized_manager() -> MultiTenancyManager {
        let manager = MultiTenancyManager::new(MultiTenancyConfig::default());
        manager.initialize().await.unwrap();
        manager
    }

    // The default configuration enables multi-tenancy with namespace isolation.
    #[tokio::test]
    async fn test_multi_tenancy_config_default() {
        let MultiTenancyConfig {
            enabled,
            isolation_mode,
            ..
        } = MultiTenancyConfig::default();
        assert!(enabled);
        assert_eq!(isolation_mode, IsolationMode::Namespace);
    }

    // A created tenant keeps its name and tier and comes back active.
    #[tokio::test]
    async fn test_tenant_creation() {
        let manager = initialized_manager().await;
        let created = manager
            .create_tenant("Test Tenant".to_string(), TenantTier::Basic)
            .await;
        let tenant = created.unwrap();
        assert_eq!(tenant.name, "Test Tenant");
        assert_eq!(tenant.tier, TenantTier::Basic);
        assert_eq!(tenant.status, TenantStatus::Active);
    }

    // A fresh tenant has zero usage, so a connection check is within quota.
    #[tokio::test]
    async fn test_quota_check() {
        let manager = initialized_manager().await;
        let tenant = manager
            .create_tenant("Test".to_string(), TenantTier::Free)
            .await
            .unwrap();
        let id = tenant.tenant_id.as_str();
        let within_quota = manager
            .check_quota(id, ResourceType::Connections)
            .await
            .unwrap();
        assert!(within_quota);
    }

    // Suspending a tenant is reflected in the stored record.
    #[tokio::test]
    async fn test_tenant_suspension() {
        let manager = initialized_manager().await;
        let tenant = manager
            .create_tenant("Test".to_string(), TenantTier::Basic)
            .await
            .unwrap();
        let id = tenant.tenant_id.as_str();
        manager
            .suspend_tenant(id, "Testing".to_string())
            .await
            .unwrap();
        let suspended_tenant = manager.get_tenant(id).await.unwrap();
        assert_eq!(suspended_tenant.status, TenantStatus::Suspended);
    }

    // Enterprise limits exceed Free limits.
    #[tokio::test]
    async fn test_tier_quota() {
        let manager = MultiTenancyManager::new(MultiTenancyConfig::default());
        let free_quota = manager.get_quota_for_tier(TenantTier::Free);
        let enterprise_quota = manager.get_quota_for_tier(TenantTier::Enterprise);
        assert!(enterprise_quota.max_events_per_second > free_quota.max_events_per_second);
        assert!(enterprise_quota.max_connections > free_quota.max_connections);
    }
}