use crate::dataframe::DataFrame;
use crate::error::{Error, Result};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime};
pub type TenantId = String;
pub type DatasetId = String;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Permission {
Read,
Write,
Delete,
Create,
Share,
Admin,
}
#[derive(Debug, Clone)]
pub struct ResourceQuota {
pub max_total_rows: Option<usize>,
pub max_datasets: Option<usize>,
pub max_memory_bytes: Option<usize>,
pub max_columns_per_dataset: Option<usize>,
pub max_query_time: Option<Duration>,
}
impl Default for ResourceQuota {
fn default() -> Self {
ResourceQuota {
max_total_rows: Some(10_000_000),
max_datasets: Some(100),
max_memory_bytes: Some(1024 * 1024 * 1024), max_columns_per_dataset: Some(1000),
max_query_time: Some(Duration::from_secs(300)),
}
}
}
impl ResourceQuota {
pub fn unlimited() -> Self {
ResourceQuota {
max_total_rows: None,
max_datasets: None,
max_memory_bytes: None,
max_columns_per_dataset: None,
max_query_time: None,
}
}
}
#[derive(Debug, Clone)]
pub struct TenantConfig {
pub id: TenantId,
pub name: String,
pub description: Option<String>,
pub permissions: HashSet<Permission>,
pub quota: ResourceQuota,
pub active: bool,
pub created_at: SystemTime,
pub tags: HashMap<String, String>,
}
impl TenantConfig {
pub fn new(id: impl Into<String>) -> Self {
let id = id.into();
TenantConfig {
id: id.clone(),
name: id,
description: None,
permissions: HashSet::new(),
quota: ResourceQuota::default(),
active: true,
created_at: SystemTime::now(),
tags: HashMap::new(),
}
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
pub fn with_description(mut self, desc: impl Into<String>) -> Self {
self.description = Some(desc.into());
self
}
pub fn with_permission(mut self, perm: Permission) -> Self {
self.permissions.insert(perm);
self
}
pub fn with_permissions(mut self, perms: HashSet<Permission>) -> Self {
self.permissions = perms;
self
}
pub fn with_quota(mut self, quota: ResourceQuota) -> Self {
self.quota = quota;
self
}
pub fn with_max_rows(mut self, max: usize) -> Self {
self.quota.max_total_rows = Some(max);
self
}
pub fn with_max_datasets(mut self, max: usize) -> Self {
self.quota.max_datasets = Some(max);
self
}
pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.tags.insert(key.into(), value.into());
self
}
pub fn default_rw(id: impl Into<String>) -> Self {
Self::new(id)
.with_permission(Permission::Read)
.with_permission(Permission::Write)
.with_permission(Permission::Create)
}
}
#[derive(Debug, Clone, Default)]
pub struct TenantUsage {
pub dataset_count: usize,
pub total_rows: usize,
pub estimated_memory: usize,
pub read_operations: u64,
pub write_operations: u64,
pub last_access: Option<Instant>,
}
#[derive(Debug, Clone)]
pub struct TenantAuditEntry {
pub timestamp: SystemTime,
pub tenant_id: TenantId,
pub operation: TenantOperation,
pub dataset_id: Option<DatasetId>,
pub success: bool,
pub error_message: Option<String>,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TenantOperation {
CreateDataset,
ReadDataset,
UpdateDataset,
DeleteDataset,
ShareDataset,
Query,
SchemaChange,
ConfigChange,
}
#[derive(Debug, Clone)]
pub struct DatasetMetadata {
pub id: DatasetId,
pub owner: TenantId,
pub created_at: SystemTime,
pub modified_at: SystemTime,
pub row_count: usize,
pub column_count: usize,
pub columns: Vec<String>,
pub tags: HashMap<String, String>,
pub shared_with: HashSet<TenantId>,
}
#[derive(Debug)]
struct TenantStore {
datasets: HashMap<DatasetId, Arc<RwLock<DataFrame>>>,
metadata: HashMap<DatasetId, DatasetMetadata>,
usage: TenantUsage,
}
impl TenantStore {
fn new() -> Self {
TenantStore {
datasets: HashMap::new(),
metadata: HashMap::new(),
usage: TenantUsage::default(),
}
}
}
#[derive(Debug)]
pub struct TenantManager {
tenants: HashMap<TenantId, TenantConfig>,
stores: HashMap<TenantId, TenantStore>,
audit_log: Vec<TenantAuditEntry>,
max_audit_entries: usize,
enforce_quotas: bool,
}
impl TenantManager {
pub fn new() -> Self {
TenantManager {
tenants: HashMap::new(),
stores: HashMap::new(),
audit_log: Vec::new(),
max_audit_entries: 10000,
enforce_quotas: true,
}
}
pub fn with_max_audit_entries(mut self, max: usize) -> Self {
self.max_audit_entries = max;
self
}
pub fn with_quota_enforcement(mut self, enforce: bool) -> Self {
self.enforce_quotas = enforce;
self
}
pub fn register_tenant(&mut self, config: TenantConfig) -> Result<()> {
if self.tenants.contains_key(&config.id) {
return Err(Error::InvalidInput(format!(
"Tenant '{}' already exists",
config.id
)));
}
let tenant_id = config.id.clone();
self.tenants.insert(tenant_id.clone(), config);
self.stores.insert(tenant_id, TenantStore::new());
Ok(())
}
pub fn remove_tenant(&mut self, tenant_id: &str) -> Result<()> {
if !self.tenants.contains_key(tenant_id) {
return Err(Error::InvalidInput(format!(
"Tenant '{}' not found",
tenant_id
)));
}
self.tenants.remove(tenant_id);
self.stores.remove(tenant_id);
self.log_operation(
tenant_id.to_string(),
TenantOperation::ConfigChange,
None,
true,
None,
);
Ok(())
}
pub fn get_tenant(&self, tenant_id: &str) -> Option<&TenantConfig> {
self.tenants.get(tenant_id)
}
pub fn update_tenant(&mut self, config: TenantConfig) -> Result<()> {
if !self.tenants.contains_key(&config.id) {
return Err(Error::InvalidInput(format!(
"Tenant '{}' not found",
config.id
)));
}
let tenant_id = config.id.clone();
self.tenants.insert(tenant_id.clone(), config);
self.log_operation(tenant_id, TenantOperation::ConfigChange, None, true, None);
Ok(())
}
pub fn list_tenants(&self) -> Vec<&TenantId> {
self.tenants.keys().collect()
}
pub fn has_permission(&self, tenant_id: &str, permission: Permission) -> bool {
self.tenants
.get(tenant_id)
.map(|t| t.active && t.permissions.contains(&permission))
.unwrap_or(false)
}
pub fn store_dataframe(
&mut self,
tenant_id: &str,
dataset_id: &str,
df: DataFrame,
) -> Result<()> {
let config = self
.tenants
.get(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant '{}' not found", tenant_id)))?;
let operation = if self
.stores
.get(tenant_id)
.map(|s| s.datasets.contains_key(dataset_id))
.unwrap_or(false)
{
if !config.permissions.contains(&Permission::Write) {
return Err(Error::InvalidOperation(format!(
"Tenant '{}' does not have write permission",
tenant_id
)));
}
TenantOperation::UpdateDataset
} else {
if !config.permissions.contains(&Permission::Create) {
return Err(Error::InvalidOperation(format!(
"Tenant '{}' does not have create permission",
tenant_id
)));
}
TenantOperation::CreateDataset
};
if self.enforce_quotas {
self.check_quotas(tenant_id, &df)?;
}
let store = self
.stores
.get_mut(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant store not found")))?;
let column_names = df.column_names();
let row_count = df.row_count();
let col_count = column_names.len();
let metadata = DatasetMetadata {
id: dataset_id.to_string(),
owner: tenant_id.to_string(),
created_at: SystemTime::now(),
modified_at: SystemTime::now(),
row_count,
column_count: col_count,
columns: column_names,
tags: HashMap::new(),
shared_with: HashSet::new(),
};
if let Some(old_meta) = store.metadata.get(dataset_id) {
store.usage.total_rows = store.usage.total_rows.saturating_sub(old_meta.row_count);
} else {
store.usage.dataset_count += 1;
}
store.usage.total_rows += row_count;
store.usage.write_operations += 1;
store.usage.last_access = Some(Instant::now());
store
.datasets
.insert(dataset_id.to_string(), Arc::new(RwLock::new(df)));
store.metadata.insert(dataset_id.to_string(), metadata);
self.log_operation(
tenant_id.to_string(),
operation,
Some(dataset_id.to_string()),
true,
None,
);
Ok(())
}
pub fn get_dataframe(&mut self, tenant_id: &str, dataset_id: &str) -> Result<DataFrame> {
let config = self
.tenants
.get(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant '{}' not found", tenant_id)))?;
if !config.permissions.contains(&Permission::Read) {
return Err(Error::InvalidOperation(format!(
"Tenant '{}' does not have read permission",
tenant_id
)));
}
let store = self
.stores
.get_mut(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant store not found")))?;
let df_lock = store.datasets.get(dataset_id).ok_or_else(|| {
Error::InvalidInput(format!(
"Dataset '{}' not found for tenant '{}'",
dataset_id, tenant_id
))
})?;
let df = df_lock
.read()
.map_err(|_| Error::InvalidOperation("Failed to acquire read lock".to_string()))?
.clone();
store.usage.read_operations += 1;
store.usage.last_access = Some(Instant::now());
self.log_operation(
tenant_id.to_string(),
TenantOperation::ReadDataset,
Some(dataset_id.to_string()),
true,
None,
);
Ok(df)
}
pub fn delete_dataframe(&mut self, tenant_id: &str, dataset_id: &str) -> Result<()> {
let config = self
.tenants
.get(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant '{}' not found", tenant_id)))?;
if !config.permissions.contains(&Permission::Delete) {
return Err(Error::InvalidOperation(format!(
"Tenant '{}' does not have delete permission",
tenant_id
)));
}
let store = self
.stores
.get_mut(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant store not found")))?;
if let Some(metadata) = store.metadata.remove(dataset_id) {
store.datasets.remove(dataset_id);
store.usage.dataset_count = store.usage.dataset_count.saturating_sub(1);
store.usage.total_rows = store.usage.total_rows.saturating_sub(metadata.row_count);
}
self.log_operation(
tenant_id.to_string(),
TenantOperation::DeleteDataset,
Some(dataset_id.to_string()),
true,
None,
);
Ok(())
}
pub fn list_datasets(&self, tenant_id: &str) -> Result<Vec<&DatasetMetadata>> {
let store = self
.stores
.get(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant '{}' not found", tenant_id)))?;
Ok(store.metadata.values().collect())
}
pub fn get_dataset_metadata(
&self,
tenant_id: &str,
dataset_id: &str,
) -> Result<&DatasetMetadata> {
let store = self
.stores
.get(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant '{}' not found", tenant_id)))?;
store.metadata.get(dataset_id).ok_or_else(|| {
Error::InvalidInput(format!(
"Dataset '{}' not found for tenant '{}'",
dataset_id, tenant_id
))
})
}
pub fn share_dataset(
&mut self,
owner_tenant: &str,
dataset_id: &str,
target_tenant: &str,
) -> Result<()> {
let owner_config = self
.tenants
.get(owner_tenant)
.ok_or_else(|| Error::InvalidInput(format!("Tenant '{}' not found", owner_tenant)))?;
if !owner_config.permissions.contains(&Permission::Share) {
return Err(Error::InvalidOperation(format!(
"Tenant '{}' does not have share permission",
owner_tenant
)));
}
if !self.tenants.contains_key(target_tenant) {
return Err(Error::InvalidInput(format!(
"Target tenant '{}' not found",
target_tenant
)));
}
let store = self
.stores
.get_mut(owner_tenant)
.ok_or_else(|| Error::InvalidInput(format!("Tenant store not found")))?;
let metadata = store
.metadata
.get_mut(dataset_id)
.ok_or_else(|| Error::InvalidInput(format!("Dataset '{}' not found", dataset_id)))?;
metadata.shared_with.insert(target_tenant.to_string());
self.log_operation(
owner_tenant.to_string(),
TenantOperation::ShareDataset,
Some(dataset_id.to_string()),
true,
None,
);
Ok(())
}
pub fn get_usage(&self, tenant_id: &str) -> Result<&TenantUsage> {
let store = self
.stores
.get(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant '{}' not found", tenant_id)))?;
Ok(&store.usage)
}
pub fn get_audit_log(&self, tenant_id: Option<&str>) -> Vec<&TenantAuditEntry> {
self.audit_log
.iter()
.filter(|entry| tenant_id.map(|id| entry.tenant_id == id).unwrap_or(true))
.collect()
}
fn check_quotas(&self, tenant_id: &str, df: &DataFrame) -> Result<()> {
let config = self
.tenants
.get(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant '{}' not found", tenant_id)))?;
let store = self
.stores
.get(tenant_id)
.ok_or_else(|| Error::InvalidInput(format!("Tenant store not found")))?;
let new_rows = df.row_count();
let new_cols = df.column_names().len();
if let Some(max) = config.quota.max_datasets {
if store.usage.dataset_count >= max {
return Err(Error::InvalidOperation(format!(
"Dataset quota exceeded: max {} datasets allowed",
max
)));
}
}
if let Some(max) = config.quota.max_total_rows {
if store.usage.total_rows + new_rows > max {
return Err(Error::InvalidOperation(format!(
"Row quota exceeded: max {} total rows allowed",
max
)));
}
}
if let Some(max) = config.quota.max_columns_per_dataset {
if new_cols > max {
return Err(Error::InvalidOperation(format!(
"Column quota exceeded: max {} columns per dataset",
max
)));
}
}
Ok(())
}
fn log_operation(
&mut self,
tenant_id: TenantId,
operation: TenantOperation,
dataset_id: Option<DatasetId>,
success: bool,
error_message: Option<String>,
) {
let entry = TenantAuditEntry {
timestamp: SystemTime::now(),
tenant_id,
operation,
dataset_id,
success,
error_message,
metadata: HashMap::new(),
};
self.audit_log.push(entry);
if self.audit_log.len() > self.max_audit_entries {
let excess = self.audit_log.len() - self.max_audit_entries;
self.audit_log.drain(0..excess);
}
}
}
impl Default for TenantManager {
fn default() -> Self {
Self::new()
}
}
pub type SharedTenantManager = Arc<RwLock<TenantManager>>;
pub fn create_shared_manager() -> SharedTenantManager {
Arc::new(RwLock::new(TenantManager::new()))
}
#[derive(Debug, Clone)]
pub struct IsolationContext {
pub tenant_id: TenantId,
pub session_id: String,
pub start_time: Instant,
pub max_execution_time: Option<Duration>,
}
impl IsolationContext {
pub fn new(tenant_id: impl Into<String>) -> Self {
IsolationContext {
tenant_id: tenant_id.into(),
session_id: format!(
"session_{}",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
),
start_time: Instant::now(),
max_execution_time: None,
}
}
pub fn check_time_limit(&self) -> Result<()> {
if let Some(max_time) = self.max_execution_time {
if self.start_time.elapsed() > max_time {
return Err(Error::InvalidOperation(
"Query execution time limit exceeded".to_string(),
));
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::series::Series;
fn create_test_df() -> DataFrame {
let mut df = DataFrame::new();
let x = Series::new(vec![1.0, 2.0, 3.0, 4.0, 5.0], Some("x".to_string()))
.expect("operation should succeed");
let y = Series::new(vec![10.0, 20.0, 30.0, 40.0, 50.0], Some("y".to_string()))
.expect("operation should succeed");
df.add_column("x".to_string(), x)
.expect("operation should succeed");
df.add_column("y".to_string(), y)
.expect("operation should succeed");
df
}
#[test]
fn test_tenant_registration() {
let mut manager = TenantManager::new();
let config = TenantConfig::default_rw("tenant_a");
manager
.register_tenant(config)
.expect("operation should succeed");
assert!(manager.get_tenant("tenant_a").is_some());
assert!(manager.get_tenant("tenant_b").is_none());
}
#[test]
fn test_data_isolation() {
let mut manager = TenantManager::new();
manager
.register_tenant(TenantConfig::default_rw("tenant_a"))
.expect("operation should succeed");
manager
.register_tenant(TenantConfig::default_rw("tenant_b"))
.expect("operation should succeed");
let df = create_test_df();
manager
.store_dataframe("tenant_a", "data", df)
.expect("operation should succeed");
assert!(manager.get_dataframe("tenant_a", "data").is_ok());
assert!(manager.get_dataframe("tenant_b", "data").is_err());
}
#[test]
fn test_permission_enforcement() {
let mut manager = TenantManager::new();
let config = TenantConfig::new("readonly").with_permission(Permission::Read);
manager
.register_tenant(config)
.expect("operation should succeed");
let df = create_test_df();
assert!(manager.store_dataframe("readonly", "data", df).is_err());
}
#[test]
fn test_quota_enforcement() {
let mut manager = TenantManager::new();
let config = TenantConfig::default_rw("limited").with_max_rows(8); manager
.register_tenant(config)
.expect("operation should succeed");
let df = create_test_df();
manager
.store_dataframe("limited", "data1", df)
.expect("operation should succeed");
let df2 = create_test_df();
let result = manager.store_dataframe("limited", "data2", df2);
assert!(result.is_err(), "Should fail due to quota exceeded");
}
#[test]
fn test_usage_tracking() {
let mut manager = TenantManager::new();
manager
.register_tenant(TenantConfig::default_rw("tenant_a"))
.expect("operation should succeed");
let df = create_test_df();
manager
.store_dataframe("tenant_a", "data", df)
.expect("operation should succeed");
let usage = manager
.get_usage("tenant_a")
.expect("operation should succeed");
assert_eq!(usage.dataset_count, 1);
assert_eq!(usage.total_rows, 5);
assert_eq!(usage.write_operations, 1);
}
#[test]
fn test_audit_log() {
let mut manager = TenantManager::new();
manager
.register_tenant(TenantConfig::default_rw("tenant_a"))
.expect("operation should succeed");
let df = create_test_df();
manager
.store_dataframe("tenant_a", "data", df)
.expect("operation should succeed");
let _ = manager.get_dataframe("tenant_a", "data");
let audit = manager.get_audit_log(Some("tenant_a"));
assert!(audit.len() >= 2); }
#[test]
fn test_dataset_sharing() {
let mut manager = TenantManager::new();
let config_a = TenantConfig::default_rw("tenant_a").with_permission(Permission::Share);
manager
.register_tenant(config_a)
.expect("operation should succeed");
manager
.register_tenant(TenantConfig::default_rw("tenant_b"))
.expect("operation should succeed");
let df = create_test_df();
manager
.store_dataframe("tenant_a", "data", df)
.expect("operation should succeed");
manager
.share_dataset("tenant_a", "data", "tenant_b")
.expect("operation should succeed");
let metadata = manager
.get_dataset_metadata("tenant_a", "data")
.expect("operation should succeed");
assert!(metadata.shared_with.contains("tenant_b"));
}
#[test]
fn test_list_datasets() {
let mut manager = TenantManager::new();
manager
.register_tenant(TenantConfig::default_rw("tenant_a"))
.expect("operation should succeed");
let df1 = create_test_df();
let df2 = create_test_df();
manager
.store_dataframe("tenant_a", "data1", df1)
.expect("operation should succeed");
manager
.store_dataframe("tenant_a", "data2", df2)
.expect("operation should succeed");
let datasets = manager
.list_datasets("tenant_a")
.expect("operation should succeed");
assert_eq!(datasets.len(), 2);
}
#[test]
fn test_delete_dataset() {
let mut manager = TenantManager::new();
let config = TenantConfig::default_rw("tenant_a").with_permission(Permission::Delete);
manager
.register_tenant(config)
.expect("operation should succeed");
let df = create_test_df();
manager
.store_dataframe("tenant_a", "data", df)
.expect("operation should succeed");
assert!(manager.get_dataframe("tenant_a", "data").is_ok());
manager
.delete_dataframe("tenant_a", "data")
.expect("operation should succeed");
assert!(manager.get_dataframe("tenant_a", "data").is_err());
}
#[test]
fn test_isolation_context() {
let ctx = IsolationContext::new("tenant_a");
assert_eq!(ctx.tenant_id, "tenant_a");
assert!(ctx.check_time_limit().is_ok());
}
}