use crate::error::{AllSourceError, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantQuotas {
pub max_events_per_day: u64,
pub max_storage_bytes: u64,
pub max_queries_per_hour: u64,
pub max_api_keys: u32,
pub max_projections: u32,
pub max_pipelines: u32,
}
impl Default for TenantQuotas {
fn default() -> Self {
Self {
max_events_per_day: 1_000_000, max_storage_bytes: 10_737_418_240, max_queries_per_hour: 100_000, max_api_keys: 10,
max_projections: 50,
max_pipelines: 20,
}
}
}
impl TenantQuotas {
pub fn unlimited() -> Self {
Self {
max_events_per_day: 0,
max_storage_bytes: 0,
max_queries_per_hour: 0,
max_api_keys: 0,
max_projections: 0,
max_pipelines: 0,
}
}
pub fn free_tier() -> Self {
Self {
max_events_per_day: 10_000,
max_storage_bytes: 1_073_741_824, max_queries_per_hour: 1_000,
max_api_keys: 2,
max_projections: 5,
max_pipelines: 2,
}
}
pub fn professional() -> Self {
Self {
max_events_per_day: 1_000_000,
max_storage_bytes: 107_374_182_400, max_queries_per_hour: 100_000,
max_api_keys: 25,
max_projections: 100,
max_pipelines: 50,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantUsage {
pub events_today: u64,
pub total_events: u64,
pub storage_bytes: u64,
pub queries_this_hour: u64,
pub active_api_keys: u32,
pub active_projections: u32,
pub active_pipelines: u32,
pub last_daily_reset: DateTime<Utc>,
pub last_hourly_reset: DateTime<Utc>,
}
impl Default for TenantUsage {
fn default() -> Self {
Self {
events_today: 0,
total_events: 0,
storage_bytes: 0,
queries_this_hour: 0,
active_api_keys: 0,
active_projections: 0,
active_pipelines: 0,
last_daily_reset: Utc::now(),
last_hourly_reset: Utc::now(),
}
}
}
impl TenantUsage {
pub fn reset_daily_if_needed(&mut self) {
let now = Utc::now();
let hours_since_reset = (now - self.last_daily_reset).num_hours();
if hours_since_reset >= 24 {
self.events_today = 0;
self.last_daily_reset = now;
}
}
pub fn reset_hourly_if_needed(&mut self) {
let now = Utc::now();
let hours_since_reset = (now - self.last_hourly_reset).num_hours();
if hours_since_reset >= 1 {
self.queries_this_hour = 0;
self.last_hourly_reset = now;
}
}
pub fn check_and_reset(&mut self) {
self.reset_daily_if_needed();
self.reset_hourly_if_needed();
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Tenant {
pub id: String,
pub name: String,
pub description: Option<String>,
pub quotas: TenantQuotas,
pub usage: TenantUsage,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub active: bool,
pub metadata: serde_json::Value,
}
impl Tenant {
pub fn new(id: String, name: String, quotas: TenantQuotas) -> Self {
let now = Utc::now();
Self {
id,
name,
description: None,
quotas,
usage: TenantUsage::default(),
created_at: now,
updated_at: now,
active: true,
metadata: serde_json::json!({}),
}
}
pub fn can_ingest_event(&mut self) -> Result<()> {
if !self.active {
return Err(AllSourceError::ValidationError(
"Tenant is inactive".to_string(),
));
}
self.usage.check_and_reset();
if self.quotas.max_events_per_day > 0
&& self.usage.events_today >= self.quotas.max_events_per_day
{
return Err(AllSourceError::ValidationError(
"Daily event quota exceeded".to_string(),
));
}
if self.quotas.max_storage_bytes > 0
&& self.usage.storage_bytes >= self.quotas.max_storage_bytes
{
return Err(AllSourceError::ValidationError(
"Storage quota exceeded".to_string(),
));
}
Ok(())
}
pub fn record_event(&mut self, size_bytes: u64) {
self.usage.events_today += 1;
self.usage.total_events += 1;
self.usage.storage_bytes += size_bytes;
self.updated_at = Utc::now();
}
pub fn can_query(&mut self) -> Result<()> {
if !self.active {
return Err(AllSourceError::ValidationError(
"Tenant is inactive".to_string(),
));
}
self.usage.check_and_reset();
if self.quotas.max_queries_per_hour > 0
&& self.usage.queries_this_hour >= self.quotas.max_queries_per_hour
{
return Err(AllSourceError::ValidationError(
"Hourly query quota exceeded".to_string(),
));
}
Ok(())
}
pub fn record_query(&mut self) {
self.usage.queries_this_hour += 1;
self.updated_at = Utc::now();
}
pub fn quota_utilization(&self) -> serde_json::Value {
let events_pct = if self.quotas.max_events_per_day > 0 {
(self.usage.events_today as f64 / self.quotas.max_events_per_day as f64) * 100.0
} else {
0.0
};
let storage_pct = if self.quotas.max_storage_bytes > 0 {
(self.usage.storage_bytes as f64 / self.quotas.max_storage_bytes as f64) * 100.0
} else {
0.0
};
let queries_pct = if self.quotas.max_queries_per_hour > 0 {
(self.usage.queries_this_hour as f64 / self.quotas.max_queries_per_hour as f64) * 100.0
} else {
0.0
};
serde_json::json!({
"events_today": {
"used": self.usage.events_today,
"limit": self.quotas.max_events_per_day,
"percentage": events_pct.min(100.0)
},
"storage": {
"used_bytes": self.usage.storage_bytes,
"limit_bytes": self.quotas.max_storage_bytes,
"percentage": storage_pct.min(100.0)
},
"queries_this_hour": {
"used": self.usage.queries_this_hour,
"limit": self.quotas.max_queries_per_hour,
"percentage": queries_pct.min(100.0)
}
})
}
}
pub struct TenantManager {
tenants: Arc<DashMap<String, Tenant>>,
}
impl TenantManager {
pub fn new() -> Self {
let manager = Self {
tenants: Arc::new(DashMap::new()),
};
let default_tenant = Tenant::new(
"default".to_string(),
"Default Tenant".to_string(),
TenantQuotas::unlimited(),
);
manager
.tenants
.insert("default".to_string(), default_tenant);
manager
}
pub fn create_tenant(&self, id: String, name: String, quotas: TenantQuotas) -> Result<Tenant> {
if self.tenants.contains_key(&id) {
return Err(AllSourceError::ValidationError(
"Tenant ID already exists".to_string(),
));
}
let tenant = Tenant::new(id.clone(), name, quotas);
self.tenants.insert(id, tenant.clone());
Ok(tenant)
}
pub fn get_tenant(&self, tenant_id: &str) -> Result<Tenant> {
self.tenants
.get(tenant_id)
.map(|t| t.clone())
.ok_or_else(|| AllSourceError::ValidationError("Tenant not found".to_string()))
}
pub fn update_quotas(&self, tenant_id: &str, quotas: TenantQuotas) -> Result<()> {
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
tenant.quotas = quotas;
tenant.updated_at = Utc::now();
Ok(())
} else {
Err(AllSourceError::ValidationError(
"Tenant not found".to_string(),
))
}
}
pub fn deactivate_tenant(&self, tenant_id: &str) -> Result<()> {
if tenant_id == "default" {
return Err(AllSourceError::ValidationError(
"Cannot deactivate default tenant".to_string(),
));
}
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
tenant.active = false;
tenant.updated_at = Utc::now();
Ok(())
} else {
Err(AllSourceError::ValidationError(
"Tenant not found".to_string(),
))
}
}
pub fn activate_tenant(&self, tenant_id: &str) -> Result<()> {
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
tenant.active = true;
tenant.updated_at = Utc::now();
Ok(())
} else {
Err(AllSourceError::ValidationError(
"Tenant not found".to_string(),
))
}
}
pub fn delete_tenant(&self, tenant_id: &str) -> Result<()> {
if tenant_id == "default" {
return Err(AllSourceError::ValidationError(
"Cannot delete default tenant".to_string(),
));
}
self.tenants
.remove(tenant_id)
.ok_or_else(|| AllSourceError::ValidationError("Tenant not found".to_string()))?;
Ok(())
}
pub fn list_tenants(&self) -> Vec<Tenant> {
self.tenants
.iter()
.map(|entry| entry.value().clone())
.collect()
}
pub fn check_can_ingest(&self, tenant_id: &str) -> Result<()> {
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
tenant.can_ingest_event()
} else {
Err(AllSourceError::ValidationError(
"Tenant not found".to_string(),
))
}
}
pub fn record_ingestion(&self, tenant_id: &str, size_bytes: u64) -> Result<()> {
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
tenant.record_event(size_bytes);
Ok(())
} else {
Err(AllSourceError::ValidationError(
"Tenant not found".to_string(),
))
}
}
pub fn check_can_query(&self, tenant_id: &str) -> Result<()> {
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
tenant.can_query()
} else {
Err(AllSourceError::ValidationError(
"Tenant not found".to_string(),
))
}
}
pub fn record_query(&self, tenant_id: &str) -> Result<()> {
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
tenant.record_query();
Ok(())
} else {
Err(AllSourceError::ValidationError(
"Tenant not found".to_string(),
))
}
}
pub fn get_stats(&self, tenant_id: &str) -> Result<serde_json::Value> {
let tenant = self.get_tenant(tenant_id)?;
Ok(serde_json::json!({
"tenant_id": tenant.id,
"name": tenant.name,
"active": tenant.active,
"usage": tenant.usage,
"quotas": tenant.quotas,
"utilization": tenant.quota_utilization(),
"created_at": tenant.created_at,
"updated_at": tenant.updated_at
}))
}
}
impl Default for TenantManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_tenant_creation() {
let manager = TenantManager::new();
let tenant = manager
.create_tenant(
"test-tenant".to_string(),
"Test Tenant".to_string(),
TenantQuotas::default(),
)
.unwrap();
assert_eq!(tenant.id, "test-tenant");
assert_eq!(tenant.name, "Test Tenant");
assert!(tenant.active);
}
#[test]
fn test_quota_enforcement() {
let mut tenant = Tenant::new(
"test".to_string(),
"Test".to_string(),
TenantQuotas {
max_events_per_day: 10,
max_storage_bytes: 1000,
..Default::default()
},
);
for _ in 0..10 {
assert!(tenant.can_ingest_event().is_ok());
tenant.record_event(50);
}
assert!(tenant.can_ingest_event().is_err());
}
#[test]
fn test_tenant_deactivation() {
let manager = TenantManager::new();
manager
.create_tenant(
"test".to_string(),
"Test".to_string(),
TenantQuotas::default(),
)
.unwrap();
manager.deactivate_tenant("test").unwrap();
let tenant = manager.get_tenant("test").unwrap();
assert!(!tenant.active);
}
#[test]
fn test_quota_utilization() {
let mut tenant = Tenant::new(
"test".to_string(),
"Test".to_string(),
TenantQuotas {
max_events_per_day: 100,
max_storage_bytes: 1000,
max_queries_per_hour: 50,
..Default::default()
},
);
tenant.record_event(500); tenant.record_event(250);
let util = tenant.quota_utilization();
assert_eq!(util["events_today"]["used"], 2);
assert_eq!(util["storage"]["used_bytes"], 750);
}
}