allsource_core/
tenant.rs

1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6
7/// Tenant quotas and limits
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct TenantQuotas {
10    /// Maximum events per day (0 = unlimited)
11    pub max_events_per_day: u64,
12    /// Maximum storage in bytes (0 = unlimited)
13    pub max_storage_bytes: u64,
14    /// Maximum queries per hour (0 = unlimited)
15    pub max_queries_per_hour: u64,
16    /// Maximum API keys (0 = unlimited)
17    pub max_api_keys: u32,
18    /// Maximum projections (0 = unlimited)
19    pub max_projections: u32,
20    /// Maximum pipelines (0 = unlimited)
21    pub max_pipelines: u32,
22}
23
24impl Default for TenantQuotas {
25    fn default() -> Self {
26        Self {
27            max_events_per_day: 1_000_000,    // 1M events/day
28            max_storage_bytes: 10_737_418_240, // 10 GB
29            max_queries_per_hour: 100_000,     // 100K queries/hour
30            max_api_keys: 10,
31            max_projections: 50,
32            max_pipelines: 20,
33        }
34    }
35}
36
37impl TenantQuotas {
38    /// Unlimited quotas
39    pub fn unlimited() -> Self {
40        Self {
41            max_events_per_day: 0,
42            max_storage_bytes: 0,
43            max_queries_per_hour: 0,
44            max_api_keys: 0,
45            max_projections: 0,
46            max_pipelines: 0,
47        }
48    }
49
50    /// Free tier quotas
51    pub fn free_tier() -> Self {
52        Self {
53            max_events_per_day: 10_000,
54            max_storage_bytes: 1_073_741_824, // 1 GB
55            max_queries_per_hour: 1_000,
56            max_api_keys: 2,
57            max_projections: 5,
58            max_pipelines: 2,
59        }
60    }
61
62    /// Professional tier quotas
63    pub fn professional() -> Self {
64        Self {
65            max_events_per_day: 1_000_000,
66            max_storage_bytes: 107_374_182_400, // 100 GB
67            max_queries_per_hour: 100_000,
68            max_api_keys: 25,
69            max_projections: 100,
70            max_pipelines: 50,
71        }
72    }
73}
74
75/// Tenant usage statistics
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct TenantUsage {
78    /// Events ingested today
79    pub events_today: u64,
80    /// Total events stored
81    pub total_events: u64,
82    /// Storage used in bytes
83    pub storage_bytes: u64,
84    /// Queries in current hour
85    pub queries_this_hour: u64,
86    /// Active API keys
87    pub active_api_keys: u32,
88    /// Active projections
89    pub active_projections: u32,
90    /// Active pipelines
91    pub active_pipelines: u32,
92    /// Last reset time for daily counters
93    pub last_daily_reset: DateTime<Utc>,
94    /// Last reset time for hourly counters
95    pub last_hourly_reset: DateTime<Utc>,
96}
97
98impl Default for TenantUsage {
99    fn default() -> Self {
100        Self {
101            events_today: 0,
102            total_events: 0,
103            storage_bytes: 0,
104            queries_this_hour: 0,
105            active_api_keys: 0,
106            active_projections: 0,
107            active_pipelines: 0,
108            last_daily_reset: Utc::now(),
109            last_hourly_reset: Utc::now(),
110        }
111    }
112}
113
114impl TenantUsage {
115    /// Reset daily counters if needed
116    pub fn reset_daily_if_needed(&mut self) {
117        let now = Utc::now();
118        let hours_since_reset = (now - self.last_daily_reset).num_hours();
119
120        if hours_since_reset >= 24 {
121            self.events_today = 0;
122            self.last_daily_reset = now;
123        }
124    }
125
126    /// Reset hourly counters if needed
127    pub fn reset_hourly_if_needed(&mut self) {
128        let now = Utc::now();
129        let hours_since_reset = (now - self.last_hourly_reset).num_hours();
130
131        if hours_since_reset >= 1 {
132            self.queries_this_hour = 0;
133            self.last_hourly_reset = now;
134        }
135    }
136
137    /// Check and reset counters
138    pub fn check_and_reset(&mut self) {
139        self.reset_daily_if_needed();
140        self.reset_hourly_if_needed();
141    }
142}
143
144/// Tenant configuration
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct Tenant {
147    pub id: String,
148    pub name: String,
149    pub description: Option<String>,
150    pub quotas: TenantQuotas,
151    pub usage: TenantUsage,
152    pub created_at: DateTime<Utc>,
153    pub updated_at: DateTime<Utc>,
154    pub active: bool,
155    /// Custom metadata
156    pub metadata: serde_json::Value,
157}
158
159impl Tenant {
160    /// Create new tenant
161    pub fn new(id: String, name: String, quotas: TenantQuotas) -> Self {
162        let now = Utc::now();
163        Self {
164            id,
165            name,
166            description: None,
167            quotas,
168            usage: TenantUsage::default(),
169            created_at: now,
170            updated_at: now,
171            active: true,
172            metadata: serde_json::json!({}),
173        }
174    }
175
176    /// Check if tenant can ingest more events
177    pub fn can_ingest_event(&mut self) -> Result<()> {
178        if !self.active {
179            return Err(AllSourceError::ValidationError(
180                "Tenant is inactive".to_string(),
181            ));
182        }
183
184        self.usage.check_and_reset();
185
186        if self.quotas.max_events_per_day > 0
187            && self.usage.events_today >= self.quotas.max_events_per_day
188        {
189            return Err(AllSourceError::ValidationError(
190                "Daily event quota exceeded".to_string(),
191            ));
192        }
193
194        if self.quotas.max_storage_bytes > 0
195            && self.usage.storage_bytes >= self.quotas.max_storage_bytes
196        {
197            return Err(AllSourceError::ValidationError(
198                "Storage quota exceeded".to_string(),
199            ));
200        }
201
202        Ok(())
203    }
204
205    /// Record event ingestion
206    pub fn record_event(&mut self, size_bytes: u64) {
207        self.usage.events_today += 1;
208        self.usage.total_events += 1;
209        self.usage.storage_bytes += size_bytes;
210        self.updated_at = Utc::now();
211    }
212
213    /// Check if tenant can execute query
214    pub fn can_query(&mut self) -> Result<()> {
215        if !self.active {
216            return Err(AllSourceError::ValidationError(
217                "Tenant is inactive".to_string(),
218            ));
219        }
220
221        self.usage.check_and_reset();
222
223        if self.quotas.max_queries_per_hour > 0
224            && self.usage.queries_this_hour >= self.quotas.max_queries_per_hour
225        {
226            return Err(AllSourceError::ValidationError(
227                "Hourly query quota exceeded".to_string(),
228            ));
229        }
230
231        Ok(())
232    }
233
234    /// Record query execution
235    pub fn record_query(&mut self) {
236        self.usage.queries_this_hour += 1;
237        self.updated_at = Utc::now();
238    }
239
240    /// Get quota utilization percentage
241    pub fn quota_utilization(&self) -> serde_json::Value {
242        let events_pct = if self.quotas.max_events_per_day > 0 {
243            (self.usage.events_today as f64 / self.quotas.max_events_per_day as f64) * 100.0
244        } else {
245            0.0
246        };
247
248        let storage_pct = if self.quotas.max_storage_bytes > 0 {
249            (self.usage.storage_bytes as f64 / self.quotas.max_storage_bytes as f64) * 100.0
250        } else {
251            0.0
252        };
253
254        let queries_pct = if self.quotas.max_queries_per_hour > 0 {
255            (self.usage.queries_this_hour as f64 / self.quotas.max_queries_per_hour as f64) * 100.0
256        } else {
257            0.0
258        };
259
260        serde_json::json!({
261            "events_today": {
262                "used": self.usage.events_today,
263                "limit": self.quotas.max_events_per_day,
264                "percentage": events_pct.min(100.0)
265            },
266            "storage": {
267                "used_bytes": self.usage.storage_bytes,
268                "limit_bytes": self.quotas.max_storage_bytes,
269                "percentage": storage_pct.min(100.0)
270            },
271            "queries_this_hour": {
272                "used": self.usage.queries_this_hour,
273                "limit": self.quotas.max_queries_per_hour,
274                "percentage": queries_pct.min(100.0)
275            }
276        })
277    }
278}
279
280/// Tenant manager
281pub struct TenantManager {
282    tenants: Arc<DashMap<String, Tenant>>,
283}
284
285impl TenantManager {
286    /// Create new tenant manager
287    pub fn new() -> Self {
288        let manager = Self {
289            tenants: Arc::new(DashMap::new()),
290        };
291
292        // Create default tenant
293        let default_tenant = Tenant::new(
294            "default".to_string(),
295            "Default Tenant".to_string(),
296            TenantQuotas::unlimited(),
297        );
298        manager.tenants.insert("default".to_string(), default_tenant);
299
300        manager
301    }
302
303    /// Create tenant
304    pub fn create_tenant(
305        &self,
306        id: String,
307        name: String,
308        quotas: TenantQuotas,
309    ) -> Result<Tenant> {
310        if self.tenants.contains_key(&id) {
311            return Err(AllSourceError::ValidationError(
312                "Tenant ID already exists".to_string(),
313            ));
314        }
315
316        let tenant = Tenant::new(id.clone(), name, quotas);
317        self.tenants.insert(id, tenant.clone());
318
319        Ok(tenant)
320    }
321
322    /// Get tenant
323    pub fn get_tenant(&self, tenant_id: &str) -> Result<Tenant> {
324        self.tenants
325            .get(tenant_id)
326            .map(|t| t.clone())
327            .ok_or_else(|| AllSourceError::ValidationError("Tenant not found".to_string()))
328    }
329
330    /// Update tenant quotas
331    pub fn update_quotas(&self, tenant_id: &str, quotas: TenantQuotas) -> Result<()> {
332        if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
333            tenant.quotas = quotas;
334            tenant.updated_at = Utc::now();
335            Ok(())
336        } else {
337            Err(AllSourceError::ValidationError(
338                "Tenant not found".to_string(),
339            ))
340        }
341    }
342
343    /// Deactivate tenant
344    pub fn deactivate_tenant(&self, tenant_id: &str) -> Result<()> {
345        if tenant_id == "default" {
346            return Err(AllSourceError::ValidationError(
347                "Cannot deactivate default tenant".to_string(),
348            ));
349        }
350
351        if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
352            tenant.active = false;
353            tenant.updated_at = Utc::now();
354            Ok(())
355        } else {
356            Err(AllSourceError::ValidationError(
357                "Tenant not found".to_string(),
358            ))
359        }
360    }
361
362    /// Activate tenant
363    pub fn activate_tenant(&self, tenant_id: &str) -> Result<()> {
364        if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
365            tenant.active = true;
366            tenant.updated_at = Utc::now();
367            Ok(())
368        } else {
369            Err(AllSourceError::ValidationError(
370                "Tenant not found".to_string(),
371            ))
372        }
373    }
374
375    /// Delete tenant
376    pub fn delete_tenant(&self, tenant_id: &str) -> Result<()> {
377        if tenant_id == "default" {
378            return Err(AllSourceError::ValidationError(
379                "Cannot delete default tenant".to_string(),
380            ));
381        }
382
383        self.tenants
384            .remove(tenant_id)
385            .ok_or_else(|| AllSourceError::ValidationError("Tenant not found".to_string()))?;
386
387        Ok(())
388    }
389
390    /// List all tenants
391    pub fn list_tenants(&self) -> Vec<Tenant> {
392        self.tenants.iter().map(|entry| entry.value().clone()).collect()
393    }
394
395    /// Check if tenant can ingest event
396    pub fn check_can_ingest(&self, tenant_id: &str) -> Result<()> {
397        if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
398            tenant.can_ingest_event()
399        } else {
400            Err(AllSourceError::ValidationError(
401                "Tenant not found".to_string(),
402            ))
403        }
404    }
405
406    /// Record event ingestion
407    pub fn record_ingestion(&self, tenant_id: &str, size_bytes: u64) -> Result<()> {
408        if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
409            tenant.record_event(size_bytes);
410            Ok(())
411        } else {
412            Err(AllSourceError::ValidationError(
413                "Tenant not found".to_string(),
414            ))
415        }
416    }
417
418    /// Check if tenant can query
419    pub fn check_can_query(&self, tenant_id: &str) -> Result<()> {
420        if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
421            tenant.can_query()
422        } else {
423            Err(AllSourceError::ValidationError(
424                "Tenant not found".to_string(),
425            ))
426        }
427    }
428
429    /// Record query execution
430    pub fn record_query(&self, tenant_id: &str) -> Result<()> {
431        if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
432            tenant.record_query();
433            Ok(())
434        } else {
435            Err(AllSourceError::ValidationError(
436                "Tenant not found".to_string(),
437            ))
438        }
439    }
440
441    /// Get tenant statistics
442    pub fn get_stats(&self, tenant_id: &str) -> Result<serde_json::Value> {
443        let tenant = self.get_tenant(tenant_id)?;
444
445        Ok(serde_json::json!({
446            "tenant_id": tenant.id,
447            "name": tenant.name,
448            "active": tenant.active,
449            "usage": tenant.usage,
450            "quotas": tenant.quotas,
451            "utilization": tenant.quota_utilization(),
452            "created_at": tenant.created_at,
453            "updated_at": tenant.updated_at
454        }))
455    }
456}
457
458impl Default for TenantManager {
459    fn default() -> Self {
460        Self::new()
461    }
462}
463
464#[cfg(test)]
465mod tests {
466    use super::*;
467
468    #[test]
469    fn test_tenant_creation() {
470        let manager = TenantManager::new();
471
472        let tenant = manager
473            .create_tenant(
474                "test-tenant".to_string(),
475                "Test Tenant".to_string(),
476                TenantQuotas::default(),
477            )
478            .unwrap();
479
480        assert_eq!(tenant.id, "test-tenant");
481        assert_eq!(tenant.name, "Test Tenant");
482        assert!(tenant.active);
483    }
484
485    #[test]
486    fn test_quota_enforcement() {
487        let mut tenant = Tenant::new(
488            "test".to_string(),
489            "Test".to_string(),
490            TenantQuotas {
491                max_events_per_day: 10,
492                max_storage_bytes: 1000,
493                ..Default::default()
494            },
495        );
496
497        // Should allow first 10 events
498        for _ in 0..10 {
499            assert!(tenant.can_ingest_event().is_ok());
500            tenant.record_event(50);
501        }
502
503        // Should reject 11th event (quota exceeded)
504        assert!(tenant.can_ingest_event().is_err());
505    }
506
507    #[test]
508    fn test_tenant_deactivation() {
509        let manager = TenantManager::new();
510
511        manager
512            .create_tenant(
513                "test".to_string(),
514                "Test".to_string(),
515                TenantQuotas::default(),
516            )
517            .unwrap();
518
519        manager.deactivate_tenant("test").unwrap();
520
521        let tenant = manager.get_tenant("test").unwrap();
522        assert!(!tenant.active);
523    }
524
525    #[test]
526    fn test_quota_utilization() {
527        let mut tenant = Tenant::new(
528            "test".to_string(),
529            "Test".to_string(),
530            TenantQuotas {
531                max_events_per_day: 100,
532                max_storage_bytes: 1000,
533                max_queries_per_hour: 50,
534                ..Default::default()
535            },
536        );
537
538        tenant.record_event(500); // 50% storage
539        tenant.record_event(250); // 75% storage, 2% events
540
541        let util = tenant.quota_utilization();
542        assert_eq!(util["events_today"]["used"], 2);
543        assert_eq!(util["storage"]["used_bytes"], 750);
544    }
545}