1use serde::{Deserialize, Serialize};
41use std::collections::HashMap;
42
/// Outcome of a consumer-lag analysis, as built by `analyze_mysql_consumer_lag`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerLagAnalysis {
    /// Number of queued messages the analysis was run against (input echoed back).
    pub queue_size: usize,
    /// Processing rate supplied by the caller; lag is derived as `queue_size / processing_rate`.
    pub processing_rate: f64,
    /// Lag target, in seconds, that the estimated lag is compared against.
    pub target_lag_seconds: u64,
    /// Estimated lag in seconds; may be `f64::INFINITY` when the processing rate is not positive.
    pub lag_seconds: f64,
    /// `true` when `lag_seconds` is greater than `target_lag_seconds`.
    pub is_lagging: bool,
    /// Scaling action suggested for the observed lag.
    pub recommendation: ScalingRecommendation,
}
59
/// Scaling action suggested by the lag and worker-scaling analyses in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ScalingRecommendation {
    /// Add `additional_workers` workers.
    ScaleUp { additional_workers: usize },
    /// Keep the current worker count.
    Optimal,
    /// Remove `workers_to_remove` workers.
    ScaleDown { workers_to_remove: usize },
}
70
/// Rate of change of a queue's size between two observations, as built by
/// `calculate_mysql_message_velocity`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageVelocity {
    /// Queue size at the earlier observation.
    pub previous_size: usize,
    /// Queue size at the later observation.
    pub current_size: usize,
    /// Time between the two observations, in seconds.
    pub time_window_secs: f64,
    /// `(current_size - previous_size) / time_window_secs`; `0.0` when the window is not positive.
    pub velocity: f64,
    /// Coarse classification of `velocity`.
    pub trend: QueueTrend,
}
85
/// Coarse classification of queue-size velocity.
///
/// The bounds below are the ones applied by `calculate_mysql_message_velocity`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum QueueTrend {
    /// Velocity greater than `10.0`.
    RapidGrowth,
    /// Velocity greater than `1.0` and at most `10.0`.
    SlowGrowth,
    /// Velocity greater than `-1.0` and at most `1.0`.
    Stable,
    /// Velocity greater than `-10.0` and at most `-1.0`.
    SlowShrink,
    /// Velocity of `-10.0` or lower.
    RapidShrink,
}
100
/// Worker-count suggestion, as built by `suggest_mysql_worker_scaling`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerScalingSuggestion {
    /// Number of queued messages the suggestion was computed for (input echoed back).
    pub queue_size: usize,
    /// Worker count supplied by the caller.
    pub current_workers: usize,
    /// Average processing rate of a single worker, as supplied by the caller.
    pub avg_processing_rate: f64,
    /// Lag target, in seconds, used to derive the required drain rate.
    pub target_lag_seconds: u64,
    /// Suggested total worker count (never below 1).
    pub recommended_workers: usize,
    /// Difference between `recommended_workers` and `current_workers`, expressed as an action.
    pub action: ScalingRecommendation,
}
117
/// Summary statistics over a set of message ages, as built by
/// `calculate_mysql_message_age_distribution`. Every field is zero for empty input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAgeDistribution {
    /// Number of ages in the sample.
    pub total_messages: usize,
    /// Smallest age in the sample, in seconds.
    pub min_age_secs: f64,
    /// Largest age in the sample, in seconds.
    pub max_age_secs: f64,
    /// Arithmetic mean of the ages, in seconds.
    pub avg_age_secs: f64,
    /// 50th-percentile age, in seconds.
    pub p50_age_secs: f64,
    /// 95th-percentile age, in seconds.
    pub p95_age_secs: f64,
    /// 99th-percentile age, in seconds.
    pub p99_age_secs: f64,
    /// Number of ages strictly greater than the SLA threshold.
    pub messages_exceeding_sla: usize,
}
138
/// Aggregate throughput estimate, as built by `estimate_mysql_processing_capacity`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingCapacity {
    /// Worker count supplied by the caller.
    pub workers: usize,
    /// Per-worker processing rate supplied by the caller.
    pub rate_per_worker: f64,
    /// `workers * rate_per_worker`.
    pub total_capacity_per_sec: f64,
    /// `total_capacity_per_sec * 60`.
    pub total_capacity_per_min: f64,
    /// `total_capacity_per_min * 60`.
    pub total_capacity_per_hour: f64,
    /// Backlog divided by `total_capacity_per_sec`; may be `f64::INFINITY` when the capacity is
    /// not positive.
    pub time_to_clear_backlog_secs: f64,
}
155
156pub fn analyze_mysql_consumer_lag(
178 queue_size: usize,
179 processing_rate: f64,
180 target_lag_seconds: u64,
181) -> ConsumerLagAnalysis {
182 let lag_seconds = if processing_rate > 0.0 {
183 queue_size as f64 / processing_rate
184 } else {
185 f64::INFINITY
186 };
187
188 let is_lagging = lag_seconds > target_lag_seconds as f64;
189
190 let recommendation = if is_lagging {
191 let target_rate = queue_size as f64 / target_lag_seconds as f64;
192 let additional_capacity_needed = target_rate - processing_rate;
193 let workers_needed = (additional_capacity_needed / processing_rate).ceil() as usize;
194 ScalingRecommendation::ScaleUp {
195 additional_workers: workers_needed.max(1),
196 }
197 } else if lag_seconds < (target_lag_seconds as f64 * 0.5) && queue_size > 0 {
198 let excess_capacity = processing_rate - (queue_size as f64 / target_lag_seconds as f64);
200 let workers_to_remove = (excess_capacity / processing_rate).floor() as usize;
201 if workers_to_remove > 0 {
202 ScalingRecommendation::ScaleDown { workers_to_remove }
203 } else {
204 ScalingRecommendation::Optimal
205 }
206 } else {
207 ScalingRecommendation::Optimal
208 };
209
210 ConsumerLagAnalysis {
211 queue_size,
212 processing_rate,
213 target_lag_seconds,
214 lag_seconds,
215 is_lagging,
216 recommendation,
217 }
218}
219
220pub fn calculate_mysql_message_velocity(
243 previous_size: usize,
244 current_size: usize,
245 time_window_secs: f64,
246) -> MessageVelocity {
247 let velocity = if time_window_secs > 0.0 {
248 (current_size as f64 - previous_size as f64) / time_window_secs
249 } else {
250 0.0
251 };
252
253 let trend = if velocity > 10.0 {
254 QueueTrend::RapidGrowth
255 } else if velocity > 1.0 {
256 QueueTrend::SlowGrowth
257 } else if velocity > -1.0 {
258 QueueTrend::Stable
259 } else if velocity > -10.0 {
260 QueueTrend::SlowShrink
261 } else {
262 QueueTrend::RapidShrink
263 };
264
265 MessageVelocity {
266 previous_size,
267 current_size,
268 time_window_secs,
269 velocity,
270 trend,
271 }
272}
273
274pub fn suggest_mysql_worker_scaling(
297 queue_size: usize,
298 current_workers: usize,
299 avg_processing_rate: f64,
300 target_lag_seconds: u64,
301) -> WorkerScalingSuggestion {
302 let current_total_rate = current_workers as f64 * avg_processing_rate;
303 let target_rate = queue_size as f64 / target_lag_seconds as f64;
304
305 let recommended_workers = if target_rate > current_total_rate {
306 ((target_rate / avg_processing_rate).ceil() as usize).max(1)
307 } else {
308 ((target_rate / avg_processing_rate).floor() as usize).max(1)
309 };
310
311 let action = if recommended_workers > current_workers {
312 ScalingRecommendation::ScaleUp {
313 additional_workers: recommended_workers - current_workers,
314 }
315 } else if recommended_workers < current_workers {
316 ScalingRecommendation::ScaleDown {
317 workers_to_remove: current_workers - recommended_workers,
318 }
319 } else {
320 ScalingRecommendation::Optimal
321 };
322
323 WorkerScalingSuggestion {
324 queue_size,
325 current_workers,
326 avg_processing_rate,
327 target_lag_seconds,
328 recommended_workers,
329 action,
330 }
331}
332
333pub fn calculate_mysql_message_age_distribution(
355 message_ages: &[f64],
356 sla_threshold_secs: f64,
357) -> MessageAgeDistribution {
358 if message_ages.is_empty() {
359 return MessageAgeDistribution {
360 total_messages: 0,
361 min_age_secs: 0.0,
362 max_age_secs: 0.0,
363 avg_age_secs: 0.0,
364 p50_age_secs: 0.0,
365 p95_age_secs: 0.0,
366 p99_age_secs: 0.0,
367 messages_exceeding_sla: 0,
368 };
369 }
370
371 let mut sorted_ages = message_ages.to_vec();
372 sorted_ages.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
373
374 let total_messages = sorted_ages.len();
375 let min_age_secs = sorted_ages[0];
376 let max_age_secs = sorted_ages[total_messages - 1];
377 let avg_age_secs = sorted_ages.iter().sum::<f64>() / total_messages as f64;
378
379 let p50_age_secs = percentile(&sorted_ages, 50.0);
380 let p95_age_secs = percentile(&sorted_ages, 95.0);
381 let p99_age_secs = percentile(&sorted_ages, 99.0);
382
383 let messages_exceeding_sla = sorted_ages
384 .iter()
385 .filter(|&&age| age > sla_threshold_secs)
386 .count();
387
388 MessageAgeDistribution {
389 total_messages,
390 min_age_secs,
391 max_age_secs,
392 avg_age_secs,
393 p50_age_secs,
394 p95_age_secs,
395 p99_age_secs,
396 messages_exceeding_sla,
397 }
398}
399
400pub fn estimate_mysql_processing_capacity(
422 workers: usize,
423 rate_per_worker: f64,
424 current_backlog: usize,
425) -> ProcessingCapacity {
426 let total_capacity_per_sec = workers as f64 * rate_per_worker;
427 let total_capacity_per_min = total_capacity_per_sec * 60.0;
428 let total_capacity_per_hour = total_capacity_per_min * 60.0;
429
430 let time_to_clear_backlog_secs = if total_capacity_per_sec > 0.0 {
431 current_backlog as f64 / total_capacity_per_sec
432 } else {
433 f64::INFINITY
434 };
435
436 ProcessingCapacity {
437 workers,
438 rate_per_worker,
439 total_capacity_per_sec,
440 total_capacity_per_min,
441 total_capacity_per_hour,
442 time_to_clear_backlog_secs,
443 }
444}
445
/// Combines queue fullness and processing speed into a single health score.
///
/// The score is a weighted sum: 60% from how empty the queue is relative to
/// `max_acceptable_size`, 40% from how close `processing_rate` is to `target_processing_rate`.
/// Each component is capped at `1.0`; a component whose reference value is zero (or, for the
/// rate, not positive) counts as fully healthy.
///
/// # Arguments
///
/// * `queue_size` - number of messages currently queued.
/// * `processing_rate` - observed processing rate.
/// * `max_acceptable_size` - queue size at which the size component drops to zero.
/// * `target_processing_rate` - rate at which the rate component reaches its cap.
///
/// # Returns
///
/// The combined score; `1.0` is the best possible value.
pub fn calculate_mysql_queue_health_score(
    queue_size: usize,
    processing_rate: f64,
    max_acceptable_size: usize,
    target_processing_rate: f64,
) -> f64 {
    const SIZE_WEIGHT: f64 = 0.6;
    const RATE_WEIGHT: f64 = 0.4;

    let size_score = match max_acceptable_size {
        0 => 1.0,
        limit => {
            let fill_ratio = queue_size as f64 / limit as f64;
            1.0 - fill_ratio.min(1.0)
        }
    };

    let rate_score = if target_processing_rate > 0.0 {
        let attainment = processing_rate / target_processing_rate;
        attainment.min(1.0)
    } else {
        1.0
    };

    (size_score * SIZE_WEIGHT) + (rate_score * RATE_WEIGHT)
}
493
/// Translates raw broker metrics into human-readable status labels.
///
/// Recognised input keys and the output key each one produces:
///
/// * `avg_latency_ms` -> `latency_status`: `excellent` (< 10), `good` (< 50),
///   `acceptable` (< 100), otherwise `poor`.
/// * `throughput_msg_per_sec` -> `throughput_status`: `high` (> 1000), `medium` (> 100),
///   otherwise `low`.
/// * `error_rate_percent` -> `error_rate_status`: `healthy` (< 1), `warning` (< 5),
///   otherwise `critical`.
///
/// Metrics that are absent from `metrics` produce no entry; unrecognised keys are ignored.
pub fn analyze_mysql_broker_performance(metrics: &HashMap<String, f64>) -> HashMap<String, String> {
    type Classifier = fn(f64) -> &'static str;

    fn latency_label(latency_ms: f64) -> &'static str {
        match latency_ms {
            ms if ms < 10.0 => "excellent",
            ms if ms < 50.0 => "good",
            ms if ms < 100.0 => "acceptable",
            _ => "poor",
        }
    }

    fn throughput_label(msgs_per_sec: f64) -> &'static str {
        match msgs_per_sec {
            rate if rate > 1000.0 => "high",
            rate if rate > 100.0 => "medium",
            _ => "low",
        }
    }

    fn error_rate_label(percent: f64) -> &'static str {
        match percent {
            pct if pct < 1.0 => "healthy",
            pct if pct < 5.0 => "warning",
            _ => "critical",
        }
    }

    // (input metric, output key, classifier)
    let rules: [(&str, &str, Classifier); 3] = [
        ("avg_latency_ms", "latency_status", latency_label),
        ("throughput_msg_per_sec", "throughput_status", throughput_label),
        ("error_rate_percent", "error_rate_status", error_rate_label),
    ];

    rules
        .iter()
        .filter_map(|&(metric, status_key, classify)| {
            metrics
                .get(metric)
                .map(|&value| (status_key.to_string(), classify(value).to_string()))
        })
        .collect()
}
558
/// Picks the `p`-th percentile from an ascending slice by rounding to the nearest rank.
///
/// The rank is `round(p / 100 * (len - 1))`, clamped to the last index, so `p` above 100
/// returns the last element. Returns `0.0` for an empty slice. The slice is assumed to be
/// sorted already; it is not re-sorted here.
fn percentile(sorted_values: &[f64], p: f64) -> f64 {
    match sorted_values.len().checked_sub(1) {
        // Empty slice: there is no last index.
        None => 0.0,
        Some(last_index) => {
            let rank = (p / 100.0 * last_index as f64).round() as usize;
            sorted_values[rank.min(last_index)]
        }
    }
}
568
/// Monthly cost estimate for a MySQL-backed queue, as built by
/// `estimate_mysql_operational_cost`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MysqlCostAnalysis {
    /// Storage size in GB supplied by the caller.
    pub storage_gb: f64,
    /// IOPS figure supplied by the caller.
    pub total_iops: f64,
    /// Daily network egress in GB supplied by the caller.
    pub network_egress_gb_per_day: f64,
    /// Sum of the storage, IOPS and network cost components, computed over a 30-day month.
    pub estimated_monthly_cost_usd: f64,
    /// Monthly cost divided by monthly message volume, per 1000 messages; `0.0` when there are
    /// no messages.
    pub cost_per_1000_messages: f64,
    /// Free-text suggestions triggered by the storage, IOPS, egress and cost thresholds.
    pub optimization_recommendations: Vec<String>,
}
585
/// SLA compliance summary over a set of processing times, as built by
/// `calculate_sla_compliance`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlaComplianceReport {
    /// Number of processing times in the sample.
    pub total_messages: usize,
    /// Number of processing times at or below the SLA threshold.
    pub messages_within_sla: usize,
    /// `total_messages - messages_within_sla`.
    pub messages_exceeding_sla: usize,
    /// Share of messages within the SLA, from 0 to 100; `0.0` for an empty sample.
    pub compliance_percentage: f64,
    /// Arithmetic mean of the processing times, in seconds.
    pub avg_processing_time_secs: f64,
    /// 95th-percentile processing time, in seconds.
    pub p95_processing_time_secs: f64,
    /// 99th-percentile processing time, in seconds.
    pub p99_processing_time_secs: f64,
    /// Overall verdict derived from `compliance_percentage`.
    pub status: SlaStatus,
}
606
/// SLA verdict assigned by `calculate_sla_compliance`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SlaStatus {
    /// At least 99% of messages met the SLA.
    Compliant,
    /// At least 95% but fewer than 99% of messages met the SLA.
    Warning,
    /// Fewer than 95% of messages met the SLA; also reported for an empty sample.
    Violation,
}
617
/// Alerting thresholds, as derived by `calculate_alert_thresholds`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
    /// Twice the average queue size, capped at the maximum queue size.
    pub queue_size_warning: usize,
    /// Five times the average queue size, capped at twice the maximum queue size.
    pub queue_size_critical: usize,
    /// Twice the target lag, in seconds.
    pub lag_warning_secs: u64,
    /// Five times the target lag, in seconds.
    pub lag_critical_secs: u64,
    /// Error-rate warning level in percent (fixed at `1.0` by the calculator).
    pub error_rate_warning_percent: f64,
    /// Error-rate critical level in percent (fixed at `5.0` by the calculator).
    pub error_rate_critical_percent: f64,
    /// 10% of the average queue size, truncated to an integer.
    pub dlq_size_warning: usize,
    /// 50% of the average queue size, truncated to an integer.
    pub dlq_size_critical: usize,
}
638
/// Capacity projection, as built by `forecast_capacity_needs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapacityForecast {
    /// `current_workers * processing_rate_per_worker`.
    pub current_capacity_per_hour: f64,
    /// Load expected at the end of the forecast horizon after compounding the growth rate.
    pub projected_load_per_hour: f64,
    /// Projected load as a percentage of current capacity.
    pub utilization_percent: f64,
    /// Estimated hours until load reaches capacity; `None` when the projected load stays within
    /// capacity or the load is not growing.
    pub time_to_exhaustion_hours: Option<f64>,
    /// Extra workers needed to carry the projected load with 20% headroom; `0` while utilization
    /// is at most 80%.
    pub recommended_additional_workers: usize,
    /// Coarse classification of `utilization_percent`.
    pub status: CapacityStatus,
}
655
/// Utilization bucket assigned by `forecast_capacity_needs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CapacityStatus {
    /// Projected utilization of at most 60%.
    Sufficient,
    /// Projected utilization above 60% and at most 80%.
    Warning,
    /// Projected utilization above 80% and at most 100%.
    Critical,
    /// Projected utilization above 100%.
    Exceeded,
}
668
669#[allow(clippy::too_many_arguments)]
702pub fn estimate_mysql_operational_cost(
703 storage_gb: f64,
704 total_iops: f64,
705 network_egress_gb_per_day: f64,
706 messages_per_day: usize,
707 storage_cost_per_gb: f64,
708 iops_cost_per_1000: f64,
709 network_cost_per_gb: f64,
710) -> MysqlCostAnalysis {
711 let monthly_storage_cost = storage_gb * storage_cost_per_gb;
713
714 let iops_per_month = total_iops * 60.0 * 60.0 * 24.0 * 30.0;
716 let monthly_iops_cost = (iops_per_month / 1000.0) * iops_cost_per_1000;
717
718 let monthly_network_cost = network_egress_gb_per_day * 30.0 * network_cost_per_gb;
720
721 let estimated_monthly_cost_usd =
722 monthly_storage_cost + monthly_iops_cost + monthly_network_cost;
723
724 let messages_per_month = messages_per_day * 30;
725 let cost_per_1000_messages = if messages_per_month > 0 {
726 (estimated_monthly_cost_usd / messages_per_month as f64) * 1000.0
727 } else {
728 0.0
729 };
730
731 let mut optimization_recommendations = Vec::new();
732
733 if storage_gb > 500.0 {
735 optimization_recommendations.push(
736 "Consider implementing data archival policy for completed tasks older than 30 days"
737 .to_string(),
738 );
739 }
740 if storage_gb > 1000.0 {
741 optimization_recommendations.push(
742 "Large database: consider table partitioning to improve query performance and enable efficient archival".to_string()
743 );
744 }
745
746 if total_iops > 10000.0 {
748 optimization_recommendations.push(
749 "High IOPS: consider batch operations to reduce database round-trips".to_string(),
750 );
751 optimization_recommendations.push(
752 "Review indexes to ensure optimal query performance and reduce unnecessary scans"
753 .to_string(),
754 );
755 }
756
757 if network_egress_gb_per_day > 100.0 {
759 optimization_recommendations.push(
760 "High network egress: consider payload compression for large task data".to_string(),
761 );
762 }
763
764 if cost_per_1000_messages > 1.0 {
766 optimization_recommendations.push(
767 "High cost per message: review task retention policies and optimize query patterns"
768 .to_string(),
769 );
770 }
771
772 MysqlCostAnalysis {
773 storage_gb,
774 total_iops,
775 network_egress_gb_per_day,
776 estimated_monthly_cost_usd,
777 cost_per_1000_messages,
778 optimization_recommendations,
779 }
780}
781
782pub fn calculate_sla_compliance(
804 processing_times_secs: &[f64],
805 sla_threshold_secs: f64,
806) -> SlaComplianceReport {
807 if processing_times_secs.is_empty() {
808 return SlaComplianceReport {
809 total_messages: 0,
810 messages_within_sla: 0,
811 messages_exceeding_sla: 0,
812 compliance_percentage: 0.0,
813 avg_processing_time_secs: 0.0,
814 p95_processing_time_secs: 0.0,
815 p99_processing_time_secs: 0.0,
816 status: SlaStatus::Violation,
817 };
818 }
819
820 let total_messages = processing_times_secs.len();
821 let messages_within_sla = processing_times_secs
822 .iter()
823 .filter(|&&time| time <= sla_threshold_secs)
824 .count();
825 let messages_exceeding_sla = total_messages - messages_within_sla;
826
827 let compliance_percentage = (messages_within_sla as f64 / total_messages as f64) * 100.0;
828
829 let mut sorted_times = processing_times_secs.to_vec();
830 sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
831
832 let avg_processing_time_secs = sorted_times.iter().sum::<f64>() / total_messages as f64;
833 let p95_processing_time_secs = percentile(&sorted_times, 95.0);
834 let p99_processing_time_secs = percentile(&sorted_times, 99.0);
835
836 let status = if compliance_percentage >= 99.0 {
837 SlaStatus::Compliant
838 } else if compliance_percentage >= 95.0 {
839 SlaStatus::Warning
840 } else {
841 SlaStatus::Violation
842 };
843
844 SlaComplianceReport {
845 total_messages,
846 messages_within_sla,
847 messages_exceeding_sla,
848 compliance_percentage,
849 avg_processing_time_secs,
850 p95_processing_time_secs,
851 p99_processing_time_secs,
852 status,
853 }
854}
855
856pub fn calculate_alert_thresholds(
879 avg_queue_size: usize,
880 max_queue_size: usize,
881 _avg_processing_rate: f64,
882 target_lag_secs: u64,
883) -> AlertThresholds {
884 let queue_size_warning = ((avg_queue_size as f64 * 2.0) as usize).min(max_queue_size);
886 let queue_size_critical = ((avg_queue_size as f64 * 5.0) as usize).min(max_queue_size * 2);
887
888 let lag_warning_secs = target_lag_secs * 2;
890 let lag_critical_secs = target_lag_secs * 5;
891
892 let error_rate_warning_percent = 1.0;
894 let error_rate_critical_percent = 5.0;
895
896 let dlq_size_warning = (avg_queue_size as f64 * 0.1) as usize;
898 let dlq_size_critical = (avg_queue_size as f64 * 0.5) as usize;
899
900 AlertThresholds {
901 queue_size_warning,
902 queue_size_critical,
903 lag_warning_secs,
904 lag_critical_secs,
905 error_rate_warning_percent,
906 error_rate_critical_percent,
907 dlq_size_warning,
908 dlq_size_critical,
909 }
910}
911
912pub fn forecast_capacity_needs(
935 current_load_per_hour: f64,
936 growth_rate_percent: f64,
937 forecast_horizon_days: u64,
938 current_workers: usize,
939 processing_rate_per_worker: f64,
940) -> CapacityForecast {
941 let current_capacity_per_hour = current_workers as f64 * processing_rate_per_worker;
942
943 let growth_multiplier = 1.0 + (growth_rate_percent / 100.0);
945 let days_factor = forecast_horizon_days as f64 / 30.0; let projected_load_per_hour = current_load_per_hour * growth_multiplier.powf(days_factor);
947
948 let utilization_percent = (projected_load_per_hour / current_capacity_per_hour) * 100.0;
949
950 let time_to_exhaustion_hours = if projected_load_per_hour > current_capacity_per_hour {
951 let daily_growth = current_load_per_hour * (growth_multiplier.powf(1.0 / 30.0) - 1.0);
953 if daily_growth > 0.0 {
954 Some((current_capacity_per_hour - current_load_per_hour) / daily_growth * 24.0)
955 } else {
956 None
957 }
958 } else {
959 None
960 };
961
962 let recommended_additional_workers = if utilization_percent > 80.0 {
963 let needed_capacity = projected_load_per_hour * 1.2; let total_workers_needed = (needed_capacity / processing_rate_per_worker).ceil() as usize;
965 total_workers_needed.saturating_sub(current_workers)
966 } else {
967 0
968 };
969
970 let status = if utilization_percent > 100.0 {
971 CapacityStatus::Exceeded
972 } else if utilization_percent > 80.0 {
973 CapacityStatus::Critical
974 } else if utilization_percent > 60.0 {
975 CapacityStatus::Warning
976 } else {
977 CapacityStatus::Sufficient
978 };
979
980 CapacityForecast {
981 current_capacity_per_hour,
982 projected_load_per_hour,
983 utilization_percent,
984 time_to_exhaustion_hours,
985 recommended_additional_workers,
986 status,
987 }
988}
989
/// Unit tests for the queue analytics helpers in this file.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_analyze_consumer_lag_optimal() {
        let lag = analyze_mysql_consumer_lag(100, 50.0, 10);
        assert_eq!(lag.queue_size, 100);
        assert_eq!(lag.processing_rate, 50.0);
        assert_eq!(lag.lag_seconds, 2.0);
        assert!(!lag.is_lagging);
        assert_eq!(lag.recommendation, ScalingRecommendation::Optimal);
    }

    #[test]
    fn test_analyze_consumer_lag_needs_scale_up() {
        let lag = analyze_mysql_consumer_lag(1000, 5.0, 10);
        assert!(lag.is_lagging);
        assert!(matches!(
            lag.recommendation,
            ScalingRecommendation::ScaleUp { .. }
        ));
    }

    #[test]
    fn test_calculate_message_velocity_growing() {
        let velocity = calculate_mysql_message_velocity(1000, 1600, 60.0);
        assert_eq!(velocity.velocity, 10.0);
        // Exactly 10.0 is not "rapid": the rapid-growth bound is exclusive.
        assert_eq!(velocity.trend, QueueTrend::SlowGrowth);
    }

    #[test]
    fn test_calculate_message_velocity_stable() {
        let velocity = calculate_mysql_message_velocity(1000, 1010, 60.0);
        assert!(velocity.velocity < 1.0);
        assert_eq!(velocity.trend, QueueTrend::Stable);
    }

    #[test]
    fn test_suggest_worker_scaling() {
        let scaling = suggest_mysql_worker_scaling(2000, 5, 40.0, 100);
        assert_eq!(scaling.current_workers, 5);
        assert!(scaling.recommended_workers >= 1);
    }

    #[test]
    fn test_message_age_distribution() {
        let ages = vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0];
        let dist = calculate_mysql_message_age_distribution(&ages, 75.0);
        assert_eq!(dist.total_messages, 10);
        assert_eq!(dist.min_age_secs, 10.0);
        assert_eq!(dist.max_age_secs, 100.0);
        assert_eq!(dist.messages_exceeding_sla, 3);
    }

    #[test]
    fn test_message_age_distribution_empty() {
        let ages = vec![];
        let dist = calculate_mysql_message_age_distribution(&ages, 60.0);
        assert_eq!(dist.total_messages, 0);
    }

    #[test]
    fn test_estimate_processing_capacity() {
        let capacity = estimate_mysql_processing_capacity(10, 50.0, 5000);
        assert_eq!(capacity.workers, 10);
        assert_eq!(capacity.total_capacity_per_sec, 500.0);
        assert_eq!(capacity.total_capacity_per_min, 30000.0);
        assert_eq!(capacity.time_to_clear_backlog_secs, 10.0);
    }

    #[test]
    fn test_calculate_queue_health_score() {
        let score = calculate_mysql_queue_health_score(100, 50.0, 1000, 40.0);
        assert!(score > 0.5);
        assert!(score <= 1.0);
    }

    #[test]
    fn test_analyze_broker_performance() {
        let mut metrics = HashMap::new();
        metrics.insert("avg_latency_ms".to_string(), 25.0);
        metrics.insert("throughput_msg_per_sec".to_string(), 500.0);
        metrics.insert("error_rate_percent".to_string(), 0.5);

        let analysis = analyze_mysql_broker_performance(&metrics);
        assert_eq!(analysis.get("latency_status"), Some(&"good".to_string()));
        assert_eq!(
            analysis.get("throughput_status"),
            Some(&"medium".to_string())
        );
        assert_eq!(
            analysis.get("error_rate_status"),
            Some(&"healthy".to_string())
        );
    }

    #[test]
    fn test_estimate_operational_cost() {
        let cost = estimate_mysql_operational_cost(
            100.0,     // storage_gb
            5000.0,    // total_iops
            50.0,      // network_egress_gb_per_day
            1_000_000, // messages_per_day
            0.10,      // storage_cost_per_gb
            0.10,      // iops_cost_per_1000
            0.09,      // network_cost_per_gb
        );
        assert!(cost.estimated_monthly_cost_usd > 0.0);
        assert!(cost.cost_per_1000_messages > 0.0);
        assert_eq!(cost.storage_gb, 100.0);
    }

    #[test]
    fn test_estimate_operational_cost_high_storage() {
        let cost = estimate_mysql_operational_cost(
            1200.0, 10000.0, 50.0, 1_000_000, 0.10, 0.10, 0.09,
        );
        assert!(!cost.optimization_recommendations.is_empty());
        assert!(cost
            .optimization_recommendations
            .iter()
            .any(|r| r.contains("partition")));
    }

    #[test]
    fn test_calculate_sla_compliance_compliant() {
        let times = vec![5.0, 10.0, 15.0, 20.0, 25.0];
        let report = calculate_sla_compliance(&times, 30.0);
        assert_eq!(report.total_messages, 5);
        assert_eq!(report.messages_within_sla, 5);
        assert_eq!(report.messages_exceeding_sla, 0);
        assert_eq!(report.compliance_percentage, 100.0);
        assert_eq!(report.status, SlaStatus::Compliant);
    }

    #[test]
    fn test_calculate_sla_compliance_violation() {
        // Two of the seven samples exceed the 30 s threshold.
        let times = vec![5.0, 10.0, 15.0, 20.0, 25.0, 100.0, 150.0];
        let report = calculate_sla_compliance(&times, 30.0);
        assert_eq!(report.total_messages, 7);
        assert_eq!(report.messages_within_sla, 5);
        assert_eq!(report.messages_exceeding_sla, 2);
        assert!(report.compliance_percentage < 95.0);
        assert_eq!(report.status, SlaStatus::Violation);
    }

    #[test]
    fn test_calculate_sla_compliance_empty() {
        let times = vec![];
        let report = calculate_sla_compliance(&times, 30.0);
        assert_eq!(report.total_messages, 0);
        assert_eq!(report.status, SlaStatus::Violation);
    }

    #[test]
    fn test_calculate_alert_thresholds() {
        let thresholds = calculate_alert_thresholds(100, 1000, 50.0, 60);
        assert!(thresholds.queue_size_warning > 0);
        assert!(thresholds.queue_size_critical > thresholds.queue_size_warning);
        assert_eq!(thresholds.lag_warning_secs, 120);
        assert_eq!(thresholds.lag_critical_secs, 300);
        assert_eq!(thresholds.error_rate_warning_percent, 1.0);
        assert_eq!(thresholds.error_rate_critical_percent, 5.0);
    }

    #[test]
    fn test_forecast_capacity_sufficient() {
        let forecast = forecast_capacity_needs(
            5000.0, // current_load_per_hour
            10.0,   // growth_rate_percent
            30,     // forecast_horizon_days
            10,     // current_workers
            1500.0, // processing_rate_per_worker
        );
        assert_eq!(forecast.current_capacity_per_hour, 15000.0);
        assert!(forecast.projected_load_per_hour > 5000.0);
        assert_eq!(forecast.status, CapacityStatus::Sufficient);
        assert_eq!(forecast.recommended_additional_workers, 0);
    }

    #[test]
    fn test_forecast_capacity_exceeded() {
        let forecast = forecast_capacity_needs(
            10000.0, // current_load_per_hour
            50.0,    // growth_rate_percent
            60,      // forecast_horizon_days
            5,       // current_workers
            1000.0,  // processing_rate_per_worker
        );
        assert_eq!(forecast.current_capacity_per_hour, 5000.0);
        assert!(forecast.projected_load_per_hour > forecast.current_capacity_per_hour);
        assert_eq!(forecast.status, CapacityStatus::Exceeded);
        assert!(forecast.recommended_additional_workers > 0);
    }

    #[test]
    fn test_forecast_capacity_warning() {
        let forecast = forecast_capacity_needs(
            8000.0, // current_load_per_hour
            20.0,   // growth_rate_percent
            30,     // forecast_horizon_days
            10,     // current_workers
            1200.0, // processing_rate_per_worker
        );
        assert!(forecast.utilization_percent > 60.0);
        assert!(forecast.utilization_percent <= 80.0);
        assert_eq!(forecast.status, CapacityStatus::Warning);
    }
}