1use crate::traits::BlockStore;
12use async_trait::async_trait;
13use dashmap::DashMap;
14use ipfrs_core::{Block, Cid, Error, Result};
15use serde::{Deserialize, Serialize};
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19use tracing::{debug, warn};
20
21pub type TenantId = String;
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct QuotaConfig {
27 pub max_bytes: u64,
29 pub max_blocks: u64,
31 pub max_read_bandwidth: u64,
33 pub max_write_bandwidth: u64,
35 pub soft_limit_percent: u8,
37 pub hard_limit_enabled: bool,
39}
40
41impl Default for QuotaConfig {
42 fn default() -> Self {
43 Self {
44 max_bytes: 0,
45 max_blocks: 0,
46 max_read_bandwidth: 0,
47 max_write_bandwidth: 0,
48 soft_limit_percent: 80,
49 hard_limit_enabled: true,
50 }
51 }
52}
53
54#[derive(Debug)]
56pub struct QuotaUsage {
57 pub bytes_used: AtomicU64,
59 pub blocks_count: AtomicU64,
61 pub bytes_read: AtomicU64,
63 pub bytes_written: AtomicU64,
65 pub violations: AtomicU64,
67 pub last_reset: parking_lot::Mutex<SystemTime>,
69}
70
71impl QuotaUsage {
72 fn new() -> Self {
73 Self {
74 bytes_used: AtomicU64::new(0),
75 blocks_count: AtomicU64::new(0),
76 bytes_read: AtomicU64::new(0),
77 bytes_written: AtomicU64::new(0),
78 violations: AtomicU64::new(0),
79 last_reset: parking_lot::Mutex::new(SystemTime::now()),
80 }
81 }
82
83 fn record_write(&self, bytes: u64) {
84 self.bytes_used.fetch_add(bytes, Ordering::Relaxed);
85 self.blocks_count.fetch_add(1, Ordering::Relaxed);
86 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
87 }
88
89 fn record_read(&self, bytes: u64) {
90 self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
91 }
92
93 fn record_delete(&self, bytes: u64) {
94 self.bytes_used.fetch_sub(bytes, Ordering::Relaxed);
95 self.blocks_count.fetch_sub(1, Ordering::Relaxed);
96 }
97
98 fn record_violation(&self) {
99 self.violations.fetch_add(1, Ordering::Relaxed);
100 }
101
102 fn reset_bandwidth(&self) {
103 self.bytes_read.store(0, Ordering::Relaxed);
104 self.bytes_written.store(0, Ordering::Relaxed);
105 *self.last_reset.lock() = SystemTime::now();
106 }
107
108 fn should_reset(&self, period: Duration) -> bool {
109 let last = *self.last_reset.lock();
110 SystemTime::now().duration_since(last).unwrap_or_default() > period
111 }
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
116pub enum QuotaStatus {
117 Ok,
119 SoftLimitExceeded,
121 HardLimitExceeded,
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
127pub enum ViolationType {
128 StorageBytes,
130 BlockCount,
132 ReadBandwidth,
134 WriteBandwidth,
136}
137
138#[derive(Debug, Clone)]
140pub struct QuotaManagerConfig {
141 pub default_quota: QuotaConfig,
143 pub bandwidth_period: Duration,
145 pub enforcement_enabled: bool,
147}
148
149impl Default for QuotaManagerConfig {
150 fn default() -> Self {
151 Self {
152 default_quota: QuotaConfig::default(),
153 bandwidth_period: Duration::from_secs(60),
154 enforcement_enabled: true,
155 }
156 }
157}
158
159struct TenantQuota {
161 config: parking_lot::RwLock<QuotaConfig>,
162 usage: QuotaUsage,
163}
164
165pub struct QuotaManager {
169 tenants: DashMap<TenantId, TenantQuota>,
170 config: parking_lot::RwLock<QuotaManagerConfig>,
171 cid_map: DashMap<Cid, (TenantId, u64)>,
173}
174
175impl QuotaManager {
176 pub fn new(config: QuotaManagerConfig) -> Self {
178 Self {
179 tenants: DashMap::new(),
180 config: parking_lot::RwLock::new(config),
181 cid_map: DashMap::new(),
182 }
183 }
184
185 pub fn set_quota(&self, tenant_id: TenantId, config: QuotaConfig) {
187 self.tenants
188 .entry(tenant_id.clone())
189 .and_modify(|tenant| *tenant.config.write() = config.clone())
190 .or_insert_with(|| TenantQuota {
191 config: parking_lot::RwLock::new(config),
192 usage: QuotaUsage::new(),
193 });
194 debug!("Set quota for tenant: {}", tenant_id);
195 }
196
197 pub fn get_quota(&self, tenant_id: &str) -> Option<QuotaConfig> {
199 self.tenants
200 .get(tenant_id)
201 .map(|tenant| tenant.config.read().clone())
202 }
203
204 pub fn get_usage(&self, tenant_id: &str) -> Option<QuotaUsageSnapshot> {
206 self.tenants
207 .get(tenant_id)
208 .map(|tenant| QuotaUsageSnapshot {
209 bytes_used: tenant.usage.bytes_used.load(Ordering::Relaxed),
210 blocks_count: tenant.usage.blocks_count.load(Ordering::Relaxed),
211 bytes_read: tenant.usage.bytes_read.load(Ordering::Relaxed),
212 bytes_written: tenant.usage.bytes_written.load(Ordering::Relaxed),
213 violations: tenant.usage.violations.load(Ordering::Relaxed),
214 })
215 }
216
217 pub fn check_write_quota(
219 &self,
220 tenant_id: &str,
221 data_size: u64,
222 ) -> std::result::Result<QuotaStatus, ViolationType> {
223 let (enforcement_enabled, bandwidth_period) = {
224 let config_guard = self.config.read();
225 (
226 config_guard.enforcement_enabled,
227 config_guard.bandwidth_period,
228 )
229 };
230
231 if !enforcement_enabled {
232 return Ok(QuotaStatus::Ok);
233 }
234
235 let tenant = match self.tenants.get(tenant_id) {
236 Some(t) => t,
237 None => {
238 let default_quota = self.config.read().default_quota.clone();
240 self.set_quota(tenant_id.to_string(), default_quota);
241 self.tenants.get(tenant_id).unwrap()
242 }
243 };
244
245 let quota_config = tenant.config.read();
246 let usage = &tenant.usage;
247
248 if usage.should_reset(bandwidth_period) {
250 usage.reset_bandwidth();
251 }
252
253 if quota_config.max_bytes > 0 {
255 let current = usage.bytes_used.load(Ordering::Relaxed);
256 let projected = current + data_size;
257 let soft_limit =
258 (quota_config.max_bytes * quota_config.soft_limit_percent as u64) / 100;
259
260 if projected > quota_config.max_bytes {
261 if quota_config.hard_limit_enabled {
262 usage.record_violation();
263 return Err(ViolationType::StorageBytes);
264 }
265 return Ok(QuotaStatus::HardLimitExceeded);
266 } else if projected > soft_limit {
267 warn!(
268 "Tenant {} exceeded soft storage limit: {} / {}",
269 tenant_id, projected, quota_config.max_bytes
270 );
271 return Ok(QuotaStatus::SoftLimitExceeded);
272 }
273 }
274
275 if quota_config.max_blocks > 0 {
277 let current = usage.blocks_count.load(Ordering::Relaxed);
278 let soft_limit =
279 (quota_config.max_blocks * quota_config.soft_limit_percent as u64) / 100;
280
281 if current + 1 > quota_config.max_blocks {
282 if quota_config.hard_limit_enabled {
283 usage.record_violation();
284 return Err(ViolationType::BlockCount);
285 }
286 return Ok(QuotaStatus::HardLimitExceeded);
287 } else if current + 1 > soft_limit {
288 return Ok(QuotaStatus::SoftLimitExceeded);
289 }
290 }
291
292 if quota_config.max_write_bandwidth > 0 {
294 let current = usage.bytes_written.load(Ordering::Relaxed);
295 if current + data_size > quota_config.max_write_bandwidth {
296 if quota_config.hard_limit_enabled {
297 usage.record_violation();
298 return Err(ViolationType::WriteBandwidth);
299 }
300 return Ok(QuotaStatus::HardLimitExceeded);
301 }
302 }
303
304 Ok(QuotaStatus::Ok)
305 }
306
307 pub fn check_read_quota(
309 &self,
310 tenant_id: &str,
311 data_size: u64,
312 ) -> std::result::Result<QuotaStatus, ViolationType> {
313 let (enforcement_enabled, bandwidth_period) = {
314 let config_guard = self.config.read();
315 (
316 config_guard.enforcement_enabled,
317 config_guard.bandwidth_period,
318 )
319 };
320
321 if !enforcement_enabled {
322 return Ok(QuotaStatus::Ok);
323 }
324
325 let tenant = match self.tenants.get(tenant_id) {
326 Some(t) => t,
327 None => return Ok(QuotaStatus::Ok), };
329
330 let quota_config = tenant.config.read();
331 let usage = &tenant.usage;
332
333 if usage.should_reset(bandwidth_period) {
335 usage.reset_bandwidth();
336 }
337
338 if quota_config.max_read_bandwidth > 0 {
340 let current = usage.bytes_read.load(Ordering::Relaxed);
341 if current + data_size > quota_config.max_read_bandwidth {
342 if quota_config.hard_limit_enabled {
343 usage.record_violation();
344 return Err(ViolationType::ReadBandwidth);
345 }
346 return Ok(QuotaStatus::HardLimitExceeded);
347 }
348 }
349
350 Ok(QuotaStatus::Ok)
351 }
352
353 pub fn record_write(&self, tenant_id: &str, cid: Cid, data_size: u64) {
355 if let Some(tenant) = self.tenants.get(tenant_id) {
356 tenant.usage.record_write(data_size);
357 self.cid_map.insert(cid, (tenant_id.to_string(), data_size));
358 }
359 }
360
361 pub fn record_read(&self, tenant_id: &str, data_size: u64) {
363 if let Some(tenant) = self.tenants.get(tenant_id) {
364 tenant.usage.record_read(data_size);
365 }
366 }
367
368 pub fn record_delete(&self, cid: &Cid) {
370 if let Some((_, (tenant_id, data_size))) = self.cid_map.remove(cid) {
371 if let Some(tenant) = self.tenants.get(&tenant_id) {
372 tenant.usage.record_delete(data_size);
373 }
374 }
375 }
376
377 pub fn list_tenants(&self) -> Vec<TenantId> {
379 self.tenants
380 .iter()
381 .map(|entry| entry.key().clone())
382 .collect()
383 }
384
385 pub fn get_quota_report(&self, tenant_id: &str) -> Option<QuotaReport> {
387 let tenant = self.tenants.get(tenant_id)?;
388 let config = tenant.config.read().clone();
389 let usage_snapshot = QuotaUsageSnapshot {
390 bytes_used: tenant.usage.bytes_used.load(Ordering::Relaxed),
391 blocks_count: tenant.usage.blocks_count.load(Ordering::Relaxed),
392 bytes_read: tenant.usage.bytes_read.load(Ordering::Relaxed),
393 bytes_written: tenant.usage.bytes_written.load(Ordering::Relaxed),
394 violations: tenant.usage.violations.load(Ordering::Relaxed),
395 };
396
397 let storage_percent = if config.max_bytes > 0 {
398 usage_snapshot.bytes_used as f64 / config.max_bytes as f64 * 100.0
399 } else {
400 0.0
401 };
402
403 let blocks_percent = if config.max_blocks > 0 {
404 usage_snapshot.blocks_count as f64 / config.max_blocks as f64 * 100.0
405 } else {
406 0.0
407 };
408
409 Some(QuotaReport {
410 tenant_id: tenant_id.to_string(),
411 config,
412 usage: usage_snapshot,
413 storage_utilization_percent: storage_percent,
414 blocks_utilization_percent: blocks_percent,
415 })
416 }
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
421pub struct QuotaUsageSnapshot {
422 pub bytes_used: u64,
423 pub blocks_count: u64,
424 pub bytes_read: u64,
425 pub bytes_written: u64,
426 pub violations: u64,
427}
428
429#[derive(Debug, Clone, Serialize, Deserialize)]
431pub struct QuotaReport {
432 pub tenant_id: TenantId,
433 pub config: QuotaConfig,
434 pub usage: QuotaUsageSnapshot,
435 pub storage_utilization_percent: f64,
436 pub blocks_utilization_percent: f64,
437}
438
439pub struct QuotaBlockStore<S: BlockStore> {
441 inner: Arc<S>,
442 quota_manager: Arc<QuotaManager>,
443 tenant_id: TenantId,
444}
445
446impl<S: BlockStore> QuotaBlockStore<S> {
447 pub fn new(inner: Arc<S>, quota_manager: Arc<QuotaManager>, tenant_id: TenantId) -> Self {
449 Self {
450 inner,
451 quota_manager,
452 tenant_id,
453 }
454 }
455
456 pub fn quota_manager(&self) -> &Arc<QuotaManager> {
458 &self.quota_manager
459 }
460
461 pub fn tenant_id(&self) -> &str {
463 &self.tenant_id
464 }
465}
466
467#[async_trait]
468impl<S: BlockStore + Send + Sync + 'static> BlockStore for QuotaBlockStore<S> {
469 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
470 let block_opt = self.inner.get(cid).await?;
471
472 if let Some(ref block) = block_opt {
473 match self
475 .quota_manager
476 .check_read_quota(&self.tenant_id, block.data().len() as u64)
477 {
478 Ok(QuotaStatus::Ok) => {
479 self.quota_manager
480 .record_read(&self.tenant_id, block.data().len() as u64);
481 }
482 Ok(QuotaStatus::SoftLimitExceeded) => {
483 warn!("Tenant {} exceeded soft read quota limit", self.tenant_id);
484 self.quota_manager
485 .record_read(&self.tenant_id, block.data().len() as u64);
486 }
487 Ok(QuotaStatus::HardLimitExceeded) | Err(_) => {
488 return Err(Error::InvalidInput(format!(
489 "Tenant {} exceeded read quota",
490 self.tenant_id
491 )))
492 }
493 }
494 }
495
496 Ok(block_opt)
497 }
498
499 async fn put(&self, block: &Block) -> Result<()> {
500 match self
502 .quota_manager
503 .check_write_quota(&self.tenant_id, block.data().len() as u64)
504 {
505 Ok(QuotaStatus::Ok) => {}
506 Ok(QuotaStatus::SoftLimitExceeded) => {
507 warn!(
508 "Tenant {} exceeded soft storage quota limit",
509 self.tenant_id
510 );
511 }
512 Ok(QuotaStatus::HardLimitExceeded) | Err(_) => {
513 return Err(Error::InvalidInput(format!(
514 "Tenant {} exceeded storage quota",
515 self.tenant_id
516 )))
517 }
518 }
519
520 self.inner.put(block).await?;
521 self.quota_manager
522 .record_write(&self.tenant_id, *block.cid(), block.data().len() as u64);
523 Ok(())
524 }
525
526 async fn has(&self, cid: &Cid) -> Result<bool> {
527 self.inner.has(cid).await
528 }
529
530 async fn delete(&self, cid: &Cid) -> Result<()> {
531 self.inner.delete(cid).await?;
532 self.quota_manager.record_delete(cid);
533 Ok(())
534 }
535
536 fn list_cids(&self) -> Result<Vec<Cid>> {
537 self.inner.list_cids()
538 }
539
540 fn len(&self) -> usize {
541 self.inner.len()
542 }
543}
544
545#[cfg(test)]
546mod tests {
547 use super::*;
548 use crate::memory::MemoryBlockStore;
549 use bytes::Bytes;
550
551 #[tokio::test]
552 async fn test_quota_enforcement() {
553 let config = QuotaManagerConfig {
554 default_quota: QuotaConfig {
555 max_bytes: 1000,
556 max_blocks: 10,
557 hard_limit_enabled: true,
558 ..Default::default()
559 },
560 enforcement_enabled: true,
561 ..Default::default()
562 };
563
564 let manager = Arc::new(QuotaManager::new(config));
565 let store = Arc::new(MemoryBlockStore::new());
566 let quota_store = QuotaBlockStore::new(store, manager.clone(), "tenant1".to_string());
567
568 let data = vec![0u8; 100];
569 let block = Block::new(Bytes::from(data)).unwrap();
570
571 quota_store.put(&block).await.unwrap();
573
574 let usage = manager.get_usage("tenant1").unwrap();
576 assert_eq!(usage.bytes_used, 100);
577 assert_eq!(usage.blocks_count, 1);
578 }
579
580 #[tokio::test]
581 async fn test_quota_exceeded() {
582 let config = QuotaManagerConfig {
583 default_quota: QuotaConfig {
584 max_bytes: 50,
585 hard_limit_enabled: true,
586 ..Default::default()
587 },
588 enforcement_enabled: true,
589 ..Default::default()
590 };
591
592 let manager = Arc::new(QuotaManager::new(config));
593 let store = Arc::new(MemoryBlockStore::new());
594 let quota_store = QuotaBlockStore::new(store, manager, "tenant1".to_string());
595
596 let data = vec![0u8; 100];
597 let block = Block::new(Bytes::from(data)).unwrap();
598
599 let result = quota_store.put(&block).await;
601 assert!(result.is_err());
602 }
603
604 #[test]
605 fn test_quota_report() {
606 let manager = QuotaManager::new(QuotaManagerConfig::default());
607 manager.set_quota(
608 "tenant1".to_string(),
609 QuotaConfig {
610 max_bytes: 1000,
611 max_blocks: 100,
612 ..Default::default()
613 },
614 );
615
616 let cid = cid::Cid::default();
617 manager.record_write("tenant1", cid, 500);
618
619 let report = manager.get_quota_report("tenant1").unwrap();
620 assert_eq!(report.usage.bytes_used, 500);
621 assert_eq!(report.storage_utilization_percent, 50.0);
622 }
623}