1use std::sync::{Arc, RwLock};
2use std::collections::HashMap;
3use chrono::{DateTime, Utc, Duration, Datelike};
4use serde::{Deserialize, Serialize};
5
6use super::tenant::{TenantId, ResourceLimits};
7use crate::error::{EventualiError, Result};
8
9#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)]
11pub enum ResourceType {
12 Events,
13 Storage,
14 Streams,
15 Projections,
16 Aggregates,
17 ApiCalls,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22#[derive(Default)]
23pub enum QuotaTier {
24 Starter,
25 #[default]
26 Standard,
27 Professional,
28 Enterprise,
29}
30
31
32#[derive(Debug, Clone)]
34pub struct QuotaCheckResult {
35 pub allowed: bool,
36 pub current_usage: u64,
37 pub limit: Option<u64>,
38 pub utilization_percentage: f64,
39 pub grace_period_active: bool,
40 pub warning_triggered: bool,
41 pub estimated_overage_cost: f64,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
46pub enum AlertType {
47 Warning, Critical, Exceeded, Violation, }
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct QuotaAlert {
56 pub tenant_id: TenantId,
57 pub resource_type: ResourceType,
58 pub alert_type: AlertType,
59 pub current_usage: u64,
60 pub limit: u64,
61 pub utilization_percentage: f64,
62 pub message: String,
63 pub timestamp: DateTime<Utc>,
64 pub acknowledged: bool,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct AlertSummary {
70 pub total_alerts: usize,
71 pub unacknowledged_alerts: usize,
72 pub critical_alerts: usize,
73 pub warning_alerts: usize,
74 pub last_alert: Option<DateTime<Utc>>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct BillingAnalytics {
80 pub tenant_id: TenantId,
81 pub current_month_cost: f64,
82 pub projected_month_cost: f64,
83 pub overage_costs: HashMap<ResourceType, f64>,
84 pub cost_breakdown: HashMap<ResourceType, f64>,
85 pub billing_period_start: DateTime<Utc>,
86 pub billing_period_end: DateTime<Utc>,
87 pub cost_trend: Vec<DailyCostEntry>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct DailyCostEntry {
93 pub date: DateTime<Utc>,
94 pub cost: f64,
95 pub usage_breakdown: HashMap<ResourceType, u64>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct UsageTrends {
101 pub daily_event_trend: Vec<DailyUsageEntry>,
102 pub storage_growth_trend: Vec<DailyUsageEntry>,
103 pub api_calls_trend: Vec<DailyUsageEntry>,
104 pub peak_usage_times: HashMap<ResourceType, Vec<DateTime<Utc>>>,
105 pub usage_patterns: HashMap<ResourceType, UsagePattern>,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct DailyUsageEntry {
111 pub date: DateTime<Utc>,
112 pub usage: u64,
113 pub percentage_of_limit: f64,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
118pub enum UsagePattern {
119 Stable, Growing, Declining, Volatile, Seasonal, }
125
126#[derive(Debug, Clone)]
128pub struct QuotaAlertManager {
129 tenant_id: TenantId,
130 alerts_history: Vec<QuotaAlert>,
131 #[allow(dead_code)] alert_thresholds: HashMap<ResourceType, Vec<f64>>, last_alert_sent: HashMap<(ResourceType, AlertType), DateTime<Utc>>,
134 alert_cooldown: Duration,
135}
136
137impl QuotaAlertManager {
138 pub fn new(tenant_id: TenantId) -> Self {
139 let mut alert_thresholds = HashMap::new();
140
141 for resource_type in [ResourceType::Events, ResourceType::Storage, ResourceType::Streams,
143 ResourceType::Projections, ResourceType::Aggregates, ResourceType::ApiCalls] {
144 alert_thresholds.insert(resource_type, vec![80.0, 90.0, 95.0]);
145 }
146
147 Self {
148 tenant_id,
149 alerts_history: Vec::new(),
150 alert_thresholds,
151 last_alert_sent: HashMap::new(),
152 alert_cooldown: Duration::minutes(15), }
154 }
155
156 pub fn trigger_warning_alert(&mut self, resource_type: ResourceType, utilization: f64) {
157 let alert_type = if utilization >= 90.0 {
158 AlertType::Critical
159 } else {
160 AlertType::Warning
161 };
162
163 let key = (resource_type, alert_type.clone());
165 if let Some(last_sent) = self.last_alert_sent.get(&key) {
166 if Utc::now().signed_duration_since(*last_sent) < self.alert_cooldown {
167 return; }
169 }
170
171 let alert = QuotaAlert {
172 tenant_id: self.tenant_id.clone(),
173 resource_type,
174 alert_type: alert_type.clone(),
175 current_usage: 0, limit: 0, utilization_percentage: utilization,
178 message: format!(
179 "{:?} usage for tenant {} has reached {:.1}%",
180 resource_type, self.tenant_id.as_str(), utilization
181 ),
182 timestamp: Utc::now(),
183 acknowledged: false,
184 };
185
186 self.alerts_history.push(alert);
187 self.last_alert_sent.insert(key, Utc::now());
188
189 if self.alerts_history.len() > 1000 {
191 self.alerts_history.drain(..self.alerts_history.len() - 1000);
192 }
193 }
194
195 pub fn check_and_trigger_alerts(&mut self, _resource_type: ResourceType, _amount: u64) {
196 }
199
200 pub fn get_summary(&self) -> AlertSummary {
201 let unacknowledged = self.alerts_history.iter()
202 .filter(|a| !a.acknowledged)
203 .count();
204
205 let critical = self.alerts_history.iter()
206 .filter(|a| matches!(a.alert_type, AlertType::Critical | AlertType::Exceeded | AlertType::Violation))
207 .count();
208
209 let warning = self.alerts_history.iter()
210 .filter(|a| matches!(a.alert_type, AlertType::Warning))
211 .count();
212
213 AlertSummary {
214 total_alerts: self.alerts_history.len(),
215 unacknowledged_alerts: unacknowledged,
216 critical_alerts: critical,
217 warning_alerts: warning,
218 last_alert: self.alerts_history.last().map(|a| a.timestamp),
219 }
220 }
221
222 pub fn get_alerts_history(&self, limit: usize) -> Vec<QuotaAlert> {
223 let start = if self.alerts_history.len() > limit {
224 self.alerts_history.len() - limit
225 } else {
226 0
227 };
228
229 self.alerts_history[start..].to_vec()
230 }
231
232 pub fn acknowledge_alert(&mut self, alert_index: usize) -> Result<()> {
233 if alert_index < self.alerts_history.len() {
234 self.alerts_history[alert_index].acknowledged = true;
235 Ok(())
236 } else {
237 Err(EventualiError::Tenant(format!("Alert index {alert_index} not found")))
238 }
239 }
240}
241
242#[derive(Debug, Clone)]
244pub struct BillingTracker {
245 tenant_id: TenantId,
246 tier: QuotaTier,
247 current_month_usage: HashMap<ResourceType, u64>,
248 current_month_cost: f64,
249 overage_costs: HashMap<ResourceType, f64>,
250 billing_period_start: DateTime<Utc>,
251 daily_costs: Vec<DailyCostEntry>,
252 rate_card: HashMap<ResourceType, f64>, }
254
255impl BillingTracker {
256 pub fn new(tenant_id: TenantId) -> Self {
257 let now = Utc::now();
258 let billing_start = now.with_day(1).unwrap_or(now); let mut rate_card = HashMap::new();
261 rate_card.insert(ResourceType::Events, 0.0001);
263 rate_card.insert(ResourceType::Storage, 0.001);
264 rate_card.insert(ResourceType::Aggregates, 0.0002);
265 rate_card.insert(ResourceType::Projections, 0.01);
266 rate_card.insert(ResourceType::Streams, 0.05);
267 rate_card.insert(ResourceType::ApiCalls, 0.00005);
268
269 Self {
270 tenant_id,
271 tier: QuotaTier::Standard,
272 current_month_usage: HashMap::new(),
273 current_month_cost: 0.0,
274 overage_costs: HashMap::new(),
275 billing_period_start: billing_start,
276 daily_costs: Vec::new(),
277 rate_card,
278 }
279 }
280
281 pub fn record_usage(&mut self, resource_type: ResourceType, amount: u64) {
282 let current = self.current_month_usage.entry(resource_type).or_insert(0);
284 *current += amount;
285
286 if let Some(&rate) = self.rate_card.get(&resource_type) {
288 let cost = amount as f64 * rate;
289 self.current_month_cost += cost;
290 }
291 }
292
293 pub fn record_overage(&mut self, resource_type: ResourceType, amount: u64, cost_per_unit: f64) {
294 let overage_cost = amount as f64 * cost_per_unit;
295 let current_overage = self.overage_costs.entry(resource_type).or_insert(0.0);
296 *current_overage += overage_cost;
297 self.current_month_cost += overage_cost;
298 }
299
300 pub fn finalize_daily_billing(&mut self) {
301 let today = Utc::now();
302 let daily_entry = DailyCostEntry {
303 date: today,
304 cost: self.calculate_daily_cost(),
305 usage_breakdown: self.current_month_usage.clone(),
306 };
307
308 self.daily_costs.push(daily_entry);
309
310 if self.daily_costs.len() > 30 {
312 self.daily_costs.remove(0);
313 }
314 }
315
316 fn calculate_daily_cost(&self) -> f64 {
317 self.current_month_cost / Utc::now().day() as f64
319 }
320
321 pub fn get_analytics(&self) -> BillingAnalytics {
322 let now = Utc::now();
323 let days_in_month = now.with_day(1)
324 .and_then(|d| d.with_month(d.month() + 1))
325 .and_then(|d| d.with_day(1))
326 .map(|next_month| (next_month - Duration::days(1)).day())
327 .unwrap_or(30);
328
329 let days_elapsed = now.day();
330 let projected_cost = if days_elapsed > 0 {
331 (self.current_month_cost / days_elapsed as f64) * days_in_month as f64
332 } else {
333 self.current_month_cost
334 };
335
336 let mut cost_breakdown = HashMap::new();
337 for (resource_type, &usage) in &self.current_month_usage {
338 if let Some(&rate) = self.rate_card.get(resource_type) {
339 cost_breakdown.insert(*resource_type, usage as f64 * rate);
340 }
341 }
342
343 BillingAnalytics {
344 tenant_id: self.tenant_id.clone(),
345 current_month_cost: self.current_month_cost,
346 projected_month_cost: projected_cost,
347 overage_costs: self.overage_costs.clone(),
348 cost_breakdown,
349 billing_period_start: self.billing_period_start,
350 billing_period_end: now.with_day(days_in_month).unwrap_or(now),
351 cost_trend: self.daily_costs.clone(),
352 }
353 }
354
355 pub fn update_tier(&mut self, new_tier: QuotaTier) {
356 self.tier = new_tier;
357 self.update_rate_card_for_tier();
359 }
360
361 fn update_rate_card_for_tier(&mut self) {
362 let multiplier = match self.tier {
363 QuotaTier::Enterprise => 0.7, QuotaTier::Professional => 0.85, QuotaTier::Standard => 1.0, QuotaTier::Starter => 1.2, };
368
369 for (_, rate) in self.rate_card.iter_mut() {
370 *rate *= multiplier;
371 }
372 }
373
374 pub fn reset_monthly_billing(&mut self) {
375 self.current_month_usage.clear();
376 self.current_month_cost = 0.0;
377 self.overage_costs.clear();
378 self.billing_period_start = Utc::now().with_day(1).unwrap_or(Utc::now());
379 self.daily_costs.clear();
380 }
381}
382
383#[derive(Debug, Clone)]
385pub struct EnhancedResourceTracker {
386 daily_events: u64,
387 daily_reset_time: DateTime<Utc>,
388 storage_used_mb: f64,
389 concurrent_streams: u32,
390 total_projections: u32,
391 total_aggregates: u64,
392 daily_api_calls: u64,
393 api_call_limits: HashMap<ResourceType, u64>,
394 last_updated: DateTime<Utc>,
395
396 usage_history: Vec<DailyUsageEntry>,
398 peak_usage_tracker: HashMap<ResourceType, u64>,
399 usage_patterns: HashMap<ResourceType, Vec<u64>>, }
401
402impl Default for EnhancedResourceTracker {
403 fn default() -> Self {
404 Self::new()
405 }
406}
407
408impl EnhancedResourceTracker {
409 pub fn new() -> Self {
410 let now = Utc::now();
411 let mut api_call_limits = HashMap::new();
412 api_call_limits.insert(ResourceType::ApiCalls, 100_000); Self {
415 daily_events: 0,
416 daily_reset_time: now,
417 storage_used_mb: 0.0,
418 concurrent_streams: 0,
419 total_projections: 0,
420 total_aggregates: 0,
421 daily_api_calls: 0,
422 api_call_limits,
423 last_updated: now,
424 usage_history: Vec::new(),
425 peak_usage_tracker: HashMap::new(),
426 usage_patterns: HashMap::new(),
427 }
428 }
429
430 pub fn record_usage(&mut self, resource_type: ResourceType, amount: u64) {
431 self.last_updated = Utc::now();
432
433 match resource_type {
434 ResourceType::Events => {
435 self.ensure_daily_counter_fresh();
436 self.daily_events += amount;
437 self.update_usage_patterns(resource_type, self.daily_events);
438 },
439 ResourceType::Storage => {
440 self.storage_used_mb += amount as f64;
441 self.update_usage_patterns(resource_type, self.storage_used_mb as u64);
442 },
443 ResourceType::Streams => {
444 self.concurrent_streams += amount as u32;
445 self.update_peak_usage(resource_type, self.concurrent_streams as u64);
446 },
447 ResourceType::Projections => {
448 self.total_projections += amount as u32;
449 self.update_usage_patterns(resource_type, self.total_projections as u64);
450 },
451 ResourceType::Aggregates => {
452 self.total_aggregates += amount;
453 self.update_usage_patterns(resource_type, self.total_aggregates);
454 },
455 ResourceType::ApiCalls => {
456 self.ensure_daily_counter_fresh();
457 self.daily_api_calls += amount;
458 self.update_usage_patterns(resource_type, self.daily_api_calls);
459 },
460 }
461
462 self.update_peak_usage(resource_type, amount);
464 }
465
466 pub fn get_daily_events(&self) -> u64 {
467 if self.is_daily_counter_stale() {
468 0 } else {
470 self.daily_events
471 }
472 }
473
474 pub fn get_api_calls_today(&self) -> u64 {
475 if self.is_daily_counter_stale() {
476 0
477 } else {
478 self.daily_api_calls
479 }
480 }
481
482 pub fn reset_daily_counters(&mut self) {
483 self.store_daily_usage();
485
486 self.daily_events = 0;
487 self.daily_api_calls = 0;
488 self.daily_reset_time = Utc::now();
489 }
490
491 fn ensure_daily_counter_fresh(&mut self) {
492 if self.is_daily_counter_stale() {
493 self.reset_daily_counters();
494 }
495 }
496
497 fn is_daily_counter_stale(&self) -> bool {
498 let now = Utc::now();
499 now.signed_duration_since(self.daily_reset_time) >= Duration::days(1)
500 }
501
502 fn store_daily_usage(&mut self) {
504 let entry = DailyUsageEntry {
505 date: self.daily_reset_time,
506 usage: self.daily_events,
507 percentage_of_limit: 0.0, };
509
510 self.usage_history.push(entry);
511
512 if self.usage_history.len() > 30 {
514 self.usage_history.remove(0);
515 }
516 }
517
518 fn update_usage_patterns(&mut self, resource_type: ResourceType, current_usage: u64) {
520 let patterns = self.usage_patterns.entry(resource_type).or_default();
521 patterns.push(current_usage);
522
523 if patterns.len() > 24 {
525 patterns.remove(0);
526 }
527 }
528
529 fn update_peak_usage(&mut self, resource_type: ResourceType, usage: u64) {
531 let current_peak = self.peak_usage_tracker.get(&resource_type).unwrap_or(&0);
532 if usage > *current_peak {
533 self.peak_usage_tracker.insert(resource_type, usage);
534 }
535 }
536
537 pub fn get_usage_trends(&self) -> UsageTrends {
539 UsageTrends {
540 daily_event_trend: self.usage_history.clone(),
541 storage_growth_trend: self.calculate_storage_trend(),
542 api_calls_trend: self.calculate_api_calls_trend(),
543 peak_usage_times: HashMap::new(), usage_patterns: self.analyze_usage_patterns(),
545 }
546 }
547
548 fn calculate_storage_trend(&self) -> Vec<DailyUsageEntry> {
550 vec![
552 DailyUsageEntry {
553 date: Utc::now() - Duration::days(1),
554 usage: (self.storage_used_mb * 0.9) as u64,
555 percentage_of_limit: 0.0,
556 },
557 DailyUsageEntry {
558 date: Utc::now(),
559 usage: self.storage_used_mb as u64,
560 percentage_of_limit: 0.0,
561 },
562 ]
563 }
564
565 fn calculate_api_calls_trend(&self) -> Vec<DailyUsageEntry> {
567 vec![
568 DailyUsageEntry {
569 date: Utc::now(),
570 usage: self.daily_api_calls,
571 percentage_of_limit: 0.0,
572 },
573 ]
574 }
575
576 fn analyze_usage_patterns(&self) -> HashMap<ResourceType, UsagePattern> {
578 let mut patterns = HashMap::new();
579
580 for (resource_type, usage_data) in &self.usage_patterns {
581 if usage_data.len() < 3 {
582 patterns.insert(*resource_type, UsagePattern::Stable);
583 continue;
584 }
585
586 let pattern = self.detect_pattern(usage_data);
587 patterns.insert(*resource_type, pattern);
588 }
589
590 patterns
591 }
592
593 fn detect_pattern(&self, data: &[u64]) -> UsagePattern {
595 if data.len() < 3 {
596 return UsagePattern::Stable;
597 }
598
599 let variance = self.calculate_variance(data);
600 let trend = self.calculate_trend(data);
601
602 if variance > data.iter().sum::<u64>() as f64 / data.len() as f64 * 0.5 {
603 UsagePattern::Volatile
604 } else if trend > 0.1 {
605 UsagePattern::Growing
606 } else if trend < -0.1 {
607 UsagePattern::Declining
608 } else {
609 UsagePattern::Stable
610 }
611 }
612
613 fn calculate_variance(&self, data: &[u64]) -> f64 {
615 let mean = data.iter().sum::<u64>() as f64 / data.len() as f64;
616 let variance = data.iter()
617 .map(|&x| (x as f64 - mean).powi(2))
618 .sum::<f64>() / data.len() as f64;
619 variance
620 }
621
622 fn calculate_trend(&self, data: &[u64]) -> f64 {
624 if data.len() < 2 {
625 return 0.0;
626 }
627
628 let n = data.len() as f64;
629 let sum_x = (0..data.len()).sum::<usize>() as f64;
630 let sum_y = data.iter().sum::<u64>() as f64;
631 let sum_xy = data.iter().enumerate()
632 .map(|(i, &y)| i as f64 * y as f64)
633 .sum::<f64>();
634 let sum_x2 = (0..data.len()).map(|i| (i * i) as f64).sum::<f64>();
635
636 (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x)
637 }
638
639 pub fn has_stable_usage_pattern(&self) -> bool {
641 self.usage_patterns.iter()
642 .all(|(_, data)| {
643 if data.len() < 3 { return true; }
644 let pattern = self.detect_pattern(data);
645 matches!(pattern, UsagePattern::Stable)
646 })
647 }
648
649 pub fn get_events_utilization(&self, limits: &ResourceLimits) -> f64 {
651 if let Some(limit) = limits.max_events_per_day {
652 (self.get_daily_events() as f64 / limit as f64) * 100.0
653 } else {
654 0.0
655 }
656 }
657
658 pub fn get_storage_utilization(&self, limits: &ResourceLimits) -> f64 {
659 if let Some(limit) = limits.max_storage_mb {
660 (self.storage_used_mb / limit as f64) * 100.0
661 } else {
662 0.0
663 }
664 }
665
666 pub fn get_aggregates_utilization(&self, limits: &ResourceLimits) -> f64 {
667 if let Some(limit) = limits.max_aggregates {
668 (self.total_aggregates as f64 / limit as f64) * 100.0
669 } else {
670 0.0
671 }
672 }
673}
674
675#[derive(Debug, Clone)]
677pub struct EnhancedResourceUsage {
678 pub tenant_id: TenantId,
679 pub tier: QuotaTier,
680 pub daily_events: u64,
681 pub storage_used_mb: f64,
682 pub concurrent_streams: u32,
683 pub total_projections: u32,
684 pub total_aggregates: u64,
685 pub api_calls_today: u64,
686 pub limits: ResourceLimits,
687 pub last_updated: DateTime<Utc>,
688 pub usage_trends: UsageTrends,
689 pub cost_analytics: BillingAnalytics,
690 pub alert_summary: AlertSummary,
691 pub performance_score: f64,
692}
693
694impl EnhancedResourceUsage {
695 pub fn utilization_percentage(&self, resource_type: ResourceType) -> Option<f64> {
697 match resource_type {
698 ResourceType::Events => {
699 self.limits.max_events_per_day.map(|limit| {
700 if limit == 0 { 0.0 } else { (self.daily_events as f64 / limit as f64) * 100.0 }
701 })
702 },
703 ResourceType::Storage => {
704 self.limits.max_storage_mb.map(|limit| {
705 if limit == 0 { 0.0 } else { (self.storage_used_mb / limit as f64) * 100.0 }
706 })
707 },
708 ResourceType::Streams => {
709 self.limits.max_concurrent_streams.map(|limit| {
710 if limit == 0 { 0.0 } else { (self.concurrent_streams as f64 / limit as f64) * 100.0 }
711 })
712 },
713 ResourceType::Projections => {
714 self.limits.max_projections.map(|limit| {
715 if limit == 0 { 0.0 } else { (self.total_projections as f64 / limit as f64) * 100.0 }
716 })
717 },
718 ResourceType::Aggregates => {
719 self.limits.max_aggregates.map(|limit| {
720 if limit == 0 { 0.0 } else { (self.total_aggregates as f64 / limit as f64) * 100.0 }
721 })
722 },
723 ResourceType::ApiCalls => {
724 Some((self.api_calls_today as f64 / 100_000.0) * 100.0)
726 },
727 }
728 }
729
730 pub fn has_resources_near_limit(&self) -> bool {
732 [
733 ResourceType::Events,
734 ResourceType::Storage,
735 ResourceType::Streams,
736 ResourceType::Projections,
737 ResourceType::Aggregates,
738 ResourceType::ApiCalls,
739 ].iter().any(|&resource_type| {
740 self.utilization_percentage(resource_type)
741 .is_some_and(|percentage| percentage > 80.0)
742 })
743 }
744}
745
746pub struct TenantQuota {
748 tenant_id: TenantId,
749 limits: ResourceLimits,
750 tier: QuotaTier,
751 tracker: Arc<RwLock<EnhancedResourceTracker>>,
752 alert_manager: Arc<RwLock<QuotaAlertManager>>,
753 billing_tracker: Arc<RwLock<BillingTracker>>,
754}
755
756impl TenantQuota {
757 pub fn new(tenant_id: TenantId, limits: ResourceLimits) -> Self {
758 Self::with_tier(tenant_id, limits, QuotaTier::Standard)
759 }
760
761 pub fn with_tier(tenant_id: TenantId, limits: ResourceLimits, tier: QuotaTier) -> Self {
762 Self {
763 tenant_id: tenant_id.clone(),
764 limits,
765 tier,
766 tracker: Arc::new(RwLock::new(EnhancedResourceTracker::new())),
767 alert_manager: Arc::new(RwLock::new(QuotaAlertManager::new(tenant_id.clone()))),
768 billing_tracker: Arc::new(RwLock::new(BillingTracker::new(tenant_id))),
769 }
770 }
771
772 pub fn check_quota(&self, resource_type: ResourceType, amount: u64) -> Result<QuotaCheckResult> {
774 let tracker = self.tracker.read().unwrap();
775 let mut result = QuotaCheckResult {
776 allowed: true,
777 current_usage: 0,
778 limit: None,
779 utilization_percentage: 0.0,
780 grace_period_active: false,
781 warning_triggered: false,
782 estimated_overage_cost: 0.0,
783 };
784
785 match resource_type {
786 ResourceType::Events => {
787 if let Some(limit) = self.limits.max_events_per_day {
788 let current_daily = tracker.get_daily_events();
789 result.current_usage = current_daily;
790 result.limit = Some(limit);
791 result.utilization_percentage = (current_daily as f64 / limit as f64) * 100.0;
792
793 if current_daily + amount > limit {
794 let grace_limit = self.calculate_grace_limit(&resource_type, limit);
796 if current_daily + amount <= grace_limit {
797 result.grace_period_active = true;
798 result.estimated_overage_cost = self.calculate_overage_cost(&resource_type, amount);
799 } else {
800 result.allowed = false;
801 return Err(EventualiError::from(QuotaExceeded {
802 tenant_id: self.tenant_id.clone(),
803 resource_type: "daily_events".to_string(),
804 current_usage: current_daily,
805 limit,
806 attempted: amount,
807 }));
808 }
809 }
810 }
811 },
812 ResourceType::ApiCalls => {
813 if let Some(limit) = tracker.api_call_limits.get(&resource_type) {
814 let current = tracker.get_api_calls_today();
815 result.current_usage = current;
816 result.limit = Some(*limit);
817 result.utilization_percentage = (current as f64 / *limit as f64) * 100.0;
818
819 if current + amount > *limit {
820 let grace_limit = self.calculate_grace_limit(&resource_type, *limit);
821 if current + amount <= grace_limit {
822 result.grace_period_active = true;
823 result.estimated_overage_cost = self.calculate_overage_cost(&resource_type, amount);
824 } else {
825 result.allowed = false;
826 return Err(EventualiError::from(QuotaExceeded {
827 tenant_id: self.tenant_id.clone(),
828 resource_type: "api_calls".to_string(),
829 current_usage: current,
830 limit: *limit,
831 attempted: amount,
832 }));
833 }
834 }
835 }
836 },
837 _ => {
839 result.current_usage = 0;
841 result.limit = Some(u64::MAX);
842 result.utilization_percentage = 0.0;
843 }
844 }
845
846 if result.utilization_percentage >= 80.0 && result.utilization_percentage < 90.0 {
848 result.warning_triggered = true;
849 self.trigger_warning_alert(resource_type, result.utilization_percentage);
850 }
851
852 Ok(result)
853 }
854
855 pub fn record_usage(&self, resource_type: ResourceType, amount: u64) {
857 {
858 let mut tracker = self.tracker.write().unwrap();
859 tracker.record_usage(resource_type, amount);
860 }
861
862 {
864 let mut billing_tracker = self.billing_tracker.write().unwrap();
865 billing_tracker.record_usage(resource_type, amount);
866 }
867
868 self.check_and_trigger_alerts(resource_type, amount);
870 }
871
872 pub fn get_usage(&self) -> EnhancedResourceUsage {
874 let tracker = self.tracker.read().unwrap();
875 let billing_tracker = self.billing_tracker.read().unwrap();
876 let alert_manager = self.alert_manager.read().unwrap();
877
878 EnhancedResourceUsage {
879 tenant_id: self.tenant_id.clone(),
880 tier: self.tier.clone(),
881 daily_events: tracker.get_daily_events(),
882 storage_used_mb: tracker.storage_used_mb,
883 concurrent_streams: tracker.concurrent_streams,
884 total_projections: tracker.total_projections,
885 total_aggregates: tracker.total_aggregates,
886 api_calls_today: tracker.get_api_calls_today(),
887 limits: self.limits.clone(),
888 last_updated: tracker.last_updated,
889 usage_trends: tracker.get_usage_trends(),
890 cost_analytics: billing_tracker.get_analytics(),
891 alert_summary: alert_manager.get_summary(),
892 performance_score: self.calculate_performance_score(&tracker),
893 }
894 }
895
896 pub fn get_legacy_usage(&self) -> ResourceUsage {
898 let tracker = self.tracker.read().unwrap();
899
900 ResourceUsage {
901 tenant_id: self.tenant_id.clone(),
902 daily_events: tracker.get_daily_events(),
903 storage_used_mb: tracker.storage_used_mb,
904 concurrent_streams: tracker.concurrent_streams,
905 total_projections: tracker.total_projections,
906 total_aggregates: tracker.total_aggregates,
907 limits: self.limits.clone(),
908 last_updated: tracker.last_updated,
909 }
910 }
911
912 pub fn reset_daily_counters(&self) {
914 {
915 let mut tracker = self.tracker.write().unwrap();
916 tracker.reset_daily_counters();
917 }
918
919 {
920 let mut billing_tracker = self.billing_tracker.write().unwrap();
921 billing_tracker.finalize_daily_billing();
922 }
923 }
924
925 fn calculate_grace_limit(&self, resource_type: &ResourceType, base_limit: u64) -> u64 {
927 let grace_percentage = match (&self.tier, resource_type) {
928 (QuotaTier::Enterprise, _) => 0.20, (QuotaTier::Professional, ResourceType::Events) => 0.15, (QuotaTier::Professional, _) => 0.10, (QuotaTier::Standard, ResourceType::Events) => 0.05, (QuotaTier::Starter, _) => 0.02, _ => 0.0,
934 };
935
936 (base_limit as f64 * (1.0 + grace_percentage)) as u64
937 }
938
939 fn calculate_overage_cost(&self, resource_type: &ResourceType, overage_amount: u64) -> f64 {
941 let cost_per_unit = match (&self.tier, resource_type) {
942 (QuotaTier::Enterprise, ResourceType::Events) => 0.0001,
943 (QuotaTier::Enterprise, ResourceType::ApiCalls) => 0.00005,
944 (QuotaTier::Professional, ResourceType::Events) => 0.0002,
945 (QuotaTier::Professional, ResourceType::ApiCalls) => 0.0001,
946 (QuotaTier::Standard, ResourceType::Events) => 0.0005,
947 (QuotaTier::Standard, ResourceType::ApiCalls) => 0.0002,
948 (QuotaTier::Starter, _) => 0.001,
949 _ => 0.0,
950 };
951
952 overage_amount as f64 * cost_per_unit
953 }
954
955 fn trigger_warning_alert(&self, resource_type: ResourceType, utilization_percentage: f64) {
957 let mut alert_manager = self.alert_manager.write().unwrap();
958 alert_manager.trigger_warning_alert(resource_type, utilization_percentage);
959 }
960
961 fn check_and_trigger_alerts(&self, resource_type: ResourceType, amount: u64) {
963 let mut alert_manager = self.alert_manager.write().unwrap();
964 alert_manager.check_and_trigger_alerts(resource_type, amount);
965 }
966
967 fn calculate_performance_score(&self, tracker: &EnhancedResourceTracker) -> f64 {
969 let mut score = 100.0_f64;
970
971 let utilizations = [
973 tracker.get_events_utilization(&self.limits),
974 tracker.get_storage_utilization(&self.limits),
975 tracker.get_aggregates_utilization(&self.limits),
976 ];
977
978 let avg_utilization = utilizations.iter().sum::<f64>() / utilizations.len() as f64;
979
980 if avg_utilization > 90.0 {
981 score -= 30.0; } else if avg_utilization > 70.0 {
983 score -= 15.0; } else if avg_utilization > 50.0 {
985 score -= 5.0; }
987
988 if tracker.has_stable_usage_pattern() {
990 score += 10.0;
991 }
992
993 score.clamp(0.0, 100.0)
994 }
995
996 pub fn get_billing_analytics(&self) -> BillingAnalytics {
998 let billing_tracker = self.billing_tracker.read().unwrap();
999 billing_tracker.get_analytics()
1000 }
1001
1002 pub fn get_quota_alerts(&self, limit: Option<usize>) -> Vec<QuotaAlert> {
1004 let alert_manager = self.alert_manager.read().unwrap();
1005 alert_manager.get_alerts_history(limit.unwrap_or(100))
1006 }
1007
1008 pub fn update_tier(&mut self, new_tier: QuotaTier) {
1010 self.tier = new_tier.clone();
1011
1012 let mut billing_tracker = self.billing_tracker.write().unwrap();
1014 billing_tracker.update_tier(new_tier);
1015 }
1016}
1017
1018#[derive(Debug, thiserror::Error)]
1020#[error("Quota exceeded for tenant {tenant_id}: {resource_type} - current: {current_usage}, limit: {limit}, attempted: {attempted}")]
1021pub struct QuotaExceeded {
1022 pub tenant_id: TenantId,
1023 pub resource_type: String,
1024 pub current_usage: u64,
1025 pub limit: u64,
1026 pub attempted: u64,
1027}
1028
1029impl From<QuotaExceeded> for crate::error::EventualiError {
1030 fn from(err: QuotaExceeded) -> Self {
1031 crate::error::EventualiError::Tenant(err.to_string())
1032 }
1033}
1034
1035#[derive(Debug, Clone)]
1037pub struct ResourceUsage {
1038 pub tenant_id: TenantId,
1039 pub daily_events: u64,
1040 pub storage_used_mb: f64,
1041 pub concurrent_streams: u32,
1042 pub total_projections: u32,
1043 pub total_aggregates: u64,
1044 pub limits: ResourceLimits,
1045 pub last_updated: DateTime<Utc>,
1046}
1047
1048impl ResourceUsage {
1049 pub fn utilization_percentage(&self, resource_type: ResourceType) -> Option<f64> {
1051 match resource_type {
1052 ResourceType::Events => {
1053 self.limits.max_events_per_day.map(|limit| {
1054 if limit == 0 { 0.0 } else { (self.daily_events as f64 / limit as f64) * 100.0 }
1055 })
1056 },
1057 ResourceType::Storage => {
1058 self.limits.max_storage_mb.map(|limit| {
1059 if limit == 0 { 0.0 } else { (self.storage_used_mb / limit as f64) * 100.0 }
1060 })
1061 },
1062 ResourceType::Streams => {
1063 self.limits.max_concurrent_streams.map(|limit| {
1064 if limit == 0 { 0.0 } else { (self.concurrent_streams as f64 / limit as f64) * 100.0 }
1065 })
1066 },
1067 ResourceType::Projections => {
1068 self.limits.max_projections.map(|limit| {
1069 if limit == 0 { 0.0 } else { (self.total_projections as f64 / limit as f64) * 100.0 }
1070 })
1071 },
1072 ResourceType::Aggregates => {
1073 self.limits.max_aggregates.map(|limit| {
1074 if limit == 0 { 0.0 } else { (self.total_aggregates as f64 / limit as f64) * 100.0 }
1075 })
1076 },
1077 ResourceType::ApiCalls => {
1078 Some((0.0 / 100_000.0) * 100.0) },
1081 }
1082 }
1083
1084 pub fn has_resources_near_limit(&self) -> bool {
1086 [
1087 ResourceType::Events,
1088 ResourceType::Storage,
1089 ResourceType::Streams,
1090 ResourceType::Projections,
1091 ResourceType::Aggregates,
1092 ResourceType::ApiCalls,
1093 ].iter().any(|&resource_type| {
1094 self.utilization_percentage(resource_type)
1095 .is_some_and(|percentage| percentage > 80.0)
1096 })
1097 }
1098}