use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use super::registry::{get_registry, TenantQuota};
/// Plain-data view of one tenant's resource consumption.
///
/// Values of this type are produced by `AtomicTenantUsage::snapshot` and
/// consumed by `AtomicTenantUsage::reset_from`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TenantUsage {
    /// Vector counter value.
    pub vector_count: u64,
    /// Storage counter value, in bytes.
    pub storage_bytes: u64,
    /// Filled from the `rate_count` counter when a snapshot is taken.
    pub queries_this_second: u32,
    /// Filled from the `concurrent_count` counter when a snapshot is taken.
    pub concurrent_queries: u32,
    /// Collection counter value.
    pub collection_count: u32,
    /// Milliseconds since the Unix epoch at which this value was created;
    /// the derived `Default` leaves it at 0.
    pub last_updated: i64,
}

impl TenantUsage {
    /// Creates an all-zero usage record stamped with the current time.
    pub fn new() -> Self {
        Self {
            last_updated: chrono_now_millis(),
            ..Self::default()
        }
    }

    /// Returns `storage_bytes` expressed in units of 1024^3 bytes.
    pub fn storage_gb(&self) -> f64 {
        const BYTES_PER_GB: f64 = 1024.0 * 1024.0 * 1024.0;
        self.storage_bytes as f64 / BYTES_PER_GB
    }
}
/// Per-tenant usage counters stored as atomics so they can be updated
/// through a shared reference.
// NOTE(review): the reason for `#[repr(C)]` is not visible in this file.
#[repr(C)]
#[derive(Default)]
pub struct AtomicTenantUsage {
    /// Vector counter.
    pub vector_count: AtomicU64,
    /// Storage counter, in bytes.
    pub storage_bytes: AtomicU64,
    /// Reported as `queries_this_second` in snapshots.
    pub rate_count: AtomicU32,
    /// Presumably the start of the current rate window; starts at 0.
    pub rate_window_start: AtomicU64,
    /// Reported as `concurrent_queries` in snapshots.
    pub concurrent_count: AtomicU32,
    /// Collection counter.
    pub collection_count: AtomicU32,
}

impl AtomicTenantUsage {
    /// Creates a set of counters that all start at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Copies the current counter values into a [`TenantUsage`] stamped with
    /// the current time.
    ///
    /// Each counter is read with its own relaxed load, so the result is not
    /// an atomic view of all counters together.
    pub fn snapshot(&self) -> TenantUsage {
        let read_u64 = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let read_u32 = |counter: &AtomicU32| counter.load(Ordering::Relaxed);

        let vector_count = read_u64(&self.vector_count);
        let storage_bytes = read_u64(&self.storage_bytes);
        let queries_this_second = read_u32(&self.rate_count);
        let concurrent_queries = read_u32(&self.concurrent_count);
        let collection_count = read_u32(&self.collection_count);

        TenantUsage {
            vector_count,
            storage_bytes,
            queries_this_second,
            concurrent_queries,
            collection_count,
            last_updated: chrono_now_millis(),
        }
    }

    /// Overwrites the vector, storage and collection counters with the values
    /// from `usage`.
    ///
    /// The rate and concurrency counters are left untouched.
    pub fn reset_from(&self, usage: &TenantUsage) {
        let TenantUsage {
            vector_count,
            storage_bytes,
            collection_count,
            ..
        } = usage;

        self.vector_count.store(*vector_count, Ordering::Relaxed);
        self.storage_bytes.store(*storage_bytes, Ordering::Relaxed);
        self.collection_count
            .store(*collection_count, Ordering::Relaxed);
    }
}
/// Token-bucket rate limiter built on atomics.
///
/// Token amounts are stored in thousandths of a token so that refills of
/// less than one whole token per millisecond are not lost to integer
/// rounding.
pub struct TokenBucket {
    /// Maximum number of whole tokens the bucket can hold.
    capacity: u32,
    /// Refill rate in whole tokens per second.
    rate: u32,
    /// Tokens currently available, in thousandths of a token.
    tokens: AtomicU64,
    /// Time of the last credited refill, in milliseconds since the Unix epoch.
    last_refill: AtomicU64,
}

impl TokenBucket {
    /// Number of stored units per whole token.
    const UNITS_PER_TOKEN: u64 = 1000;

    /// Creates a bucket that starts full.
    ///
    /// `capacity` is the burst size in tokens; `rate` is the refill rate in
    /// tokens per second.
    pub fn new(capacity: u32, rate: u32) -> Self {
        Self {
            capacity,
            rate,
            tokens: AtomicU64::new(u64::from(capacity) * Self::UNITS_PER_TOKEN),
            last_refill: AtomicU64::new(chrono_now_millis() as u64),
        }
    }

    /// Attempts to take `tokens` tokens; returns `true` if they were taken.
    ///
    /// The check and the subtraction happen in a single atomic
    /// read-modify-write, so concurrent callers can never drive the balance
    /// below zero.
    pub fn try_acquire(&self, tokens: u32) -> bool {
        self.refill();
        let needed = u64::from(tokens) * Self::UNITS_PER_TOKEN;
        self.tokens
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |available| {
                available.checked_sub(needed)
            })
            .is_ok()
    }

    /// Credits the tokens earned since the last refill, capped at capacity.
    fn refill(&self) {
        let now = chrono_now_millis() as u64;
        let last = self.last_refill.load(Ordering::Acquire);
        let elapsed_ms = now.saturating_sub(last);
        if elapsed_ms == 0 {
            return;
        }

        // `rate` tokens per second equals `rate` thousandths per millisecond.
        // Saturate so a very long idle period cannot overflow.
        let units_to_add = u64::from(self.rate).saturating_mul(elapsed_ms);
        if units_to_add == 0 {
            return;
        }

        // Claim the interval [last, now). Only the caller whose exchange
        // succeeds credits it, so the same interval is never paid out twice.
        if self
            .last_refill
            .compare_exchange(last, now, Ordering::AcqRel, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        let max_units = u64::from(self.capacity) * Self::UNITS_PER_TOKEN;
        // Read-modify-write so a concurrent `try_acquire` is not overwritten.
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .tokens
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |available| {
                Some(available.saturating_add(units_to_add).min(max_units))
            });
    }

    /// Returns the number of milliseconds until `tokens` tokens would be
    /// available, assuming no other caller takes any in the meantime.
    ///
    /// Returns 0 if they are available now, and `u64::MAX` if they can never
    /// become available (zero refill rate, or a request larger than the
    /// bucket's capacity).
    pub fn time_to_available(&self, tokens: u32) -> u64 {
        self.refill();
        let needed = u64::from(tokens) * Self::UNITS_PER_TOKEN;
        let available = self.tokens.load(Ordering::Relaxed);
        if available >= needed {
            return 0;
        }

        let max_units = u64::from(self.capacity) * Self::UNITS_PER_TOKEN;
        let units_per_ms = u64::from(self.rate);
        if units_per_ms == 0 || needed > max_units {
            return u64::MAX;
        }

        // Ceiling division: a partial millisecond still has to be waited out.
        let shortfall = needed - available;
        (shortfall + units_per_ms - 1) / units_per_ms
    }
}
/// Outcome of a quota check.
#[derive(Debug, Clone)]
pub enum QuotaResult {
    /// The operation is within every checked limit.
    Allowed,
    /// The rate limiter rejected the request.
    RateLimited {
        /// Suggested wait before retrying, in milliseconds.
        retry_after_ms: u64,
    },
    /// The vector count limit would be exceeded.
    VectorQuotaExceeded { current: u64, limit: u64 },
    /// The storage limit would be exceeded.
    StorageQuotaExceeded {
        current_bytes: u64,
        limit_bytes: u64,
    },
    /// The concurrent query limit has been reached.
    ConcurrentLimitExceeded { current: u32, limit: u32 },
    /// The collection count limit has been reached.
    CollectionLimitExceeded { current: u32, limit: u32 },
}

impl QuotaResult {
    /// Returns `true` only for [`QuotaResult::Allowed`].
    pub fn is_allowed(&self) -> bool {
        matches!(self, Self::Allowed)
    }

    /// Returns a human-readable description of the rejection, or `None` when
    /// the result is [`QuotaResult::Allowed`].
    ///
    /// Storage figures are reported in units of 1024^3 bytes with two
    /// decimal places.
    pub fn error_message(&self) -> Option<String> {
        const BYTES_PER_GB: f64 = 1024.0 * 1024.0 * 1024.0;

        let message = match self {
            Self::Allowed => return None,
            Self::RateLimited { retry_after_ms } => {
                format!("Rate limit exceeded. Retry after {}ms", retry_after_ms)
            }
            Self::VectorQuotaExceeded { current, limit } => {
                format!("Vector quota exceeded: {} / {} vectors", current, limit)
            }
            Self::StorageQuotaExceeded {
                current_bytes,
                limit_bytes,
            } => format!(
                "Storage quota exceeded: {:.2} / {:.2} GB",
                *current_bytes as f64 / BYTES_PER_GB,
                *limit_bytes as f64 / BYTES_PER_GB
            ),
            Self::ConcurrentLimitExceeded { current, limit } => {
                format!("Concurrent query limit exceeded: {} / {}", current, limit)
            }
            Self::CollectionLimitExceeded { current, limit } => {
                format!("Collection limit exceeded: {} / {}", current, limit)
            }
        };
        Some(message)
    }
}
pub struct QuotaManager {
usage: DashMap<String, AtomicTenantUsage>,
rate_limiters: DashMap<String, TokenBucket>,
}
impl QuotaManager {
pub fn new() -> Self {
Self {
usage: DashMap::new(),
rate_limiters: DashMap::new(),
}
}
fn get_or_create_usage(&self, tenant_id: &str) -> &AtomicTenantUsage {
if !self.usage.contains_key(tenant_id) {
self.usage
.insert(tenant_id.to_string(), AtomicTenantUsage::new());
}
unsafe {
let ptr = self.usage.get(tenant_id).unwrap();
&*(ptr.value() as *const AtomicTenantUsage)
}
}
fn get_or_create_rate_limiter(&self, tenant_id: &str, quota: &TenantQuota) -> &TokenBucket {
if !self.rate_limiters.contains_key(tenant_id) {
let bucket = TokenBucket::new(quota.max_qps * 2, quota.max_qps);
self.rate_limiters.insert(tenant_id.to_string(), bucket);
}
unsafe {
let ptr = self.rate_limiters.get(tenant_id).unwrap();
&*(ptr.value() as *const TokenBucket)
}
}
pub fn check_vector_insert(
&self,
tenant_id: &str,
count: u64,
estimated_bytes: u64,
) -> QuotaResult {
let config = match get_registry().get(tenant_id) {
Some(c) => c,
None => return QuotaResult::Allowed, };
let usage = self.get_or_create_usage(tenant_id);
let current_vectors = usage.vector_count.load(Ordering::Relaxed);
if current_vectors + count > config.quota.max_vectors {
return QuotaResult::VectorQuotaExceeded {
current: current_vectors,
limit: config.quota.max_vectors,
};
}
let current_storage = usage.storage_bytes.load(Ordering::Relaxed);
if current_storage + estimated_bytes > config.quota.max_storage_bytes {
return QuotaResult::StorageQuotaExceeded {
current_bytes: current_storage,
limit_bytes: config.quota.max_storage_bytes,
};
}
QuotaResult::Allowed
}
pub fn check_query(&self, tenant_id: &str) -> QuotaResult {
let config = match get_registry().get(tenant_id) {
Some(c) => c,
None => return QuotaResult::Allowed,
};
let rate_limiter = self.get_or_create_rate_limiter(tenant_id, &config.quota);
if !rate_limiter.try_acquire(1) {
return QuotaResult::RateLimited {
retry_after_ms: rate_limiter.time_to_available(1),
};
}
let usage = self.get_or_create_usage(tenant_id);
let current_concurrent = usage.concurrent_count.load(Ordering::Relaxed);
if current_concurrent >= config.quota.max_concurrent {
return QuotaResult::ConcurrentLimitExceeded {
current: current_concurrent,
limit: config.quota.max_concurrent,
};
}
QuotaResult::Allowed
}
pub fn check_collection_create(&self, tenant_id: &str) -> QuotaResult {
let config = match get_registry().get(tenant_id) {
Some(c) => c,
None => return QuotaResult::Allowed,
};
let usage = self.get_or_create_usage(tenant_id);
let current = usage.collection_count.load(Ordering::Relaxed);
if current >= config.quota.max_collections {
return QuotaResult::CollectionLimitExceeded {
current,
limit: config.quota.max_collections,
};
}
QuotaResult::Allowed
}
pub fn record_vector_insert(&self, tenant_id: &str, count: u64, bytes: u64) {
let usage = self.get_or_create_usage(tenant_id);
usage.vector_count.fetch_add(count, Ordering::Relaxed);
usage.storage_bytes.fetch_add(bytes, Ordering::Relaxed);
}
pub fn record_vector_delete(&self, tenant_id: &str, count: u64, bytes: u64) {
let usage = self.get_or_create_usage(tenant_id);
usage.vector_count.fetch_sub(
count.min(usage.vector_count.load(Ordering::Relaxed)),
Ordering::Relaxed,
);
usage.storage_bytes.fetch_sub(
bytes.min(usage.storage_bytes.load(Ordering::Relaxed)),
Ordering::Relaxed,
);
}
pub fn record_collection_create(&self, tenant_id: &str) {
let usage = self.get_or_create_usage(tenant_id);
usage.collection_count.fetch_add(1, Ordering::Relaxed);
}
pub fn record_collection_delete(&self, tenant_id: &str) {
let usage = self.get_or_create_usage(tenant_id);
let current = usage.collection_count.load(Ordering::Relaxed);
if current > 0 {
usage.collection_count.fetch_sub(1, Ordering::Relaxed);
}
}
pub fn start_query(&self, tenant_id: &str) {
let usage = self.get_or_create_usage(tenant_id);
usage.concurrent_count.fetch_add(1, Ordering::Relaxed);
}
pub fn end_query(&self, tenant_id: &str) {
let usage = self.get_or_create_usage(tenant_id);
let current = usage.concurrent_count.load(Ordering::Relaxed);
if current > 0 {
usage.concurrent_count.fetch_sub(1, Ordering::Relaxed);
}
}
pub fn get_usage(&self, tenant_id: &str) -> Option<TenantUsage> {
self.usage.get(tenant_id).map(|u| u.snapshot())
}
pub fn get_quota_status(&self, tenant_id: &str) -> Option<QuotaStatus> {
let config = get_registry().get(tenant_id)?;
let usage = self.get_usage(tenant_id).unwrap_or_default();
Some(QuotaStatus {
tenant_id: tenant_id.to_string(),
vectors: ResourceUsage {
current: usage.vector_count,
limit: config.quota.max_vectors,
usage_percent: (usage.vector_count as f64 / config.quota.max_vectors as f64 * 100.0)
as f32,
},
storage: ResourceUsage {
current: usage.storage_bytes,
limit: config.quota.max_storage_bytes,
usage_percent: (usage.storage_bytes as f64 / config.quota.max_storage_bytes as f64
* 100.0) as f32,
},
qps: RateUsage {
current: usage.queries_this_second,
limit: config.quota.max_qps,
},
concurrent: ResourceUsage {
current: usage.concurrent_queries as u64,
limit: config.quota.max_concurrent as u64,
usage_percent: (usage.concurrent_queries as f64
/ config.quota.max_concurrent as f64
* 100.0) as f32,
},
collections: ResourceUsage {
current: usage.collection_count as u64,
limit: config.quota.max_collections as u64,
usage_percent: (usage.collection_count as f64 / config.quota.max_collections as f64
* 100.0) as f32,
},
})
}
pub fn reset_usage(&self, tenant_id: &str) {
if let Some(usage) = self.usage.get(tenant_id) {
usage.vector_count.store(0, Ordering::Relaxed);
usage.storage_bytes.store(0, Ordering::Relaxed);
usage.collection_count.store(0, Ordering::Relaxed);
usage.rate_count.store(0, Ordering::Relaxed);
usage.concurrent_count.store(0, Ordering::Relaxed);
}
}
pub fn initialize_usage(&self, tenant_id: &str, stored_usage: TenantUsage) {
let usage = self.get_or_create_usage(tenant_id);
usage.reset_from(&stored_usage);
}
}
impl Default for QuotaManager {
fn default() -> Self {
Self::new()
}
}
/// Current consumption of a counted resource alongside its limit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
/// Amount currently in use.
pub current: u64,
/// Configured maximum.
pub limit: u64,
/// `current` as a percentage of `limit`, as filled in by
/// `QuotaManager::get_quota_status`.
pub usage_percent: f32,
}
/// Current rate figure alongside its limit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateUsage {
/// Filled from `TenantUsage::queries_this_second` by
/// `QuotaManager::get_quota_status`.
pub current: u32,
/// Filled from the tenant quota's `max_qps`.
pub limit: u32,
}
/// Usage-versus-limit report for a single tenant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotaStatus {
    /// Tenant this report describes.
    pub tenant_id: String,
    /// Vector count usage.
    pub vectors: ResourceUsage,
    /// Storage usage, in bytes.
    pub storage: ResourceUsage,
    /// Query rate usage.
    pub qps: RateUsage,
    /// Concurrent query usage.
    pub concurrent: ResourceUsage,
    /// Collection count usage.
    pub collections: ResourceUsage,
}

impl QuotaStatus {
    /// Returns `true` when vectors, storage or collections are above 80% of
    /// their limit.
    pub fn is_near_limit(&self) -> bool {
        const NEAR_LIMIT_PERCENT: f32 = 80.0;
        [&self.vectors, &self.storage, &self.collections]
            .iter()
            .any(|resource| resource.usage_percent > NEAR_LIMIT_PERCENT)
    }

    /// Returns `true` when vectors or storage are above 95% of their limit.
    pub fn is_critical(&self) -> bool {
        const CRITICAL_PERCENT: f32 = 95.0;
        let is_over = |resource: &ResourceUsage| resource.usage_percent > CRITICAL_PERCENT;
        is_over(&self.vectors) || is_over(&self.storage)
    }
}
/// Process-wide `QuotaManager`, built with `QuotaManager::new` on first
/// access through `once_cell`'s `Lazy`.
static QUOTA_MANAGER: once_cell::sync::Lazy<QuotaManager> =
once_cell::sync::Lazy::new(QuotaManager::new);
pub fn get_quota_manager() -> &'static QuotaManager {
"A_MANAGER
}
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn chrono_now_millis() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Tenant id shared by the manager tests; each test uses its own manager.
    const TENANT: &str = "test-tenant";

    /// A full bucket hands out exactly `capacity` tokens before refusing.
    #[test]
    fn test_token_bucket_acquire() {
        let bucket = TokenBucket::new(10, 10);
        let mut granted = 0;
        while granted < 10 {
            assert!(bucket.try_acquire(1));
            granted += 1;
        }
        assert!(!bucket.try_acquire(1));
    }

    /// Inserts, collection creation and deletes are reflected in snapshots.
    #[test]
    fn test_tenant_usage_tracking() {
        let quota_manager = QuotaManager::new();
        quota_manager.record_vector_insert(TENANT, 100, 1024 * 100);
        quota_manager.record_collection_create(TENANT);

        let after_insert = quota_manager.get_usage(TENANT).unwrap();
        assert_eq!(after_insert.vector_count, 100);
        assert_eq!(after_insert.storage_bytes, 1024 * 100);
        assert_eq!(after_insert.collection_count, 1);

        quota_manager.record_vector_delete(TENANT, 50, 1024 * 50);
        let after_delete = quota_manager.get_usage(TENANT).unwrap();
        assert_eq!(after_delete.vector_count, 50);
    }

    /// Rejections carry a message with the relevant numbers; `Allowed` has none.
    #[test]
    fn test_quota_result_messages() {
        let rate_limited = QuotaResult::RateLimited {
            retry_after_ms: 100,
        };
        assert!(!rate_limited.is_allowed());
        let rate_message = rate_limited.error_message().unwrap();
        assert!(rate_message.contains("100"));

        let over_quota = QuotaResult::VectorQuotaExceeded {
            current: 1000,
            limit: 1000,
        };
        assert!(!over_quota.is_allowed());
        let quota_message = over_quota.error_message().unwrap();
        assert!(quota_message.contains("1000"));

        let allowed = QuotaResult::Allowed;
        assert!(allowed.is_allowed());
        assert!(allowed.error_message().is_none());
    }

    /// `start_query` and `end_query` move the concurrency counter up and down.
    #[test]
    fn test_concurrent_query_tracking() {
        let quota_manager = QuotaManager::new();
        quota_manager.start_query(TENANT);
        quota_manager.start_query(TENANT);
        let two_running = quota_manager.get_usage(TENANT).unwrap();
        assert_eq!(two_running.concurrent_queries, 2);

        quota_manager.end_query(TENANT);
        let one_running = quota_manager.get_usage(TENANT).unwrap();
        assert_eq!(one_running.concurrent_queries, 1);
    }

    /// `reset_usage` zeroes every counter that was previously touched.
    #[test]
    fn test_usage_reset() {
        let quota_manager = QuotaManager::new();
        quota_manager.record_vector_insert(TENANT, 100, 1024);
        quota_manager.record_collection_create(TENANT);
        quota_manager.start_query(TENANT);

        quota_manager.reset_usage(TENANT);

        let cleared = quota_manager.get_usage(TENANT).unwrap();
        assert_eq!(cleared.vector_count, 0);
        assert_eq!(cleared.storage_bytes, 0);
        assert_eq!(cleared.collection_count, 0);
        assert_eq!(cleared.concurrent_queries, 0);
    }
}