oxirs_vec/multi_tenancy/
quota.rs

1//! Resource quota and rate limiting for multi-tenancy
2
3use crate::multi_tenancy::types::{MultiTenancyError, MultiTenancyResult};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::{Arc, Mutex};
8
/// Resource types that can be limited
///
/// Each variant names one kind of resource for which a quota can be declared
/// and usage tracked. The type derives `Copy`, `Eq` and `Hash`, which is what
/// lets it be used directly as the key of the per-tenant quota and usage maps
/// in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ResourceType {
    /// Total number of vectors stored
    VectorCount,

    /// Storage space in bytes
    StorageBytes,

    /// Memory usage in bytes
    MemoryBytes,

    /// API calls per time period
    ApiCalls,

    /// Queries per second
    QueriesPerSecond,

    /// Index builds per day
    IndexBuilds,

    /// Embedding generations per day
    EmbeddingGenerations,

    /// Concurrent requests
    ConcurrentRequests,

    /// Batch size
    BatchSize,

    /// Custom resource type, identified by its `u32` payload
    Custom(u32),
}
42
43impl ResourceType {
44    /// Get resource name
45    pub fn name(&self) -> String {
46        match self {
47            Self::VectorCount => "vector_count".to_string(),
48            Self::StorageBytes => "storage_bytes".to_string(),
49            Self::MemoryBytes => "memory_bytes".to_string(),
50            Self::ApiCalls => "api_calls".to_string(),
51            Self::QueriesPerSecond => "queries_per_second".to_string(),
52            Self::IndexBuilds => "index_builds".to_string(),
53            Self::EmbeddingGenerations => "embedding_generations".to_string(),
54            Self::ConcurrentRequests => "concurrent_requests".to_string(),
55            Self::BatchSize => "batch_size".to_string(),
56            Self::Custom(id) => format!("custom_{}", id),
57        }
58    }
59}
60
/// Resource quota definition
///
/// Describes the limits attached to a single [`ResourceType`]: an optional
/// hard limit, an optional soft limit and an optional time window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceQuota {
    /// Resource type this quota applies to
    pub resource_type: ResourceType,

    /// Maximum allowed value (None = unlimited)
    pub limit: Option<u64>,

    /// Soft limit for warnings (None = no soft limit)
    pub soft_limit: Option<u64>,

    /// Time window for rate-based limits (seconds)
    ///
    /// NOTE(review): this value is only stored; nothing in this file reads it
    /// or resets usage when the window elapses. Confirm where it is applied.
    pub time_window_secs: Option<u64>,
}
76
77impl ResourceQuota {
78    /// Create a new quota with hard limit
79    pub fn new(resource_type: ResourceType, limit: u64) -> Self {
80        Self {
81            resource_type,
82            limit: Some(limit),
83            soft_limit: None,
84            time_window_secs: None,
85        }
86    }
87
88    /// Create unlimited quota
89    pub fn unlimited(resource_type: ResourceType) -> Self {
90        Self {
91            resource_type,
92            limit: None,
93            soft_limit: None,
94            time_window_secs: None,
95        }
96    }
97
98    /// Set soft limit
99    pub fn with_soft_limit(mut self, soft_limit: u64) -> Self {
100        self.soft_limit = Some(soft_limit);
101        self
102    }
103
104    /// Set time window for rate limits
105    pub fn with_time_window(mut self, window_secs: u64) -> Self {
106        self.time_window_secs = Some(window_secs);
107        self
108    }
109
110    /// Check if value exceeds hard limit
111    pub fn exceeds_hard_limit(&self, value: u64) -> bool {
112        if let Some(limit) = self.limit {
113            value > limit
114        } else {
115            false
116        }
117    }
118
119    /// Check if value exceeds soft limit
120    pub fn exceeds_soft_limit(&self, value: u64) -> bool {
121        if let Some(soft_limit) = self.soft_limit {
122            value > soft_limit
123        } else {
124            false
125        }
126    }
127}
128
/// Collection of quota limits for a tenant
///
/// Holds at most one [`ResourceQuota`] per [`ResourceType`]. A resource type
/// with no entry in `quotas` is treated as unlimited by `check_limit`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotaLimits {
    /// Tenant identifier
    pub tenant_id: String,

    /// Resource quotas, keyed by the resource type they apply to
    pub quotas: HashMap<ResourceType, ResourceQuota>,

    /// Whether to enforce quotas strictly
    ///
    /// `QuotaLimits::new` initializes this to `true`.
    /// NOTE(review): no check in this file consults this flag. Confirm where
    /// it takes effect.
    pub strict_enforcement: bool,
}
141
142impl QuotaLimits {
143    /// Create new quota limits for a tenant
144    pub fn new(tenant_id: impl Into<String>) -> Self {
145        Self {
146            tenant_id: tenant_id.into(),
147            quotas: HashMap::new(),
148            strict_enforcement: true,
149        }
150    }
151
152    /// Create default quotas for free tier
153    pub fn free_tier(tenant_id: impl Into<String>) -> Self {
154        let mut limits = Self::new(tenant_id);
155        limits.set_quota(ResourceQuota::new(ResourceType::VectorCount, 10_000));
156        limits.set_quota(ResourceQuota::new(ResourceType::StorageBytes, 100_000_000)); // 100MB
157        limits.set_quota(ResourceQuota::new(ResourceType::ApiCalls, 1000).with_time_window(3600));
158        limits.set_quota(ResourceQuota::new(ResourceType::QueriesPerSecond, 10));
159        limits
160    }
161
162    /// Create default quotas for pro tier
163    pub fn pro_tier(tenant_id: impl Into<String>) -> Self {
164        let mut limits = Self::new(tenant_id);
165        limits.set_quota(ResourceQuota::new(ResourceType::VectorCount, 1_000_000));
166        limits.set_quota(ResourceQuota::new(
167            ResourceType::StorageBytes,
168            10_000_000_000,
169        )); // 10GB
170        limits
171            .set_quota(ResourceQuota::new(ResourceType::ApiCalls, 100_000).with_time_window(3600));
172        limits.set_quota(ResourceQuota::new(ResourceType::QueriesPerSecond, 100));
173        limits
174    }
175
176    /// Create unlimited quotas for enterprise tier
177    pub fn enterprise_tier(tenant_id: impl Into<String>) -> Self {
178        let mut limits = Self::new(tenant_id);
179        limits.set_quota(ResourceQuota::unlimited(ResourceType::VectorCount));
180        limits.set_quota(ResourceQuota::unlimited(ResourceType::StorageBytes));
181        limits.set_quota(ResourceQuota::unlimited(ResourceType::ApiCalls));
182        limits.set_quota(ResourceQuota::unlimited(ResourceType::QueriesPerSecond));
183        limits
184    }
185
186    /// Set a quota
187    pub fn set_quota(&mut self, quota: ResourceQuota) {
188        self.quotas.insert(quota.resource_type, quota);
189    }
190
191    /// Get a quota
192    pub fn get_quota(&self, resource_type: ResourceType) -> Option<&ResourceQuota> {
193        self.quotas.get(&resource_type)
194    }
195
196    /// Check if resource usage is within limits
197    pub fn check_limit(&self, resource_type: ResourceType, value: u64) -> bool {
198        if let Some(quota) = self.get_quota(resource_type) {
199            !quota.exceeds_hard_limit(value)
200        } else {
201            true // No quota defined = unlimited
202        }
203    }
204}
205
/// Current resource usage for a tenant
///
/// Stores one counter per [`ResourceType`]. A resource type with no entry in
/// `usage` is reported as `0` by `get`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotaUsage {
    /// Tenant identifier
    pub tenant_id: String,

    /// Current usage by resource type
    pub usage: HashMap<ResourceType, u64>,

    /// Last updated timestamp (UTC); refreshed by `set` and `reset_all`
    pub updated_at: DateTime<Utc>,
}
218
219impl QuotaUsage {
220    /// Create new usage tracking
221    pub fn new(tenant_id: impl Into<String>) -> Self {
222        Self {
223            tenant_id: tenant_id.into(),
224            usage: HashMap::new(),
225            updated_at: Utc::now(),
226        }
227    }
228
229    /// Get current usage for a resource
230    pub fn get(&self, resource_type: ResourceType) -> u64 {
231        *self.usage.get(&resource_type).unwrap_or(&0)
232    }
233
234    /// Set usage for a resource
235    pub fn set(&mut self, resource_type: ResourceType, value: u64) {
236        self.usage.insert(resource_type, value);
237        self.updated_at = Utc::now();
238    }
239
240    /// Increment usage
241    pub fn increment(&mut self, resource_type: ResourceType, amount: u64) {
242        let current = self.get(resource_type);
243        self.set(resource_type, current + amount);
244    }
245
246    /// Decrement usage
247    pub fn decrement(&mut self, resource_type: ResourceType, amount: u64) {
248        let current = self.get(resource_type);
249        if current >= amount {
250            self.set(resource_type, current - amount);
251        } else {
252            self.set(resource_type, 0);
253        }
254    }
255
256    /// Reset usage for a resource
257    pub fn reset(&mut self, resource_type: ResourceType) {
258        self.set(resource_type, 0);
259    }
260
261    /// Reset all usage
262    pub fn reset_all(&mut self) {
263        self.usage.clear();
264        self.updated_at = Utc::now();
265    }
266}
267
/// Quota enforcement engine
///
/// Keeps the configured limits and the recorded usage of every tenant, each
/// in its own mutex-protected map keyed by tenant id.
pub struct QuotaEnforcer {
    /// Quota limits by tenant
    limits: Arc<Mutex<HashMap<String, QuotaLimits>>>,

    /// Current usage by tenant
    usage: Arc<Mutex<HashMap<String, QuotaUsage>>>,
}
276
277impl QuotaEnforcer {
278    /// Create new quota enforcer
279    pub fn new() -> Self {
280        Self {
281            limits: Arc::new(Mutex::new(HashMap::new())),
282            usage: Arc::new(Mutex::new(HashMap::new())),
283        }
284    }
285
286    /// Set quota limits for a tenant
287    pub fn set_limits(&self, limits: QuotaLimits) -> MultiTenancyResult<()> {
288        let tenant_id = limits.tenant_id.clone();
289        self.limits
290            .lock()
291            .map_err(|e| MultiTenancyError::InternalError {
292                message: format!("Lock error: {}", e),
293            })?
294            .insert(tenant_id.clone(), limits);
295
296        // Initialize usage if not exists
297        let mut usage_map = self
298            .usage
299            .lock()
300            .map_err(|e| MultiTenancyError::InternalError {
301                message: format!("Lock error: {}", e),
302            })?;
303        usage_map
304            .entry(tenant_id.clone())
305            .or_insert_with(|| QuotaUsage::new(tenant_id));
306
307        Ok(())
308    }
309
310    /// Check if tenant can consume resource
311    pub fn check_quota(
312        &self,
313        tenant_id: &str,
314        resource_type: ResourceType,
315        amount: u64,
316    ) -> MultiTenancyResult<bool> {
317        let limits = self
318            .limits
319            .lock()
320            .map_err(|e| MultiTenancyError::InternalError {
321                message: format!("Lock error: {}", e),
322            })?;
323
324        let usage = self
325            .usage
326            .lock()
327            .map_err(|e| MultiTenancyError::InternalError {
328                message: format!("Lock error: {}", e),
329            })?;
330
331        if let Some(tenant_limits) = limits.get(tenant_id) {
332            if let Some(tenant_usage) = usage.get(tenant_id) {
333                let current = tenant_usage.get(resource_type);
334                let new_usage = current + amount;
335
336                return Ok(tenant_limits.check_limit(resource_type, new_usage));
337            }
338        }
339
340        // No limits defined = allow
341        Ok(true)
342    }
343
344    /// Consume resource quota
345    pub fn consume(
346        &self,
347        tenant_id: &str,
348        resource_type: ResourceType,
349        amount: u64,
350    ) -> MultiTenancyResult<()> {
351        // Check quota first
352        if !self.check_quota(tenant_id, resource_type, amount)? {
353            return Err(MultiTenancyError::QuotaExceeded {
354                tenant_id: tenant_id.to_string(),
355                resource: resource_type.name(),
356            });
357        }
358
359        // Increment usage
360        let mut usage = self
361            .usage
362            .lock()
363            .map_err(|e| MultiTenancyError::InternalError {
364                message: format!("Lock error: {}", e),
365            })?;
366
367        usage
368            .entry(tenant_id.to_string())
369            .or_insert_with(|| QuotaUsage::new(tenant_id))
370            .increment(resource_type, amount);
371
372        Ok(())
373    }
374
375    /// Release resource quota
376    pub fn release(
377        &self,
378        tenant_id: &str,
379        resource_type: ResourceType,
380        amount: u64,
381    ) -> MultiTenancyResult<()> {
382        let mut usage = self
383            .usage
384            .lock()
385            .map_err(|e| MultiTenancyError::InternalError {
386                message: format!("Lock error: {}", e),
387            })?;
388
389        usage
390            .entry(tenant_id.to_string())
391            .or_insert_with(|| QuotaUsage::new(tenant_id))
392            .decrement(resource_type, amount);
393
394        Ok(())
395    }
396
397    /// Get current usage for tenant
398    pub fn get_usage(&self, tenant_id: &str) -> MultiTenancyResult<QuotaUsage> {
399        let usage = self
400            .usage
401            .lock()
402            .map_err(|e| MultiTenancyError::InternalError {
403                message: format!("Lock error: {}", e),
404            })?;
405
406        Ok(usage
407            .get(tenant_id)
408            .cloned()
409            .unwrap_or_else(|| QuotaUsage::new(tenant_id)))
410    }
411
412    /// Reset usage for a tenant
413    pub fn reset_usage(&self, tenant_id: &str) -> MultiTenancyResult<()> {
414        let mut usage = self
415            .usage
416            .lock()
417            .map_err(|e| MultiTenancyError::InternalError {
418                message: format!("Lock error: {}", e),
419            })?;
420
421        if let Some(tenant_usage) = usage.get_mut(tenant_id) {
422            tenant_usage.reset_all();
423        }
424
425        Ok(())
426    }
427}
428
429impl Default for QuotaEnforcer {
430    fn default() -> Self {
431        Self::new()
432    }
433}
434
/// Rate limiter for API calls
///
/// Holds one token bucket per tenant behind a shared mutex. Tenants without a
/// bucket are not rate limited: `allow_request` and `allow_batch_request`
/// return `Ok(true)` for them.
pub struct RateLimiter {
    /// Token buckets by tenant
    buckets: Arc<Mutex<HashMap<String, TokenBucket>>>,
}
440
/// Token bucket for rate limiting
///
/// The bucket starts full. Tokens are added back in proportion to elapsed
/// time (`refill_rate` per second) up to `capacity`, and each allowed request
/// removes the tokens it costs.
#[derive(Debug, Clone)]
struct TokenBucket {
    /// Current number of tokens
    tokens: f64,

    /// Maximum capacity
    capacity: f64,

    /// Refill rate (tokens per second)
    refill_rate: f64,

    /// Last refill time
    ///
    /// Measured on the monotonic clock: wall-clock adjustments must not add
    /// or remove tokens.
    last_refill: std::time::Instant,
}

impl TokenBucket {
    /// Create new token bucket, initially filled to `capacity`.
    fn new(capacity: f64, refill_rate: f64) -> Self {
        Self {
            tokens: capacity,
            capacity,
            refill_rate,
            last_refill: std::time::Instant::now(),
        }
    }

    /// Refill tokens based on elapsed time
    ///
    /// Elapsed time is taken at full `Duration` precision. Truncating it to
    /// whole milliseconds would credit zero tokens to calls arriving less
    /// than 1 ms apart while still advancing `last_refill`, so a busy bucket
    /// would never refill.
    fn refill(&mut self) {
        let now = std::time::Instant::now();
        // Saturating: yields zero instead of panicking should `last_refill`
        // ever be later than `now`.
        let elapsed_secs = now
            .saturating_duration_since(self.last_refill)
            .as_secs_f64();
        let new_tokens = elapsed_secs * self.refill_rate;

        self.tokens = (self.tokens + new_tokens).min(self.capacity);
        self.last_refill = now;
    }

    /// Try to consume tokens
    ///
    /// Refills first, then removes `amount` tokens and returns `true` if at
    /// least that many are available. Otherwise returns `false` and leaves
    /// the token count unchanged.
    fn try_consume(&mut self, amount: f64) -> bool {
        self.refill();

        if self.tokens >= amount {
            self.tokens -= amount;
            true
        } else {
            false
        }
    }
}
490
491impl RateLimiter {
492    /// Create new rate limiter
493    pub fn new() -> Self {
494        Self {
495            buckets: Arc::new(Mutex::new(HashMap::new())),
496        }
497    }
498
499    /// Set rate limit for a tenant (requests per second)
500    pub fn set_rate(
501        &self,
502        tenant_id: impl Into<String>,
503        requests_per_second: f64,
504    ) -> MultiTenancyResult<()> {
505        let bucket = TokenBucket::new(requests_per_second * 2.0, requests_per_second);
506
507        self.buckets
508            .lock()
509            .map_err(|e| MultiTenancyError::InternalError {
510                message: format!("Lock error: {}", e),
511            })?
512            .insert(tenant_id.into(), bucket);
513
514        Ok(())
515    }
516
517    /// Check if request is allowed
518    pub fn allow_request(&self, tenant_id: &str) -> MultiTenancyResult<bool> {
519        let mut buckets = self
520            .buckets
521            .lock()
522            .map_err(|e| MultiTenancyError::InternalError {
523                message: format!("Lock error: {}", e),
524            })?;
525
526        if let Some(bucket) = buckets.get_mut(tenant_id) {
527            Ok(bucket.try_consume(1.0))
528        } else {
529            // No rate limit configured = allow
530            Ok(true)
531        }
532    }
533
534    /// Check if batch request is allowed
535    pub fn allow_batch_request(
536        &self,
537        tenant_id: &str,
538        batch_size: usize,
539    ) -> MultiTenancyResult<bool> {
540        let mut buckets = self
541            .buckets
542            .lock()
543            .map_err(|e| MultiTenancyError::InternalError {
544                message: format!("Lock error: {}", e),
545            })?;
546
547        if let Some(bucket) = buckets.get_mut(tenant_id) {
548            Ok(bucket.try_consume(batch_size as f64))
549        } else {
550            // No rate limit configured = allow
551            Ok(true)
552        }
553    }
554}
555
556impl Default for RateLimiter {
557    fn default() -> Self {
558        Self::new()
559    }
560}
561
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration as StdDuration;

    /// Values above a hard or soft limit are reported as exceeding it; values
    /// below are not.
    #[test]
    fn test_resource_quota() {
        let hard_only = ResourceQuota::new(ResourceType::VectorCount, 1000);
        assert_eq!(hard_only.limit, Some(1000));
        assert!(!hard_only.exceeds_hard_limit(500));
        assert!(hard_only.exceeds_hard_limit(1001));

        let with_soft = hard_only.with_soft_limit(800);
        assert!(!with_soft.exceeds_soft_limit(700));
        assert!(with_soft.exceeds_soft_limit(900));
    }

    /// The free tier registers a vector quota and enforces it.
    #[test]
    fn test_quota_limits() {
        let free = QuotaLimits::free_tier("tenant1");
        let vector_quota = free.get_quota(ResourceType::VectorCount);
        assert!(vector_quota.is_some());
        assert!(free.check_limit(ResourceType::VectorCount, 5000));
        assert!(!free.check_limit(ResourceType::VectorCount, 20000));
    }

    /// Counters start at zero and follow increment, decrement and reset.
    #[test]
    fn test_quota_usage() {
        let resource = ResourceType::VectorCount;
        let mut tracker = QuotaUsage::new("tenant1");
        assert_eq!(tracker.get(resource), 0);

        tracker.increment(resource, 100);
        assert_eq!(tracker.get(resource), 100);

        tracker.increment(resource, 50);
        assert_eq!(tracker.get(resource), 150);

        tracker.decrement(resource, 30);
        assert_eq!(tracker.get(resource), 120);

        tracker.reset(resource);
        assert_eq!(tracker.get(resource), 0);
    }

    /// Consumption is accepted within the quota and rejected beyond it.
    #[test]
    fn test_quota_enforcer() {
        let tenant = "tenant1";
        let enforcer = QuotaEnforcer::new();
        enforcer.set_limits(QuotaLimits::free_tier(tenant)).unwrap();

        let can_add = |amount: u64| {
            enforcer
                .check_quota(tenant, ResourceType::VectorCount, amount)
                .unwrap()
        };

        // Should allow within limits
        assert!(can_add(5000));

        // Consume some quota
        enforcer
            .consume(tenant, ResourceType::VectorCount, 5000)
            .unwrap();

        // Should still allow more
        assert!(can_add(3000));

        // Should reject exceeding quota
        assert!(!can_add(10000));

        // Consuming beyond quota should fail
        let over_quota = enforcer.consume(tenant, ResourceType::VectorCount, 10000);
        assert!(over_quota.is_err());
    }

    /// The bucket allows a burst, denies once drained, and refills over time.
    #[test]
    fn test_rate_limiter() {
        let tenant = "tenant1";
        let limiter = RateLimiter::new();
        limiter.set_rate(tenant, 10.0).unwrap(); // 10 requests per second

        // Should allow initial requests
        for _ in 0..2 {
            assert!(limiter.allow_request(tenant).unwrap());
        }

        // Allow batch request
        assert!(limiter.allow_batch_request(tenant, 5).unwrap());

        // After consuming many tokens, should be denied
        (0..20).for_each(|_| {
            let _ = limiter.allow_request(tenant);
        });
        assert!(!limiter.allow_request(tenant).unwrap());

        // After waiting, tokens should refill
        thread::sleep(StdDuration::from_millis(200));
        assert!(limiter.allow_request(tenant).unwrap());
    }

    /// Each tier preset enforces its own vector-count ceiling.
    #[test]
    fn test_tier_quotas() {
        let vectors = ResourceType::VectorCount;
        let free = QuotaLimits::free_tier("tenant1");
        let pro = QuotaLimits::pro_tier("tenant2");
        let enterprise = QuotaLimits::enterprise_tier("tenant3");

        // Free tier should have restrictive limits
        assert!(free.check_limit(vectors, 5000));
        assert!(!free.check_limit(vectors, 20000));

        // Pro tier should have higher limits
        assert!(pro.check_limit(vectors, 500000));
        assert!(!pro.check_limit(vectors, 2000000));

        // Enterprise should be unlimited
        assert!(enterprise.check_limit(vectors, 10000000));
        assert!(enterprise.check_limit(vectors, 100000000));
    }
}