1use std::sync::{Arc, RwLock};
2use std::collections::HashMap;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5
6use super::tenant::{TenantId, TenantInfo, TenantConfig, TenantStatus, TenantError};
7use super::isolation::{TenantIsolation, IsolationPolicy};
8use super::quota::{TenantQuota, ResourceUsage, ResourceType};
9use crate::error::{EventualiError, Result};
10
11pub struct TenantManager {
13 tenants: Arc<RwLock<HashMap<TenantId, TenantInfo>>>,
14 quotas: Arc<RwLock<HashMap<TenantId, Arc<TenantQuota>>>>,
15 isolation: Arc<TenantIsolation>,
16 registry: Arc<RwLock<TenantRegistry>>,
17}
18
19impl Default for TenantManager {
20 fn default() -> Self {
21 Self::new()
22 }
23}
24
25impl TenantManager {
26 pub fn new() -> Self {
27 Self {
28 tenants: Arc::new(RwLock::new(HashMap::new())),
29 quotas: Arc::new(RwLock::new(HashMap::new())),
30 isolation: Arc::new(TenantIsolation::new()),
31 registry: Arc::new(RwLock::new(TenantRegistry::new())),
32 }
33 }
34
35 pub async fn create_tenant(&self, tenant_id: TenantId, name: String, config: Option<TenantConfig>) -> Result<TenantInfo> {
37 {
39 let tenants = self.tenants.read().unwrap();
40 if tenants.contains_key(&tenant_id) {
41 return Err(EventualiError::from(TenantError::TenantAlreadyExists(tenant_id)));
42 }
43 }
44
45 let mut tenant_info = TenantInfo::new(tenant_id.clone(), name);
47 if let Some(config) = config {
48 tenant_info.config = config;
49 }
50
51 let quota = Arc::new(TenantQuota::new(tenant_id.clone(), tenant_info.config.resource_limits.clone()));
53
54 let isolation_policy = match tenant_info.config.isolation_level {
56 super::tenant::IsolationLevel::Database => IsolationPolicy::strict(),
57 super::tenant::IsolationLevel::Application => IsolationPolicy::relaxed(),
58 super::tenant::IsolationLevel::Row => IsolationPolicy::relaxed(),
59 };
60
61 self.isolation.register_tenant(tenant_id.clone(), isolation_policy)?;
62
63 {
65 let mut tenants = self.tenants.write().unwrap();
66 let mut quotas = self.quotas.write().unwrap();
67 let mut registry = self.registry.write().unwrap();
68
69 tenants.insert(tenant_id.clone(), tenant_info.clone());
70 quotas.insert(tenant_id.clone(), quota);
71 registry.register_tenant(tenant_id.clone())?;
72 }
73
74 Ok(tenant_info)
75 }
76
77 pub fn get_tenant(&self, tenant_id: &TenantId) -> Result<TenantInfo> {
79 let tenants = self.tenants.read().unwrap();
80 tenants.get(tenant_id)
81 .cloned()
82 .ok_or_else(|| EventualiError::from(TenantError::TenantNotFound(tenant_id.clone())))
83 }
84
85 pub fn list_tenants(&self, status_filter: Option<TenantStatus>) -> Vec<TenantInfo> {
87 let tenants = self.tenants.read().unwrap();
88 tenants.values()
89 .filter(|tenant| {
90 status_filter.as_ref().is_none_or(|status| {
91 std::mem::discriminant(&tenant.status) == std::mem::discriminant(status)
92 })
93 })
94 .cloned()
95 .collect()
96 }
97
98 pub fn update_tenant(&self, tenant_id: &TenantId, updates: TenantUpdate) -> Result<TenantInfo> {
100 let mut tenants = self.tenants.write().unwrap();
101 let tenant = tenants.get_mut(tenant_id)
102 .ok_or_else(|| EventualiError::from(TenantError::TenantNotFound(tenant_id.clone())))?;
103
104 tenant.updated_at = Utc::now();
105
106 if let Some(name) = updates.name {
107 tenant.name = name;
108 }
109
110 if let Some(description) = updates.description {
111 tenant.description = Some(description);
112 }
113
114 if let Some(status) = updates.status {
115 tenant.status = status;
116 }
117
118 if let Some(config) = updates.config {
119 tenant.config = config;
120
121 let quotas = self.quotas.read().unwrap();
123 if let Some(_quota) = quotas.get(tenant_id) {
124 }
127 }
128
129 Ok(tenant.clone())
130 }
131
132 pub fn delete_tenant(&self, tenant_id: &TenantId) -> Result<()> {
134 let mut tenants = self.tenants.write().unwrap();
135 let tenant = tenants.get_mut(tenant_id)
136 .ok_or_else(|| EventualiError::from(TenantError::TenantNotFound(tenant_id.clone())))?;
137
138 tenant.status = TenantStatus::PendingDeletion;
139 tenant.updated_at = Utc::now();
140
141 Ok(())
142 }
143
144 pub fn get_tenant_usage(&self, tenant_id: &TenantId) -> Result<ResourceUsage> {
146 let quotas = self.quotas.read().unwrap();
147 let quota = quotas.get(tenant_id)
148 .ok_or_else(|| EventualiError::from(TenantError::TenantNotFound(tenant_id.clone())))?;
149
150 Ok(quota.get_legacy_usage())
151 }
152
153 pub fn check_tenant_quota(&self, tenant_id: &TenantId, resource_type: ResourceType, amount: u64) -> Result<()> {
155 let quotas = self.quotas.read().unwrap();
156 let quota = quotas.get(tenant_id)
157 .ok_or_else(|| EventualiError::from(TenantError::TenantNotFound(tenant_id.clone())))?;
158
159 match quota.check_quota(resource_type, amount) {
161 Ok(result) => {
162 if result.allowed {
163 Ok(())
164 } else {
165 Err(EventualiError::Tenant(format!("Quota exceeded for resource {resource_type:?}")))
166 }
167 }
168 Err(e) => Err(e)
169 }
170 }
171
172 pub fn record_tenant_usage(&self, tenant_id: &TenantId, resource_type: ResourceType, amount: u64) -> Result<()> {
174 let quotas = self.quotas.read().unwrap();
175 let quota = quotas.get(tenant_id)
176 .ok_or_else(|| EventualiError::from(TenantError::TenantNotFound(tenant_id.clone())))?;
177
178 quota.record_usage(resource_type, amount);
179
180 let mut tenants = self.tenants.write().unwrap();
182 if let Some(tenant) = tenants.get_mut(tenant_id) {
183 tenant.metadata.last_activity = Some(Utc::now());
184
185 match resource_type {
186 ResourceType::Events => tenant.metadata.total_events += amount,
187 ResourceType::Aggregates => tenant.metadata.total_aggregates += amount,
188 ResourceType::Storage => tenant.metadata.storage_used_mb += amount as f64,
189 _ => {}
190 }
191 }
192
193 Ok(())
194 }
195
196 pub fn get_tenants_near_limits(&self) -> Vec<(TenantId, ResourceUsage)> {
198 let quotas = self.quotas.read().unwrap();
199 quotas.iter()
200 .filter_map(|(tenant_id, quota)| {
201 let enhanced_usage = quota.get_usage();
202 if enhanced_usage.has_resources_near_limit() {
203 let legacy_usage = quota.get_legacy_usage();
205 Some((tenant_id.clone(), legacy_usage))
206 } else {
207 None
208 }
209 })
210 .collect()
211 }
212
213 pub fn get_isolation_metrics(&self) -> super::isolation::IsolationMetrics {
215 self.isolation.get_metrics()
216 }
217}
218
219#[derive(Debug, Clone)]
221pub struct TenantUpdate {
222 pub name: Option<String>,
223 pub description: Option<String>,
224 pub status: Option<TenantStatus>,
225 pub config: Option<TenantConfig>,
226}
227
228#[async_trait]
230pub trait TenantOperations {
231 async fn create_tenant(&self, tenant_id: TenantId, name: String, config: Option<TenantConfig>) -> Result<TenantInfo>;
232 async fn get_tenant(&self, tenant_id: &TenantId) -> Result<TenantInfo>;
233 async fn update_tenant(&self, tenant_id: &TenantId, updates: TenantUpdate) -> Result<TenantInfo>;
234 async fn delete_tenant(&self, tenant_id: &TenantId) -> Result<()>;
235 async fn list_tenants(&self, status_filter: Option<TenantStatus>) -> Result<Vec<TenantInfo>>;
236}
237
238#[async_trait]
239impl TenantOperations for TenantManager {
240 async fn create_tenant(&self, tenant_id: TenantId, name: String, config: Option<TenantConfig>) -> Result<TenantInfo> {
241 self.create_tenant(tenant_id, name, config).await
242 }
243
244 async fn get_tenant(&self, tenant_id: &TenantId) -> Result<TenantInfo> {
245 self.get_tenant(tenant_id)
246 }
247
248 async fn update_tenant(&self, tenant_id: &TenantId, updates: TenantUpdate) -> Result<TenantInfo> {
249 self.update_tenant(tenant_id, updates)
250 }
251
252 async fn delete_tenant(&self, tenant_id: &TenantId) -> Result<()> {
253 self.delete_tenant(tenant_id)
254 }
255
256 async fn list_tenants(&self, status_filter: Option<TenantStatus>) -> Result<Vec<TenantInfo>> {
257 Ok(self.list_tenants(status_filter))
258 }
259}
260
261#[derive(Debug)]
263pub struct TenantRegistry {
264 registered_tenants: HashMap<TenantId, RegistrationInfo>,
265 performance_stats: PerformanceStats,
266}
267
268impl Default for TenantRegistry {
269 fn default() -> Self {
270 Self::new()
271 }
272}
273
274impl TenantRegistry {
275 pub fn new() -> Self {
276 Self {
277 registered_tenants: HashMap::new(),
278 performance_stats: PerformanceStats::new(),
279 }
280 }
281
282 pub fn register_tenant(&mut self, tenant_id: TenantId) -> Result<()> {
283 let info = RegistrationInfo {
284 registered_at: Utc::now(),
285 last_activity: Utc::now(),
286 operation_count: 0,
287 };
288
289 self.registered_tenants.insert(tenant_id, info);
290 self.performance_stats.total_tenants += 1;
291
292 Ok(())
293 }
294
295 pub fn record_activity(&mut self, tenant_id: &TenantId) {
296 if let Some(info) = self.registered_tenants.get_mut(tenant_id) {
297 info.last_activity = Utc::now();
298 info.operation_count += 1;
299 self.performance_stats.total_operations += 1;
300 }
301 }
302
303 pub fn get_stats(&self) -> &PerformanceStats {
304 &self.performance_stats
305 }
306}
307
308#[derive(Debug, Clone)]
309struct RegistrationInfo {
310 #[allow(dead_code)] registered_at: DateTime<Utc>,
312 last_activity: DateTime<Utc>,
313 operation_count: u64,
314}
315
/// Aggregate counters maintained by the tenant registry.
///
/// The derived `Default` yields the all-zero value, the same value `new`
/// returns.
#[derive(Debug, Clone, Default)]
pub struct PerformanceStats {
    /// Count of registered tenants.
    pub total_tenants: u64,
    /// Count of active tenants.
    pub active_tenants: u64,
    /// Count of recorded operations.
    pub total_operations: u64,
    /// Average response time in milliseconds.
    pub average_response_time_ms: f64,
}

impl PerformanceStats {
    /// Creates statistics with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }
}
340
#[cfg(test)]
mod tests {
    use super::*;

    /// Raw id shared by every test in this module.
    const TENANT_SLUG: &str = "test-tenant";
    /// Display name given to the tenant on creation.
    const TENANT_NAME: &str = "Test Tenant";

    /// Builds the tenant id used throughout the tests.
    fn sample_tenant_id() -> TenantId {
        TenantId::new(TENANT_SLUG.to_string()).unwrap()
    }

    /// A freshly created tenant carries the requested id and name and is active.
    #[tokio::test]
    async fn test_tenant_creation() {
        let manager = TenantManager::new();
        let tenant_id = sample_tenant_id();

        let created = manager
            .create_tenant(tenant_id.clone(), TENANT_NAME.to_string(), None)
            .await
            .unwrap();

        assert_eq!(created.id, tenant_id);
        assert_eq!(created.name, TENANT_NAME);
        assert!(created.is_active());
    }

    /// A created tenant can be fetched and then updated by name and description.
    #[tokio::test]
    async fn test_tenant_operations() {
        let manager = TenantManager::new();
        let tenant_id = sample_tenant_id();

        manager
            .create_tenant(tenant_id.clone(), TENANT_NAME.to_string(), None)
            .await
            .unwrap();

        let fetched = manager.get_tenant(&tenant_id).unwrap();
        assert_eq!(fetched.name, TENANT_NAME);

        let changes = TenantUpdate {
            name: Some("Updated Tenant".to_string()),
            description: Some("Updated description".to_string()),
            status: None,
            config: None,
        };

        let after = manager.update_tenant(&tenant_id, changes).unwrap();
        assert_eq!(after.name, "Updated Tenant");
        assert_eq!(after.description, Some("Updated description".to_string()));
    }

    /// Checking quota for a tenant that was never created is an error.
    #[test]
    fn test_quota_checking() {
        let manager = TenantManager::new();
        let tenant_id = sample_tenant_id();

        let outcome = manager.check_tenant_quota(&tenant_id, ResourceType::Events, 100);
        assert!(outcome.is_err());
    }
}