ipfrs_storage/
quota.rs

1//! Quota Management for per-tenant storage limits
2//!
3//! This module provides quota enforcement and tracking for multi-tenant storage:
4//! - Per-tenant storage quotas
5//! - Block count limits
6//! - Bandwidth quotas (reads/writes per period)
7//! - Quota enforcement with soft/hard limits
8//! - Usage tracking and reporting
9//! - Quota alerts and notifications
10
11use crate::traits::BlockStore;
12use async_trait::async_trait;
13use dashmap::DashMap;
14use ipfrs_core::{Block, Cid, Error, Result};
15use serde::{Deserialize, Serialize};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19use tracing::{debug, warn};
20
21/// Tenant identifier
22pub type TenantId = String;
23
24/// Quota configuration for a tenant
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct QuotaConfig {
27    /// Maximum storage in bytes (0 = unlimited)
28    pub max_bytes: u64,
29    /// Maximum number of blocks (0 = unlimited)
30    pub max_blocks: u64,
31    /// Maximum read bandwidth per period (bytes/sec, 0 = unlimited)
32    pub max_read_bandwidth: u64,
33    /// Maximum write bandwidth per period (bytes/sec, 0 = unlimited)
34    pub max_write_bandwidth: u64,
35    /// Soft limit threshold (percentage, e.g., 80 for 80%)
36    pub soft_limit_percent: u8,
37    /// Hard limit enforcement (reject on exceed)
38    pub hard_limit_enabled: bool,
39}
40
41impl Default for QuotaConfig {
42    fn default() -> Self {
43        Self {
44            max_bytes: 0,
45            max_blocks: 0,
46            max_read_bandwidth: 0,
47            max_write_bandwidth: 0,
48            soft_limit_percent: 80,
49            hard_limit_enabled: true,
50        }
51    }
52}
53
54/// Quota usage statistics
55#[derive(Debug)]
56pub struct QuotaUsage {
57    /// Current storage used in bytes
58    pub bytes_used: AtomicU64,
59    /// Current number of blocks
60    pub blocks_count: AtomicU64,
61    /// Total bytes read in current period
62    pub bytes_read: AtomicU64,
63    /// Total bytes written in current period
64    pub bytes_written: AtomicU64,
65    /// Number of quota violations
66    pub violations: AtomicU64,
67    /// Last reset time for bandwidth tracking
68    pub last_reset: parking_lot::Mutex<SystemTime>,
69}
70
71impl QuotaUsage {
72    fn new() -> Self {
73        Self {
74            bytes_used: AtomicU64::new(0),
75            blocks_count: AtomicU64::new(0),
76            bytes_read: AtomicU64::new(0),
77            bytes_written: AtomicU64::new(0),
78            violations: AtomicU64::new(0),
79            last_reset: parking_lot::Mutex::new(SystemTime::now()),
80        }
81    }
82
83    fn record_write(&self, bytes: u64) {
84        self.bytes_used.fetch_add(bytes, Ordering::Relaxed);
85        self.blocks_count.fetch_add(1, Ordering::Relaxed);
86        self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
87    }
88
89    fn record_read(&self, bytes: u64) {
90        self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
91    }
92
93    fn record_delete(&self, bytes: u64) {
94        self.bytes_used.fetch_sub(bytes, Ordering::Relaxed);
95        self.blocks_count.fetch_sub(1, Ordering::Relaxed);
96    }
97
98    fn record_violation(&self) {
99        self.violations.fetch_add(1, Ordering::Relaxed);
100    }
101
102    fn reset_bandwidth(&self) {
103        self.bytes_read.store(0, Ordering::Relaxed);
104        self.bytes_written.store(0, Ordering::Relaxed);
105        *self.last_reset.lock() = SystemTime::now();
106    }
107
108    fn should_reset(&self, period: Duration) -> bool {
109        let last = *self.last_reset.lock();
110        SystemTime::now().duration_since(last).unwrap_or_default() > period
111    }
112}
113
114/// Quota status
115#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
116pub enum QuotaStatus {
117    /// Within limits
118    Ok,
119    /// Exceeded soft limit (warning)
120    SoftLimitExceeded,
121    /// Exceeded hard limit (rejected)
122    HardLimitExceeded,
123}
124
125/// Quota violation type
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
127pub enum ViolationType {
128    /// Storage bytes exceeded
129    StorageBytes,
130    /// Block count exceeded
131    BlockCount,
132    /// Read bandwidth exceeded
133    ReadBandwidth,
134    /// Write bandwidth exceeded
135    WriteBandwidth,
136}
137
138/// Quota manager configuration
139#[derive(Debug, Clone)]
140pub struct QuotaManagerConfig {
141    /// Default quota for new tenants
142    pub default_quota: QuotaConfig,
143    /// Bandwidth tracking period
144    pub bandwidth_period: Duration,
145    /// Enable quota enforcement
146    pub enforcement_enabled: bool,
147}
148
149impl Default for QuotaManagerConfig {
150    fn default() -> Self {
151        Self {
152            default_quota: QuotaConfig::default(),
153            bandwidth_period: Duration::from_secs(60),
154            enforcement_enabled: true,
155        }
156    }
157}
158
159/// Tenant quota information
160struct TenantQuota {
161    config: parking_lot::RwLock<QuotaConfig>,
162    usage: QuotaUsage,
163}
164
165/// Quota Manager
166///
167/// Manages per-tenant storage quotas with enforcement
168pub struct QuotaManager {
169    tenants: DashMap<TenantId, TenantQuota>,
170    config: parking_lot::RwLock<QuotaManagerConfig>,
171    /// Mapping of CID to (tenant_id, size)
172    cid_map: DashMap<Cid, (TenantId, u64)>,
173}
174
175impl QuotaManager {
176    /// Create a new quota manager
177    pub fn new(config: QuotaManagerConfig) -> Self {
178        Self {
179            tenants: DashMap::new(),
180            config: parking_lot::RwLock::new(config),
181            cid_map: DashMap::new(),
182        }
183    }
184
185    /// Set quota for a tenant
186    pub fn set_quota(&self, tenant_id: TenantId, config: QuotaConfig) {
187        self.tenants
188            .entry(tenant_id.clone())
189            .and_modify(|tenant| *tenant.config.write() = config.clone())
190            .or_insert_with(|| TenantQuota {
191                config: parking_lot::RwLock::new(config),
192                usage: QuotaUsage::new(),
193            });
194        debug!("Set quota for tenant: {}", tenant_id);
195    }
196
197    /// Get quota configuration for a tenant
198    pub fn get_quota(&self, tenant_id: &str) -> Option<QuotaConfig> {
199        self.tenants
200            .get(tenant_id)
201            .map(|tenant| tenant.config.read().clone())
202    }
203
204    /// Get quota usage for a tenant
205    pub fn get_usage(&self, tenant_id: &str) -> Option<QuotaUsageSnapshot> {
206        self.tenants
207            .get(tenant_id)
208            .map(|tenant| QuotaUsageSnapshot {
209                bytes_used: tenant.usage.bytes_used.load(Ordering::Relaxed),
210                blocks_count: tenant.usage.blocks_count.load(Ordering::Relaxed),
211                bytes_read: tenant.usage.bytes_read.load(Ordering::Relaxed),
212                bytes_written: tenant.usage.bytes_written.load(Ordering::Relaxed),
213                violations: tenant.usage.violations.load(Ordering::Relaxed),
214            })
215    }
216
217    /// Check if a write operation is allowed
218    pub fn check_write_quota(
219        &self,
220        tenant_id: &str,
221        data_size: u64,
222    ) -> std::result::Result<QuotaStatus, ViolationType> {
223        let (enforcement_enabled, bandwidth_period) = {
224            let config_guard = self.config.read();
225            (
226                config_guard.enforcement_enabled,
227                config_guard.bandwidth_period,
228            )
229        };
230
231        if !enforcement_enabled {
232            return Ok(QuotaStatus::Ok);
233        }
234
235        let tenant = match self.tenants.get(tenant_id) {
236            Some(t) => t,
237            None => {
238                // Create tenant with default quota
239                let default_quota = self.config.read().default_quota.clone();
240                self.set_quota(tenant_id.to_string(), default_quota);
241                self.tenants.get(tenant_id).unwrap()
242            }
243        };
244
245        let quota_config = tenant.config.read();
246        let usage = &tenant.usage;
247
248        // Reset bandwidth if period expired
249        if usage.should_reset(bandwidth_period) {
250            usage.reset_bandwidth();
251        }
252
253        // Check storage bytes
254        if quota_config.max_bytes > 0 {
255            let current = usage.bytes_used.load(Ordering::Relaxed);
256            let projected = current + data_size;
257            let soft_limit =
258                (quota_config.max_bytes * quota_config.soft_limit_percent as u64) / 100;
259
260            if projected > quota_config.max_bytes {
261                if quota_config.hard_limit_enabled {
262                    usage.record_violation();
263                    return Err(ViolationType::StorageBytes);
264                }
265                return Ok(QuotaStatus::HardLimitExceeded);
266            } else if projected > soft_limit {
267                warn!(
268                    "Tenant {} exceeded soft storage limit: {} / {}",
269                    tenant_id, projected, quota_config.max_bytes
270                );
271                return Ok(QuotaStatus::SoftLimitExceeded);
272            }
273        }
274
275        // Check block count
276        if quota_config.max_blocks > 0 {
277            let current = usage.blocks_count.load(Ordering::Relaxed);
278            let soft_limit =
279                (quota_config.max_blocks * quota_config.soft_limit_percent as u64) / 100;
280
281            if current + 1 > quota_config.max_blocks {
282                if quota_config.hard_limit_enabled {
283                    usage.record_violation();
284                    return Err(ViolationType::BlockCount);
285                }
286                return Ok(QuotaStatus::HardLimitExceeded);
287            } else if current + 1 > soft_limit {
288                return Ok(QuotaStatus::SoftLimitExceeded);
289            }
290        }
291
292        // Check write bandwidth
293        if quota_config.max_write_bandwidth > 0 {
294            let current = usage.bytes_written.load(Ordering::Relaxed);
295            if current + data_size > quota_config.max_write_bandwidth {
296                if quota_config.hard_limit_enabled {
297                    usage.record_violation();
298                    return Err(ViolationType::WriteBandwidth);
299                }
300                return Ok(QuotaStatus::HardLimitExceeded);
301            }
302        }
303
304        Ok(QuotaStatus::Ok)
305    }
306
307    /// Check if a read operation is allowed
308    pub fn check_read_quota(
309        &self,
310        tenant_id: &str,
311        data_size: u64,
312    ) -> std::result::Result<QuotaStatus, ViolationType> {
313        let (enforcement_enabled, bandwidth_period) = {
314            let config_guard = self.config.read();
315            (
316                config_guard.enforcement_enabled,
317                config_guard.bandwidth_period,
318            )
319        };
320
321        if !enforcement_enabled {
322            return Ok(QuotaStatus::Ok);
323        }
324
325        let tenant = match self.tenants.get(tenant_id) {
326            Some(t) => t,
327            None => return Ok(QuotaStatus::Ok), // Allow reads for unknown tenants
328        };
329
330        let quota_config = tenant.config.read();
331        let usage = &tenant.usage;
332
333        // Reset bandwidth if period expired
334        if usage.should_reset(bandwidth_period) {
335            usage.reset_bandwidth();
336        }
337
338        // Check read bandwidth
339        if quota_config.max_read_bandwidth > 0 {
340            let current = usage.bytes_read.load(Ordering::Relaxed);
341            if current + data_size > quota_config.max_read_bandwidth {
342                if quota_config.hard_limit_enabled {
343                    usage.record_violation();
344                    return Err(ViolationType::ReadBandwidth);
345                }
346                return Ok(QuotaStatus::HardLimitExceeded);
347            }
348        }
349
350        Ok(QuotaStatus::Ok)
351    }
352
353    /// Record a write operation
354    pub fn record_write(&self, tenant_id: &str, cid: Cid, data_size: u64) {
355        if let Some(tenant) = self.tenants.get(tenant_id) {
356            tenant.usage.record_write(data_size);
357            self.cid_map.insert(cid, (tenant_id.to_string(), data_size));
358        }
359    }
360
361    /// Record a read operation
362    pub fn record_read(&self, tenant_id: &str, data_size: u64) {
363        if let Some(tenant) = self.tenants.get(tenant_id) {
364            tenant.usage.record_read(data_size);
365        }
366    }
367
368    /// Record a delete operation
369    pub fn record_delete(&self, cid: &Cid) {
370        if let Some((_, (tenant_id, data_size))) = self.cid_map.remove(cid) {
371            if let Some(tenant) = self.tenants.get(&tenant_id) {
372                tenant.usage.record_delete(data_size);
373            }
374        }
375    }
376
377    /// Get all tenants
378    pub fn list_tenants(&self) -> Vec<TenantId> {
379        self.tenants
380            .iter()
381            .map(|entry| entry.key().clone())
382            .collect()
383    }
384
385    /// Get quota report for a tenant
386    pub fn get_quota_report(&self, tenant_id: &str) -> Option<QuotaReport> {
387        let tenant = self.tenants.get(tenant_id)?;
388        let config = tenant.config.read().clone();
389        let usage_snapshot = QuotaUsageSnapshot {
390            bytes_used: tenant.usage.bytes_used.load(Ordering::Relaxed),
391            blocks_count: tenant.usage.blocks_count.load(Ordering::Relaxed),
392            bytes_read: tenant.usage.bytes_read.load(Ordering::Relaxed),
393            bytes_written: tenant.usage.bytes_written.load(Ordering::Relaxed),
394            violations: tenant.usage.violations.load(Ordering::Relaxed),
395        };
396
397        let storage_percent = if config.max_bytes > 0 {
398            usage_snapshot.bytes_used as f64 / config.max_bytes as f64 * 100.0
399        } else {
400            0.0
401        };
402
403        let blocks_percent = if config.max_blocks > 0 {
404            usage_snapshot.blocks_count as f64 / config.max_blocks as f64 * 100.0
405        } else {
406            0.0
407        };
408
409        Some(QuotaReport {
410            tenant_id: tenant_id.to_string(),
411            config,
412            usage: usage_snapshot,
413            storage_utilization_percent: storage_percent,
414            blocks_utilization_percent: blocks_percent,
415        })
416    }
417}
418
419/// Snapshot of quota usage
420#[derive(Debug, Clone, Serialize, Deserialize)]
421pub struct QuotaUsageSnapshot {
422    pub bytes_used: u64,
423    pub blocks_count: u64,
424    pub bytes_read: u64,
425    pub bytes_written: u64,
426    pub violations: u64,
427}
428
429/// Quota report
430#[derive(Debug, Clone, Serialize, Deserialize)]
431pub struct QuotaReport {
432    pub tenant_id: TenantId,
433    pub config: QuotaConfig,
434    pub usage: QuotaUsageSnapshot,
435    pub storage_utilization_percent: f64,
436    pub blocks_utilization_percent: f64,
437}
438
439/// Quota-enforced block store
440pub struct QuotaBlockStore<S: BlockStore> {
441    inner: Arc<S>,
442    quota_manager: Arc<QuotaManager>,
443    tenant_id: TenantId,
444}
445
446impl<S: BlockStore> QuotaBlockStore<S> {
447    /// Create a new quota-enforced block store
448    pub fn new(inner: Arc<S>, quota_manager: Arc<QuotaManager>, tenant_id: TenantId) -> Self {
449        Self {
450            inner,
451            quota_manager,
452            tenant_id,
453        }
454    }
455
456    /// Get the quota manager
457    pub fn quota_manager(&self) -> &Arc<QuotaManager> {
458        &self.quota_manager
459    }
460
461    /// Get the tenant ID
462    pub fn tenant_id(&self) -> &str {
463        &self.tenant_id
464    }
465}
466
467#[async_trait]
468impl<S: BlockStore + Send + Sync + 'static> BlockStore for QuotaBlockStore<S> {
469    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
470        let block_opt = self.inner.get(cid).await?;
471
472        if let Some(ref block) = block_opt {
473            // Check read quota
474            match self
475                .quota_manager
476                .check_read_quota(&self.tenant_id, block.data().len() as u64)
477            {
478                Ok(QuotaStatus::Ok) => {
479                    self.quota_manager
480                        .record_read(&self.tenant_id, block.data().len() as u64);
481                }
482                Ok(QuotaStatus::SoftLimitExceeded) => {
483                    warn!("Tenant {} exceeded soft read quota limit", self.tenant_id);
484                    self.quota_manager
485                        .record_read(&self.tenant_id, block.data().len() as u64);
486                }
487                Ok(QuotaStatus::HardLimitExceeded) | Err(_) => {
488                    return Err(Error::InvalidInput(format!(
489                        "Tenant {} exceeded read quota",
490                        self.tenant_id
491                    )))
492                }
493            }
494        }
495
496        Ok(block_opt)
497    }
498
499    async fn put(&self, block: &Block) -> Result<()> {
500        // Check write quota before writing
501        match self
502            .quota_manager
503            .check_write_quota(&self.tenant_id, block.data().len() as u64)
504        {
505            Ok(QuotaStatus::Ok) => {}
506            Ok(QuotaStatus::SoftLimitExceeded) => {
507                warn!(
508                    "Tenant {} exceeded soft storage quota limit",
509                    self.tenant_id
510                );
511            }
512            Ok(QuotaStatus::HardLimitExceeded) | Err(_) => {
513                return Err(Error::InvalidInput(format!(
514                    "Tenant {} exceeded storage quota",
515                    self.tenant_id
516                )))
517            }
518        }
519
520        self.inner.put(block).await?;
521        self.quota_manager
522            .record_write(&self.tenant_id, *block.cid(), block.data().len() as u64);
523        Ok(())
524    }
525
526    async fn has(&self, cid: &Cid) -> Result<bool> {
527        self.inner.has(cid).await
528    }
529
530    async fn delete(&self, cid: &Cid) -> Result<()> {
531        self.inner.delete(cid).await?;
532        self.quota_manager.record_delete(cid);
533        Ok(())
534    }
535
536    fn list_cids(&self) -> Result<Vec<Cid>> {
537        self.inner.list_cids()
538    }
539
540    fn len(&self) -> usize {
541        self.inner.len()
542    }
543}
544
545#[cfg(test)]
546mod tests {
547    use super::*;
548    use crate::memory::MemoryBlockStore;
549    use bytes::Bytes;
550
551    #[tokio::test]
552    async fn test_quota_enforcement() {
553        let config = QuotaManagerConfig {
554            default_quota: QuotaConfig {
555                max_bytes: 1000,
556                max_blocks: 10,
557                hard_limit_enabled: true,
558                ..Default::default()
559            },
560            enforcement_enabled: true,
561            ..Default::default()
562        };
563
564        let manager = Arc::new(QuotaManager::new(config));
565        let store = Arc::new(MemoryBlockStore::new());
566        let quota_store = QuotaBlockStore::new(store, manager.clone(), "tenant1".to_string());
567
568        let data = vec![0u8; 100];
569        let block = Block::new(Bytes::from(data)).unwrap();
570
571        // Should succeed
572        quota_store.put(&block).await.unwrap();
573
574        // Check usage
575        let usage = manager.get_usage("tenant1").unwrap();
576        assert_eq!(usage.bytes_used, 100);
577        assert_eq!(usage.blocks_count, 1);
578    }
579
580    #[tokio::test]
581    async fn test_quota_exceeded() {
582        let config = QuotaManagerConfig {
583            default_quota: QuotaConfig {
584                max_bytes: 50,
585                hard_limit_enabled: true,
586                ..Default::default()
587            },
588            enforcement_enabled: true,
589            ..Default::default()
590        };
591
592        let manager = Arc::new(QuotaManager::new(config));
593        let store = Arc::new(MemoryBlockStore::new());
594        let quota_store = QuotaBlockStore::new(store, manager, "tenant1".to_string());
595
596        let data = vec![0u8; 100];
597        let block = Block::new(Bytes::from(data)).unwrap();
598
599        // Should fail (exceeds quota)
600        let result = quota_store.put(&block).await;
601        assert!(result.is_err());
602    }
603
604    #[test]
605    fn test_quota_report() {
606        let manager = QuotaManager::new(QuotaManagerConfig::default());
607        manager.set_quota(
608            "tenant1".to_string(),
609            QuotaConfig {
610                max_bytes: 1000,
611                max_blocks: 100,
612                ..Default::default()
613            },
614        );
615
616        let cid = cid::Cid::default();
617        manager.record_write("tenant1", cid, 500);
618
619        let report = manager.get_quota_report("tenant1").unwrap();
620        assert_eq!(report.usage.bytes_used, 500);
621        assert_eq!(report.storage_utilization_percent, 50.0);
622    }
623}