use std::collections::HashMap;
use crate::types::TenantId;
/// Resource limits that apply to a single tenant.
///
/// [`TenantIsolation::check`] evaluates the memory, storage, concurrency and QPS limits, and
/// [`TenantIsolation::check_connection`] evaluates `max_connections`. The remaining fields are
/// carried as configuration; none of the checks visible in this file read them.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct TenantQuota {
    /// Memory ceiling in bytes. Usage strictly greater than this value is rejected.
    pub max_memory_bytes: u64,
    /// Storage ceiling in bytes. Usage strictly greater than this value is rejected.
    pub max_storage_bytes: u64,
    /// Ceiling on in-flight requests. Reaching this value is already a rejection.
    pub max_concurrent_requests: u32,
    /// Ceiling on requests counted in the current rate window. Reaching this value is already a
    /// rejection.
    pub max_qps: u32,
    /// Maximum vector dimensionality. Not enforced in this file; presumably consumed by the
    /// vector engine (TODO confirm).
    pub max_vector_dim: u32,
    /// Maximum graph traversal depth. Not enforced in this file; presumably consumed by the
    /// graph engine (TODO confirm).
    pub max_graph_depth: u32,
    /// Ceiling on open connections. `0` disables the connection check entirely.
    ///
    /// Falls back to `0` when the field is absent from the serialized form.
    #[serde(default)]
    pub max_connections: u32,
    /// Optional retention period, in days, for deactivated collections. Not read in this file;
    /// the meaning of `None` is decided by whichever component consumes it (TODO confirm).
    ///
    /// Falls back to `None` when the field is absent from the serialized form.
    #[serde(default)]
    pub deactivated_collection_retention_days: Option<u32>,
}
impl Default for TenantQuota {
    /// Builds the baseline quota: 1 GiB of memory, 10 GiB of storage, 100 concurrent requests,
    /// 1000 requests per rate window, vectors up to 4096 dimensions, graph depth up to 10,
    /// no connection limit (`0`) and no retention override.
    fn default() -> Self {
        /// One gibibyte in bytes.
        const GIB: u64 = 1 << 30;

        Self {
            max_memory_bytes: GIB,
            max_storage_bytes: 10 * GIB,
            max_concurrent_requests: 100,
            max_qps: 1000,
            max_vector_dim: 4096,
            max_graph_depth: 10,
            // Zero turns the connection check off.
            max_connections: 0,
            deactivated_collection_retention_days: None,
        }
    }
}
/// Running counters describing what one tenant is currently consuming.
///
/// Every field starts at zero (`Default`) and is mutated only through the
/// [`TenantIsolation`] methods named on each field.
#[derive(Clone, Debug, Default)]
pub struct TenantUsage {
    /// Last value handed to [`TenantIsolation::update_memory`], in bytes.
    pub memory_bytes: u64,
    /// Last value handed to [`TenantIsolation::update_storage`], in bytes.
    pub storage_bytes: u64,
    /// Requests started via [`TenantIsolation::request_start`] and not yet finished via
    /// [`TenantIsolation::request_end`].
    pub active_requests: u32,
    /// Requests started since the last [`TenantIsolation::reset_rate_counters`] call.
    pub requests_this_second: u32,
    /// Requests started over the lifetime of this usage entry.
    pub total_requests: u64,
    /// Rejections recorded via [`TenantIsolation::request_rejected`].
    pub rejected_requests: u64,
    /// Connections opened via [`TenantIsolation::connection_start`] and not yet closed via
    /// [`TenantIsolation::connection_end`].
    pub active_connections: u32,
}
/// Verdict produced by a quota check.
///
/// Each rejection variant carries the observed value together with the limit it ran into.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum QuotaCheck {
    /// No limit was hit.
    Allowed,
    /// Memory usage is above the memory ceiling (both in bytes).
    MemoryExceeded {
        used: u64,
        limit: u64,
    },
    /// Storage usage is above the storage ceiling (both in bytes).
    StorageExceeded {
        used: u64,
        limit: u64,
    },
    /// A concurrency ceiling was reached. Reported for in-flight requests by
    /// [`TenantIsolation::check`] and for open connections by
    /// [`TenantIsolation::check_connection`].
    ConcurrencyExceeded {
        active: u32,
        limit: u32,
    },
    /// The request count of the current rate window reached the QPS ceiling.
    RateLimited {
        qps: u32,
        limit: u32,
    },
}
impl QuotaCheck {
pub fn is_allowed(&self) -> bool {
matches!(self, QuotaCheck::Allowed)
}
}
/// In-memory registry of per-tenant quotas and the usage counters checked against them.
#[derive(Debug)]
pub struct TenantIsolation {
    /// Explicit per-tenant overrides; tenants absent from this map use `default_quota`.
    quotas: HashMap<TenantId, TenantQuota>,
    /// Usage counters, created lazily the first time a tenant records any activity.
    usage: HashMap<TenantId, TenantUsage>,
    /// Quota returned for every tenant without an override.
    default_quota: TenantQuota,
}
impl TenantIsolation {
pub fn new(default_quota: TenantQuota) -> Self {
Self {
quotas: HashMap::new(),
usage: HashMap::new(),
default_quota,
}
}
pub fn set_quota(&mut self, tenant_id: TenantId, quota: TenantQuota) {
self.quotas.insert(tenant_id, quota);
}
pub fn has_quota(&self, tenant_id: TenantId) -> bool {
self.quotas.contains_key(&tenant_id)
}
pub fn remove_quota(&mut self, tenant_id: TenantId) {
self.quotas.remove(&tenant_id);
self.usage.remove(&tenant_id);
}
pub fn quota(&self, tenant_id: TenantId) -> &TenantQuota {
self.quotas.get(&tenant_id).unwrap_or(&self.default_quota)
}
pub fn check(&self, tenant_id: TenantId) -> QuotaCheck {
let quota = self.quota(tenant_id);
let usage = self.usage.get(&tenant_id);
let usage = match usage {
Some(u) => u,
None => return QuotaCheck::Allowed, };
if usage.memory_bytes > quota.max_memory_bytes {
return QuotaCheck::MemoryExceeded {
used: usage.memory_bytes,
limit: quota.max_memory_bytes,
};
}
if usage.storage_bytes > quota.max_storage_bytes {
return QuotaCheck::StorageExceeded {
used: usage.storage_bytes,
limit: quota.max_storage_bytes,
};
}
if usage.active_requests >= quota.max_concurrent_requests {
return QuotaCheck::ConcurrencyExceeded {
active: usage.active_requests,
limit: quota.max_concurrent_requests,
};
}
if usage.requests_this_second >= quota.max_qps {
return QuotaCheck::RateLimited {
qps: usage.requests_this_second,
limit: quota.max_qps,
};
}
QuotaCheck::Allowed
}
pub fn request_start(&mut self, tenant_id: TenantId) {
let usage = self.usage.entry(tenant_id).or_default();
usage.active_requests += 1;
usage.requests_this_second += 1;
usage.total_requests += 1;
}
pub fn request_end(&mut self, tenant_id: TenantId) {
if let Some(usage) = self.usage.get_mut(&tenant_id) {
usage.active_requests = usage.active_requests.saturating_sub(1);
}
}
pub fn request_rejected(&mut self, tenant_id: TenantId) {
let usage = self.usage.entry(tenant_id).or_default();
usage.rejected_requests += 1;
}
pub fn update_memory(&mut self, tenant_id: TenantId, bytes: u64) {
let usage = self.usage.entry(tenant_id).or_default();
usage.memory_bytes = bytes;
}
pub fn update_storage(&mut self, tenant_id: TenantId, bytes: u64) {
let usage = self.usage.entry(tenant_id).or_default();
usage.storage_bytes = bytes;
}
pub fn reset_rate_counters(&mut self) {
for usage in self.usage.values_mut() {
usage.requests_this_second = 0;
}
}
pub fn check_connection(&self, tenant_id: TenantId) -> QuotaCheck {
let quota = self.quota(tenant_id);
if quota.max_connections == 0 {
return QuotaCheck::Allowed; }
let usage = match self.usage.get(&tenant_id) {
Some(u) => u,
None => return QuotaCheck::Allowed,
};
if usage.active_connections >= quota.max_connections {
QuotaCheck::ConcurrencyExceeded {
active: usage.active_connections,
limit: quota.max_connections,
}
} else {
QuotaCheck::Allowed
}
}
pub fn connection_start(&mut self, tenant_id: TenantId) {
let usage = self.usage.entry(tenant_id).or_default();
usage.active_connections += 1;
}
pub fn connection_end(&mut self, tenant_id: TenantId) {
if let Some(usage) = self.usage.get_mut(&tenant_id) {
usage.active_connections = usage.active_connections.saturating_sub(1);
}
}
pub fn usage(&self, tenant_id: TenantId) -> Option<&TenantUsage> {
self.usage.get(&tenant_id)
}
pub fn tenant_count(&self) -> usize {
self.usage.len()
}
pub fn snapshot_metrics(
&self,
tenant_id: TenantId,
) -> crate::control::metrics::tenant::TenantQuotaMetrics {
let quota = self.quota(tenant_id);
let usage = self.usage.get(&tenant_id);
let (mem_used, stor_used, qps, conns) = match usage {
Some(u) => (
u.memory_bytes,
u.storage_bytes,
u.requests_this_second as u64,
u.active_connections as u64,
),
None => (0, 0, 0, 0),
};
crate::control::metrics::tenant::TenantQuotaMetrics {
tenant_id: tenant_id.as_u64(),
memory_bytes_used: mem_used,
memory_bytes_limit: quota.max_memory_bytes,
storage_bytes_used: stor_used,
storage_bytes_limit: quota.max_storage_bytes,
qps_current: qps,
qps_limit: quota.max_qps as u64,
connections_active: conns,
connections_limit: quota.max_connections as u64,
}
}
pub fn iter_usage(&self) -> impl Iterator<Item = (TenantId, &TenantUsage, &TenantQuota)> {
self.usage.iter().map(move |(&tid, usage)| {
let quota = self.quotas.get(&tid).unwrap_or(&self.default_quota);
(tid, usage, quota)
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// One mebibyte, so memory footprints in the tests read naturally.
    const MIB: u64 = 1024 * 1024;

    /// Shorthand for turning a raw integer into a tenant id.
    fn tenant(id: u64) -> TenantId {
        TenantId::new(id)
    }

    /// Tracker whose default quota is `quota`, with no overrides and no usage.
    fn tracker_with(quota: TenantQuota) -> TenantIsolation {
        TenantIsolation::new(quota)
    }

    /// Tracker on the stock default quota where tenant 1 has completed one request and then
    /// reported `memory_bytes` of memory usage.
    fn default_tracker_after_one_request(memory_bytes: u64) -> TenantIsolation {
        let mut iso = tracker_with(TenantQuota::default());
        iso.request_start(tenant(1));
        iso.request_end(tenant(1));
        iso.update_memory(tenant(1), memory_bytes);
        iso
    }

    /// A tenant without an override sees the default quota.
    #[test]
    fn default_quota_applied() {
        let tracker = tracker_with(TenantQuota::default());
        assert_eq!(tracker.quota(tenant(1)).max_concurrent_requests, 100);
    }

    /// An override affects only the tenant it was installed for.
    #[test]
    fn custom_quota_overrides_default() {
        let mut tracker = tracker_with(TenantQuota::default());
        let tightened = TenantQuota {
            max_concurrent_requests: 50,
            ..Default::default()
        };
        tracker.set_quota(tenant(1), tightened);

        assert_eq!(tracker.quota(tenant(1)).max_concurrent_requests, 50);
        assert_eq!(tracker.quota(tenant(2)).max_concurrent_requests, 100);
    }

    /// A tenant with no recorded activity passes the check under the default quota.
    #[test]
    fn quota_check_allowed() {
        let tracker = tracker_with(TenantQuota::default());
        let verdict = tracker.check(tenant(1));
        assert!(verdict.is_allowed());
    }

    /// Reaching the concurrency limit rejects; finishing a request frees a slot again.
    #[test]
    fn quota_check_concurrency_exceeded() {
        let mut tracker = tracker_with(TenantQuota {
            max_concurrent_requests: 2,
            ..Default::default()
        });
        tracker.request_start(tenant(1));
        tracker.request_start(tenant(1));

        let at_limit = QuotaCheck::ConcurrencyExceeded {
            active: 2,
            limit: 2,
        };
        assert_eq!(tracker.check(tenant(1)), at_limit);

        tracker.request_end(tenant(1));
        assert!(tracker.check(tenant(1)).is_allowed());
    }

    /// Completed requests still count towards the rate window until it is reset.
    #[test]
    fn quota_check_rate_limited() {
        let mut tracker = tracker_with(TenantQuota {
            max_qps: 3,
            ..Default::default()
        });
        (0..3).for_each(|_| {
            tracker.request_start(tenant(1));
            tracker.request_end(tenant(1));
        });

        let throttled = QuotaCheck::RateLimited { qps: 3, limit: 3 };
        assert_eq!(tracker.check(tenant(1)), throttled);

        tracker.reset_rate_counters();
        assert!(tracker.check(tenant(1)).is_allowed());
    }

    /// One byte above the memory limit is enough to be rejected.
    #[test]
    fn quota_check_memory_exceeded() {
        let mut tracker = tracker_with(TenantQuota {
            max_memory_bytes: 1000,
            ..Default::default()
        });
        tracker.update_memory(tenant(1), 1001);

        let verdict = tracker.check(tenant(1));
        assert!(matches!(verdict, QuotaCheck::MemoryExceeded { .. }));
    }

    /// Every recorded rejection increments the tenant's rejection counter.
    #[test]
    fn request_rejected_tracking() {
        let mut tracker = tracker_with(TenantQuota::default());
        tracker.request_rejected(tenant(1));
        tracker.request_rejected(tenant(1));

        let recorded = tracker.usage(tenant(1)).unwrap();
        assert_eq!(recorded.rejected_requests, 2);
    }

    /// One tenant hitting its limit does not affect another tenant.
    #[test]
    fn multi_tenant_isolation() {
        let mut tracker = tracker_with(TenantQuota {
            max_concurrent_requests: 1,
            ..Default::default()
        });
        tracker.request_start(tenant(1));

        assert!(!tracker.check(tenant(1)).is_allowed());
        assert!(tracker.check(tenant(2)).is_allowed());
    }

    /// The default memory cap must admit a 113 MiB footprint.
    #[test]
    fn default_cap_clears_typical_fresh_boot_footprint() {
        let iso = default_tracker_after_one_request(113 * MIB);
        let check = iso.check(tenant(1));
        assert!(
            check.is_allowed(),
            "default tenant quota must clear a 113 MiB fresh-boot footprint; got {check:?}"
        );
    }

    /// The default memory cap must admit a 256 MiB footprint.
    #[test]
    fn default_cap_clears_quarter_gibibyte_footprint() {
        let iso = default_tracker_after_one_request(256 * MIB);
        assert!(
            iso.check(tenant(1)).is_allowed(),
            "default tenant quota must clear a 256 MiB attribution; got {:?}",
            iso.check(tenant(1))
        );
    }

    /// Guards against the default memory cap being lowered below 1 GiB.
    #[test]
    fn default_cap_remains_at_least_one_gib() {
        let defaults = TenantQuota::default();
        assert!(
            defaults.max_memory_bytes >= 1024 * MIB,
            "default tenant memory cap must remain >= 1 GiB; got {} bytes",
            defaults.max_memory_bytes
        );
    }
}