1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct TenantQuotas {
10 pub max_events_per_day: u64,
12 pub max_storage_bytes: u64,
14 pub max_queries_per_hour: u64,
16 pub max_api_keys: u32,
18 pub max_projections: u32,
20 pub max_pipelines: u32,
22}
23
24impl Default for TenantQuotas {
25 fn default() -> Self {
26 Self {
27 max_events_per_day: 1_000_000, max_storage_bytes: 10_737_418_240, max_queries_per_hour: 100_000, max_api_keys: 10,
31 max_projections: 50,
32 max_pipelines: 20,
33 }
34 }
35}
36
37impl TenantQuotas {
38 pub fn unlimited() -> Self {
40 Self {
41 max_events_per_day: 0,
42 max_storage_bytes: 0,
43 max_queries_per_hour: 0,
44 max_api_keys: 0,
45 max_projections: 0,
46 max_pipelines: 0,
47 }
48 }
49
50 pub fn free_tier() -> Self {
52 Self {
53 max_events_per_day: 10_000,
54 max_storage_bytes: 1_073_741_824, max_queries_per_hour: 1_000,
56 max_api_keys: 2,
57 max_projections: 5,
58 max_pipelines: 2,
59 }
60 }
61
62 pub fn professional() -> Self {
64 Self {
65 max_events_per_day: 1_000_000,
66 max_storage_bytes: 107_374_182_400, max_queries_per_hour: 100_000,
68 max_api_keys: 25,
69 max_projections: 100,
70 max_pipelines: 50,
71 }
72 }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct TenantUsage {
78 pub events_today: u64,
80 pub total_events: u64,
82 pub storage_bytes: u64,
84 pub queries_this_hour: u64,
86 pub active_api_keys: u32,
88 pub active_projections: u32,
90 pub active_pipelines: u32,
92 pub last_daily_reset: DateTime<Utc>,
94 pub last_hourly_reset: DateTime<Utc>,
96}
97
98impl Default for TenantUsage {
99 fn default() -> Self {
100 Self {
101 events_today: 0,
102 total_events: 0,
103 storage_bytes: 0,
104 queries_this_hour: 0,
105 active_api_keys: 0,
106 active_projections: 0,
107 active_pipelines: 0,
108 last_daily_reset: Utc::now(),
109 last_hourly_reset: Utc::now(),
110 }
111 }
112}
113
114impl TenantUsage {
115 pub fn reset_daily_if_needed(&mut self) {
117 let now = Utc::now();
118 let hours_since_reset = (now - self.last_daily_reset).num_hours();
119
120 if hours_since_reset >= 24 {
121 self.events_today = 0;
122 self.last_daily_reset = now;
123 }
124 }
125
126 pub fn reset_hourly_if_needed(&mut self) {
128 let now = Utc::now();
129 let hours_since_reset = (now - self.last_hourly_reset).num_hours();
130
131 if hours_since_reset >= 1 {
132 self.queries_this_hour = 0;
133 self.last_hourly_reset = now;
134 }
135 }
136
137 pub fn check_and_reset(&mut self) {
139 self.reset_daily_if_needed();
140 self.reset_hourly_if_needed();
141 }
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct Tenant {
147 pub id: String,
148 pub name: String,
149 pub description: Option<String>,
150 pub quotas: TenantQuotas,
151 pub usage: TenantUsage,
152 pub created_at: DateTime<Utc>,
153 pub updated_at: DateTime<Utc>,
154 pub active: bool,
155 pub metadata: serde_json::Value,
157}
158
159impl Tenant {
160 pub fn new(id: String, name: String, quotas: TenantQuotas) -> Self {
162 let now = Utc::now();
163 Self {
164 id,
165 name,
166 description: None,
167 quotas,
168 usage: TenantUsage::default(),
169 created_at: now,
170 updated_at: now,
171 active: true,
172 metadata: serde_json::json!({}),
173 }
174 }
175
176 pub fn can_ingest_event(&mut self) -> Result<()> {
178 if !self.active {
179 return Err(AllSourceError::ValidationError(
180 "Tenant is inactive".to_string(),
181 ));
182 }
183
184 self.usage.check_and_reset();
185
186 if self.quotas.max_events_per_day > 0
187 && self.usage.events_today >= self.quotas.max_events_per_day
188 {
189 return Err(AllSourceError::ValidationError(
190 "Daily event quota exceeded".to_string(),
191 ));
192 }
193
194 if self.quotas.max_storage_bytes > 0
195 && self.usage.storage_bytes >= self.quotas.max_storage_bytes
196 {
197 return Err(AllSourceError::ValidationError(
198 "Storage quota exceeded".to_string(),
199 ));
200 }
201
202 Ok(())
203 }
204
205 pub fn record_event(&mut self, size_bytes: u64) {
207 self.usage.events_today += 1;
208 self.usage.total_events += 1;
209 self.usage.storage_bytes += size_bytes;
210 self.updated_at = Utc::now();
211 }
212
213 pub fn can_query(&mut self) -> Result<()> {
215 if !self.active {
216 return Err(AllSourceError::ValidationError(
217 "Tenant is inactive".to_string(),
218 ));
219 }
220
221 self.usage.check_and_reset();
222
223 if self.quotas.max_queries_per_hour > 0
224 && self.usage.queries_this_hour >= self.quotas.max_queries_per_hour
225 {
226 return Err(AllSourceError::ValidationError(
227 "Hourly query quota exceeded".to_string(),
228 ));
229 }
230
231 Ok(())
232 }
233
234 pub fn record_query(&mut self) {
236 self.usage.queries_this_hour += 1;
237 self.updated_at = Utc::now();
238 }
239
240 pub fn quota_utilization(&self) -> serde_json::Value {
242 let events_pct = if self.quotas.max_events_per_day > 0 {
243 (self.usage.events_today as f64 / self.quotas.max_events_per_day as f64) * 100.0
244 } else {
245 0.0
246 };
247
248 let storage_pct = if self.quotas.max_storage_bytes > 0 {
249 (self.usage.storage_bytes as f64 / self.quotas.max_storage_bytes as f64) * 100.0
250 } else {
251 0.0
252 };
253
254 let queries_pct = if self.quotas.max_queries_per_hour > 0 {
255 (self.usage.queries_this_hour as f64 / self.quotas.max_queries_per_hour as f64) * 100.0
256 } else {
257 0.0
258 };
259
260 serde_json::json!({
261 "events_today": {
262 "used": self.usage.events_today,
263 "limit": self.quotas.max_events_per_day,
264 "percentage": events_pct.min(100.0)
265 },
266 "storage": {
267 "used_bytes": self.usage.storage_bytes,
268 "limit_bytes": self.quotas.max_storage_bytes,
269 "percentage": storage_pct.min(100.0)
270 },
271 "queries_this_hour": {
272 "used": self.usage.queries_this_hour,
273 "limit": self.quotas.max_queries_per_hour,
274 "percentage": queries_pct.min(100.0)
275 }
276 })
277 }
278}
279
280pub struct TenantManager {
282 tenants: Arc<DashMap<String, Tenant>>,
283}
284
285impl TenantManager {
286 pub fn new() -> Self {
288 let manager = Self {
289 tenants: Arc::new(DashMap::new()),
290 };
291
292 let default_tenant = Tenant::new(
294 "default".to_string(),
295 "Default Tenant".to_string(),
296 TenantQuotas::unlimited(),
297 );
298 manager.tenants.insert("default".to_string(), default_tenant);
299
300 manager
301 }
302
303 pub fn create_tenant(
305 &self,
306 id: String,
307 name: String,
308 quotas: TenantQuotas,
309 ) -> Result<Tenant> {
310 if self.tenants.contains_key(&id) {
311 return Err(AllSourceError::ValidationError(
312 "Tenant ID already exists".to_string(),
313 ));
314 }
315
316 let tenant = Tenant::new(id.clone(), name, quotas);
317 self.tenants.insert(id, tenant.clone());
318
319 Ok(tenant)
320 }
321
322 pub fn get_tenant(&self, tenant_id: &str) -> Result<Tenant> {
324 self.tenants
325 .get(tenant_id)
326 .map(|t| t.clone())
327 .ok_or_else(|| AllSourceError::ValidationError("Tenant not found".to_string()))
328 }
329
330 pub fn update_quotas(&self, tenant_id: &str, quotas: TenantQuotas) -> Result<()> {
332 if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
333 tenant.quotas = quotas;
334 tenant.updated_at = Utc::now();
335 Ok(())
336 } else {
337 Err(AllSourceError::ValidationError(
338 "Tenant not found".to_string(),
339 ))
340 }
341 }
342
343 pub fn deactivate_tenant(&self, tenant_id: &str) -> Result<()> {
345 if tenant_id == "default" {
346 return Err(AllSourceError::ValidationError(
347 "Cannot deactivate default tenant".to_string(),
348 ));
349 }
350
351 if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
352 tenant.active = false;
353 tenant.updated_at = Utc::now();
354 Ok(())
355 } else {
356 Err(AllSourceError::ValidationError(
357 "Tenant not found".to_string(),
358 ))
359 }
360 }
361
362 pub fn activate_tenant(&self, tenant_id: &str) -> Result<()> {
364 if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
365 tenant.active = true;
366 tenant.updated_at = Utc::now();
367 Ok(())
368 } else {
369 Err(AllSourceError::ValidationError(
370 "Tenant not found".to_string(),
371 ))
372 }
373 }
374
375 pub fn delete_tenant(&self, tenant_id: &str) -> Result<()> {
377 if tenant_id == "default" {
378 return Err(AllSourceError::ValidationError(
379 "Cannot delete default tenant".to_string(),
380 ));
381 }
382
383 self.tenants
384 .remove(tenant_id)
385 .ok_or_else(|| AllSourceError::ValidationError("Tenant not found".to_string()))?;
386
387 Ok(())
388 }
389
390 pub fn list_tenants(&self) -> Vec<Tenant> {
392 self.tenants.iter().map(|entry| entry.value().clone()).collect()
393 }
394
395 pub fn check_can_ingest(&self, tenant_id: &str) -> Result<()> {
397 if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
398 tenant.can_ingest_event()
399 } else {
400 Err(AllSourceError::ValidationError(
401 "Tenant not found".to_string(),
402 ))
403 }
404 }
405
406 pub fn record_ingestion(&self, tenant_id: &str, size_bytes: u64) -> Result<()> {
408 if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
409 tenant.record_event(size_bytes);
410 Ok(())
411 } else {
412 Err(AllSourceError::ValidationError(
413 "Tenant not found".to_string(),
414 ))
415 }
416 }
417
418 pub fn check_can_query(&self, tenant_id: &str) -> Result<()> {
420 if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
421 tenant.can_query()
422 } else {
423 Err(AllSourceError::ValidationError(
424 "Tenant not found".to_string(),
425 ))
426 }
427 }
428
429 pub fn record_query(&self, tenant_id: &str) -> Result<()> {
431 if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
432 tenant.record_query();
433 Ok(())
434 } else {
435 Err(AllSourceError::ValidationError(
436 "Tenant not found".to_string(),
437 ))
438 }
439 }
440
441 pub fn get_stats(&self, tenant_id: &str) -> Result<serde_json::Value> {
443 let tenant = self.get_tenant(tenant_id)?;
444
445 Ok(serde_json::json!({
446 "tenant_id": tenant.id,
447 "name": tenant.name,
448 "active": tenant.active,
449 "usage": tenant.usage,
450 "quotas": tenant.quotas,
451 "utilization": tenant.quota_utilization(),
452 "created_at": tenant.created_at,
453 "updated_at": tenant.updated_at
454 }))
455 }
456}
457
458impl Default for TenantManager {
459 fn default() -> Self {
460 Self::new()
461 }
462}
463
464#[cfg(test)]
465mod tests {
466 use super::*;
467
468 #[test]
469 fn test_tenant_creation() {
470 let manager = TenantManager::new();
471
472 let tenant = manager
473 .create_tenant(
474 "test-tenant".to_string(),
475 "Test Tenant".to_string(),
476 TenantQuotas::default(),
477 )
478 .unwrap();
479
480 assert_eq!(tenant.id, "test-tenant");
481 assert_eq!(tenant.name, "Test Tenant");
482 assert!(tenant.active);
483 }
484
485 #[test]
486 fn test_quota_enforcement() {
487 let mut tenant = Tenant::new(
488 "test".to_string(),
489 "Test".to_string(),
490 TenantQuotas {
491 max_events_per_day: 10,
492 max_storage_bytes: 1000,
493 ..Default::default()
494 },
495 );
496
497 for _ in 0..10 {
499 assert!(tenant.can_ingest_event().is_ok());
500 tenant.record_event(50);
501 }
502
503 assert!(tenant.can_ingest_event().is_err());
505 }
506
507 #[test]
508 fn test_tenant_deactivation() {
509 let manager = TenantManager::new();
510
511 manager
512 .create_tenant(
513 "test".to_string(),
514 "Test".to_string(),
515 TenantQuotas::default(),
516 )
517 .unwrap();
518
519 manager.deactivate_tenant("test").unwrap();
520
521 let tenant = manager.get_tenant("test").unwrap();
522 assert!(!tenant.active);
523 }
524
525 #[test]
526 fn test_quota_utilization() {
527 let mut tenant = Tenant::new(
528 "test".to_string(),
529 "Test".to_string(),
530 TenantQuotas {
531 max_events_per_day: 100,
532 max_storage_bytes: 1000,
533 max_queries_per_hour: 50,
534 ..Default::default()
535 },
536 );
537
538 tenant.record_event(500); tenant.record_event(250); let util = tenant.quota_utilization();
542 assert_eq!(util["events_today"]["used"], 2);
543 assert_eq!(util["storage"]["used_bytes"], 750);
544 }
545}