use std::collections::HashMap;
use crate::types::TenantId;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TenantQuota {
pub max_memory_bytes: u64,
pub max_storage_bytes: u64,
pub max_concurrent_requests: u32,
pub max_qps: u32,
pub max_vector_dim: u32,
pub max_graph_depth: u32,
#[serde(default)]
pub max_connections: u32,
}
impl Default for TenantQuota {
fn default() -> Self {
Self {
max_memory_bytes: 1024 * 1024 * 1024, max_storage_bytes: 10 * 1024 * 1024 * 1024, max_concurrent_requests: 100,
max_qps: 1000,
max_vector_dim: 4096,
max_graph_depth: 10,
max_connections: 0, }
}
}
#[derive(Debug, Clone, Default)]
pub struct TenantUsage {
pub memory_bytes: u64,
pub storage_bytes: u64,
pub active_requests: u32,
pub requests_this_second: u32,
pub total_requests: u64,
pub rejected_requests: u64,
pub active_connections: u32,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QuotaCheck {
Allowed,
MemoryExceeded { used: u64, limit: u64 },
StorageExceeded { used: u64, limit: u64 },
ConcurrencyExceeded { active: u32, limit: u32 },
RateLimited { qps: u32, limit: u32 },
}
impl QuotaCheck {
pub fn is_allowed(&self) -> bool {
matches!(self, QuotaCheck::Allowed)
}
}
#[derive(Debug)]
pub struct TenantIsolation {
quotas: HashMap<TenantId, TenantQuota>,
usage: HashMap<TenantId, TenantUsage>,
default_quota: TenantQuota,
}
impl TenantIsolation {
pub fn new(default_quota: TenantQuota) -> Self {
Self {
quotas: HashMap::new(),
usage: HashMap::new(),
default_quota,
}
}
pub fn set_quota(&mut self, tenant_id: TenantId, quota: TenantQuota) {
self.quotas.insert(tenant_id, quota);
}
pub fn quota(&self, tenant_id: TenantId) -> &TenantQuota {
self.quotas.get(&tenant_id).unwrap_or(&self.default_quota)
}
pub fn check(&self, tenant_id: TenantId) -> QuotaCheck {
let quota = self.quota(tenant_id);
let usage = self.usage.get(&tenant_id);
let usage = match usage {
Some(u) => u,
None => return QuotaCheck::Allowed, };
if usage.memory_bytes > quota.max_memory_bytes {
return QuotaCheck::MemoryExceeded {
used: usage.memory_bytes,
limit: quota.max_memory_bytes,
};
}
if usage.storage_bytes > quota.max_storage_bytes {
return QuotaCheck::StorageExceeded {
used: usage.storage_bytes,
limit: quota.max_storage_bytes,
};
}
if usage.active_requests >= quota.max_concurrent_requests {
return QuotaCheck::ConcurrencyExceeded {
active: usage.active_requests,
limit: quota.max_concurrent_requests,
};
}
if usage.requests_this_second >= quota.max_qps {
return QuotaCheck::RateLimited {
qps: usage.requests_this_second,
limit: quota.max_qps,
};
}
QuotaCheck::Allowed
}
pub fn request_start(&mut self, tenant_id: TenantId) {
let usage = self.usage.entry(tenant_id).or_default();
usage.active_requests += 1;
usage.requests_this_second += 1;
usage.total_requests += 1;
}
pub fn request_end(&mut self, tenant_id: TenantId) {
if let Some(usage) = self.usage.get_mut(&tenant_id) {
usage.active_requests = usage.active_requests.saturating_sub(1);
}
}
pub fn request_rejected(&mut self, tenant_id: TenantId) {
let usage = self.usage.entry(tenant_id).or_default();
usage.rejected_requests += 1;
}
pub fn update_memory(&mut self, tenant_id: TenantId, bytes: u64) {
let usage = self.usage.entry(tenant_id).or_default();
usage.memory_bytes = bytes;
}
pub fn update_storage(&mut self, tenant_id: TenantId, bytes: u64) {
let usage = self.usage.entry(tenant_id).or_default();
usage.storage_bytes = bytes;
}
pub fn reset_rate_counters(&mut self) {
for usage in self.usage.values_mut() {
usage.requests_this_second = 0;
}
}
pub fn check_connection(&self, tenant_id: TenantId) -> QuotaCheck {
let quota = self.quota(tenant_id);
if quota.max_connections == 0 {
return QuotaCheck::Allowed; }
let usage = match self.usage.get(&tenant_id) {
Some(u) => u,
None => return QuotaCheck::Allowed,
};
if usage.active_connections >= quota.max_connections {
QuotaCheck::ConcurrencyExceeded {
active: usage.active_connections,
limit: quota.max_connections,
}
} else {
QuotaCheck::Allowed
}
}
pub fn connection_start(&mut self, tenant_id: TenantId) {
let usage = self.usage.entry(tenant_id).or_default();
usage.active_connections += 1;
}
pub fn connection_end(&mut self, tenant_id: TenantId) {
if let Some(usage) = self.usage.get_mut(&tenant_id) {
usage.active_connections = usage.active_connections.saturating_sub(1);
}
}
pub fn usage(&self, tenant_id: TenantId) -> Option<&TenantUsage> {
self.usage.get(&tenant_id)
}
pub fn tenant_count(&self) -> usize {
self.usage.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn t(id: u32) -> TenantId {
TenantId::new(id)
}
#[test]
fn default_quota_applied() {
let isolation = TenantIsolation::new(TenantQuota::default());
let quota = isolation.quota(t(1));
assert_eq!(quota.max_concurrent_requests, 100);
}
#[test]
fn custom_quota_overrides_default() {
let mut isolation = TenantIsolation::new(TenantQuota::default());
isolation.set_quota(
t(1),
TenantQuota {
max_concurrent_requests: 50,
..Default::default()
},
);
assert_eq!(isolation.quota(t(1)).max_concurrent_requests, 50);
assert_eq!(isolation.quota(t(2)).max_concurrent_requests, 100); }
#[test]
fn quota_check_allowed() {
let isolation = TenantIsolation::new(TenantQuota::default());
assert!(isolation.check(t(1)).is_allowed());
}
#[test]
fn quota_check_concurrency_exceeded() {
let mut isolation = TenantIsolation::new(TenantQuota {
max_concurrent_requests: 2,
..Default::default()
});
isolation.request_start(t(1));
isolation.request_start(t(1));
assert_eq!(
isolation.check(t(1)),
QuotaCheck::ConcurrencyExceeded {
active: 2,
limit: 2,
}
);
isolation.request_end(t(1));
assert!(isolation.check(t(1)).is_allowed());
}
#[test]
fn quota_check_rate_limited() {
let mut isolation = TenantIsolation::new(TenantQuota {
max_qps: 3,
..Default::default()
});
for _ in 0..3 {
isolation.request_start(t(1));
isolation.request_end(t(1));
}
assert_eq!(
isolation.check(t(1)),
QuotaCheck::RateLimited { qps: 3, limit: 3 }
);
isolation.reset_rate_counters();
assert!(isolation.check(t(1)).is_allowed());
}
#[test]
fn quota_check_memory_exceeded() {
let mut isolation = TenantIsolation::new(TenantQuota {
max_memory_bytes: 1000,
..Default::default()
});
isolation.update_memory(t(1), 1001);
assert!(matches!(
isolation.check(t(1)),
QuotaCheck::MemoryExceeded { .. }
));
}
#[test]
fn request_rejected_tracking() {
let mut isolation = TenantIsolation::new(TenantQuota::default());
isolation.request_rejected(t(1));
isolation.request_rejected(t(1));
assert_eq!(isolation.usage(t(1)).unwrap().rejected_requests, 2);
}
#[test]
fn multi_tenant_isolation() {
let mut isolation = TenantIsolation::new(TenantQuota {
max_concurrent_requests: 1,
..Default::default()
});
isolation.request_start(t(1));
assert!(!isolation.check(t(1)).is_allowed());
assert!(isolation.check(t(2)).is_allowed());
}
}