use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerLagAnalysis {
pub queue_size: usize,
pub processing_rate: f64,
pub target_lag_seconds: u64,
pub lag_seconds: f64,
pub is_lagging: bool,
pub recommendation: ScalingRecommendation,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ScalingRecommendation {
ScaleUp { additional_workers: usize },
Optimal,
ScaleDown { workers_to_remove: usize },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageVelocity {
pub previous_size: usize,
pub current_size: usize,
pub time_window_secs: f64,
pub velocity: f64,
pub trend: QueueTrend,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum QueueTrend {
RapidGrowth,
SlowGrowth,
Stable,
SlowShrink,
RapidShrink,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerScalingSuggestion {
pub queue_size: usize,
pub current_workers: usize,
pub avg_processing_rate: f64,
pub target_lag_seconds: u64,
pub recommended_workers: usize,
pub action: ScalingRecommendation,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAgeDistribution {
pub total_messages: usize,
pub min_age_secs: f64,
pub max_age_secs: f64,
pub avg_age_secs: f64,
pub p50_age_secs: f64,
pub p95_age_secs: f64,
pub p99_age_secs: f64,
pub messages_exceeding_sla: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingCapacity {
pub workers: usize,
pub rate_per_worker: f64,
pub total_capacity_per_sec: f64,
pub total_capacity_per_min: f64,
pub total_capacity_per_hour: f64,
pub time_to_clear_backlog_secs: f64,
}
pub fn analyze_mysql_consumer_lag(
queue_size: usize,
processing_rate: f64,
target_lag_seconds: u64,
) -> ConsumerLagAnalysis {
let lag_seconds = if processing_rate > 0.0 {
queue_size as f64 / processing_rate
} else {
f64::INFINITY
};
let is_lagging = lag_seconds > target_lag_seconds as f64;
let recommendation = if is_lagging {
let target_rate = queue_size as f64 / target_lag_seconds as f64;
let additional_capacity_needed = target_rate - processing_rate;
let workers_needed = (additional_capacity_needed / processing_rate).ceil() as usize;
ScalingRecommendation::ScaleUp {
additional_workers: workers_needed.max(1),
}
} else if lag_seconds < (target_lag_seconds as f64 * 0.5) && queue_size > 0 {
let excess_capacity = processing_rate - (queue_size as f64 / target_lag_seconds as f64);
let workers_to_remove = (excess_capacity / processing_rate).floor() as usize;
if workers_to_remove > 0 {
ScalingRecommendation::ScaleDown { workers_to_remove }
} else {
ScalingRecommendation::Optimal
}
} else {
ScalingRecommendation::Optimal
};
ConsumerLagAnalysis {
queue_size,
processing_rate,
target_lag_seconds,
lag_seconds,
is_lagging,
recommendation,
}
}
pub fn calculate_mysql_message_velocity(
previous_size: usize,
current_size: usize,
time_window_secs: f64,
) -> MessageVelocity {
let velocity = if time_window_secs > 0.0 {
(current_size as f64 - previous_size as f64) / time_window_secs
} else {
0.0
};
let trend = if velocity > 10.0 {
QueueTrend::RapidGrowth
} else if velocity > 1.0 {
QueueTrend::SlowGrowth
} else if velocity > -1.0 {
QueueTrend::Stable
} else if velocity > -10.0 {
QueueTrend::SlowShrink
} else {
QueueTrend::RapidShrink
};
MessageVelocity {
previous_size,
current_size,
time_window_secs,
velocity,
trend,
}
}
pub fn suggest_mysql_worker_scaling(
queue_size: usize,
current_workers: usize,
avg_processing_rate: f64,
target_lag_seconds: u64,
) -> WorkerScalingSuggestion {
let current_total_rate = current_workers as f64 * avg_processing_rate;
let target_rate = queue_size as f64 / target_lag_seconds as f64;
let recommended_workers = if target_rate > current_total_rate {
((target_rate / avg_processing_rate).ceil() as usize).max(1)
} else {
((target_rate / avg_processing_rate).floor() as usize).max(1)
};
let action = if recommended_workers > current_workers {
ScalingRecommendation::ScaleUp {
additional_workers: recommended_workers - current_workers,
}
} else if recommended_workers < current_workers {
ScalingRecommendation::ScaleDown {
workers_to_remove: current_workers - recommended_workers,
}
} else {
ScalingRecommendation::Optimal
};
WorkerScalingSuggestion {
queue_size,
current_workers,
avg_processing_rate,
target_lag_seconds,
recommended_workers,
action,
}
}
pub fn calculate_mysql_message_age_distribution(
message_ages: &[f64],
sla_threshold_secs: f64,
) -> MessageAgeDistribution {
if message_ages.is_empty() {
return MessageAgeDistribution {
total_messages: 0,
min_age_secs: 0.0,
max_age_secs: 0.0,
avg_age_secs: 0.0,
p50_age_secs: 0.0,
p95_age_secs: 0.0,
p99_age_secs: 0.0,
messages_exceeding_sla: 0,
};
}
let mut sorted_ages = message_ages.to_vec();
sorted_ages.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let total_messages = sorted_ages.len();
let min_age_secs = sorted_ages[0];
let max_age_secs = sorted_ages[total_messages - 1];
let avg_age_secs = sorted_ages.iter().sum::<f64>() / total_messages as f64;
let p50_age_secs = percentile(&sorted_ages, 50.0);
let p95_age_secs = percentile(&sorted_ages, 95.0);
let p99_age_secs = percentile(&sorted_ages, 99.0);
let messages_exceeding_sla = sorted_ages
.iter()
.filter(|&&age| age > sla_threshold_secs)
.count();
MessageAgeDistribution {
total_messages,
min_age_secs,
max_age_secs,
avg_age_secs,
p50_age_secs,
p95_age_secs,
p99_age_secs,
messages_exceeding_sla,
}
}
pub fn estimate_mysql_processing_capacity(
workers: usize,
rate_per_worker: f64,
current_backlog: usize,
) -> ProcessingCapacity {
let total_capacity_per_sec = workers as f64 * rate_per_worker;
let total_capacity_per_min = total_capacity_per_sec * 60.0;
let total_capacity_per_hour = total_capacity_per_min * 60.0;
let time_to_clear_backlog_secs = if total_capacity_per_sec > 0.0 {
current_backlog as f64 / total_capacity_per_sec
} else {
f64::INFINITY
};
ProcessingCapacity {
workers,
rate_per_worker,
total_capacity_per_sec,
total_capacity_per_min,
total_capacity_per_hour,
time_to_clear_backlog_secs,
}
}
pub fn calculate_mysql_queue_health_score(
queue_size: usize,
processing_rate: f64,
max_acceptable_size: usize,
target_processing_rate: f64,
) -> f64 {
let size_score = if max_acceptable_size > 0 {
1.0 - (queue_size as f64 / max_acceptable_size as f64).min(1.0)
} else {
1.0
};
let rate_score = if target_processing_rate > 0.0 {
(processing_rate / target_processing_rate).min(1.0)
} else {
1.0
};
(size_score * 0.6) + (rate_score * 0.4)
}
pub fn analyze_mysql_broker_performance(metrics: &HashMap<String, f64>) -> HashMap<String, String> {
let mut analysis = HashMap::new();
if let Some(&latency) = metrics.get("avg_latency_ms") {
let status = if latency < 10.0 {
"excellent"
} else if latency < 50.0 {
"good"
} else if latency < 100.0 {
"acceptable"
} else {
"poor"
};
analysis.insert("latency_status".to_string(), status.to_string());
}
if let Some(&throughput) = metrics.get("throughput_msg_per_sec") {
let status = if throughput > 1000.0 {
"high"
} else if throughput > 100.0 {
"medium"
} else {
"low"
};
analysis.insert("throughput_status".to_string(), status.to_string());
}
if let Some(&error_rate) = metrics.get("error_rate_percent") {
let status = if error_rate < 1.0 {
"healthy"
} else if error_rate < 5.0 {
"warning"
} else {
"critical"
};
analysis.insert("error_rate_status".to_string(), status.to_string());
}
analysis
}
fn percentile(sorted_values: &[f64], p: f64) -> f64 {
if sorted_values.is_empty() {
return 0.0;
}
let index = (p / 100.0 * (sorted_values.len() - 1) as f64).round() as usize;
sorted_values[index.min(sorted_values.len() - 1)]
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MysqlCostAnalysis {
pub storage_gb: f64,
pub total_iops: f64,
pub network_egress_gb_per_day: f64,
pub estimated_monthly_cost_usd: f64,
pub cost_per_1000_messages: f64,
pub optimization_recommendations: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlaComplianceReport {
pub total_messages: usize,
pub messages_within_sla: usize,
pub messages_exceeding_sla: usize,
pub compliance_percentage: f64,
pub avg_processing_time_secs: f64,
pub p95_processing_time_secs: f64,
pub p99_processing_time_secs: f64,
pub status: SlaStatus,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SlaStatus {
Compliant,
Warning,
Violation,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
pub queue_size_warning: usize,
pub queue_size_critical: usize,
pub lag_warning_secs: u64,
pub lag_critical_secs: u64,
pub error_rate_warning_percent: f64,
pub error_rate_critical_percent: f64,
pub dlq_size_warning: usize,
pub dlq_size_critical: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapacityForecast {
pub current_capacity_per_hour: f64,
pub projected_load_per_hour: f64,
pub utilization_percent: f64,
pub time_to_exhaustion_hours: Option<f64>,
pub recommended_additional_workers: usize,
pub status: CapacityStatus,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CapacityStatus {
Sufficient,
Warning,
Critical,
Exceeded,
}
#[allow(clippy::too_many_arguments)]
pub fn estimate_mysql_operational_cost(
storage_gb: f64,
total_iops: f64,
network_egress_gb_per_day: f64,
messages_per_day: usize,
storage_cost_per_gb: f64,
iops_cost_per_1000: f64,
network_cost_per_gb: f64,
) -> MysqlCostAnalysis {
let monthly_storage_cost = storage_gb * storage_cost_per_gb;
let iops_per_month = total_iops * 60.0 * 60.0 * 24.0 * 30.0;
let monthly_iops_cost = (iops_per_month / 1000.0) * iops_cost_per_1000;
let monthly_network_cost = network_egress_gb_per_day * 30.0 * network_cost_per_gb;
let estimated_monthly_cost_usd =
monthly_storage_cost + monthly_iops_cost + monthly_network_cost;
let messages_per_month = messages_per_day * 30;
let cost_per_1000_messages = if messages_per_month > 0 {
(estimated_monthly_cost_usd / messages_per_month as f64) * 1000.0
} else {
0.0
};
let mut optimization_recommendations = Vec::new();
if storage_gb > 500.0 {
optimization_recommendations.push(
"Consider implementing data archival policy for completed tasks older than 30 days"
.to_string(),
);
}
if storage_gb > 1000.0 {
optimization_recommendations.push(
"Large database: consider table partitioning to improve query performance and enable efficient archival".to_string()
);
}
if total_iops > 10000.0 {
optimization_recommendations.push(
"High IOPS: consider batch operations to reduce database round-trips".to_string(),
);
optimization_recommendations.push(
"Review indexes to ensure optimal query performance and reduce unnecessary scans"
.to_string(),
);
}
if network_egress_gb_per_day > 100.0 {
optimization_recommendations.push(
"High network egress: consider payload compression for large task data".to_string(),
);
}
if cost_per_1000_messages > 1.0 {
optimization_recommendations.push(
"High cost per message: review task retention policies and optimize query patterns"
.to_string(),
);
}
MysqlCostAnalysis {
storage_gb,
total_iops,
network_egress_gb_per_day,
estimated_monthly_cost_usd,
cost_per_1000_messages,
optimization_recommendations,
}
}
pub fn calculate_sla_compliance(
processing_times_secs: &[f64],
sla_threshold_secs: f64,
) -> SlaComplianceReport {
if processing_times_secs.is_empty() {
return SlaComplianceReport {
total_messages: 0,
messages_within_sla: 0,
messages_exceeding_sla: 0,
compliance_percentage: 0.0,
avg_processing_time_secs: 0.0,
p95_processing_time_secs: 0.0,
p99_processing_time_secs: 0.0,
status: SlaStatus::Violation,
};
}
let total_messages = processing_times_secs.len();
let messages_within_sla = processing_times_secs
.iter()
.filter(|&&time| time <= sla_threshold_secs)
.count();
let messages_exceeding_sla = total_messages - messages_within_sla;
let compliance_percentage = (messages_within_sla as f64 / total_messages as f64) * 100.0;
let mut sorted_times = processing_times_secs.to_vec();
sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let avg_processing_time_secs = sorted_times.iter().sum::<f64>() / total_messages as f64;
let p95_processing_time_secs = percentile(&sorted_times, 95.0);
let p99_processing_time_secs = percentile(&sorted_times, 99.0);
let status = if compliance_percentage >= 99.0 {
SlaStatus::Compliant
} else if compliance_percentage >= 95.0 {
SlaStatus::Warning
} else {
SlaStatus::Violation
};
SlaComplianceReport {
total_messages,
messages_within_sla,
messages_exceeding_sla,
compliance_percentage,
avg_processing_time_secs,
p95_processing_time_secs,
p99_processing_time_secs,
status,
}
}
pub fn calculate_alert_thresholds(
avg_queue_size: usize,
max_queue_size: usize,
_avg_processing_rate: f64,
target_lag_secs: u64,
) -> AlertThresholds {
let queue_size_warning = ((avg_queue_size as f64 * 2.0) as usize).min(max_queue_size);
let queue_size_critical = ((avg_queue_size as f64 * 5.0) as usize).min(max_queue_size * 2);
let lag_warning_secs = target_lag_secs * 2;
let lag_critical_secs = target_lag_secs * 5;
let error_rate_warning_percent = 1.0;
let error_rate_critical_percent = 5.0;
let dlq_size_warning = (avg_queue_size as f64 * 0.1) as usize;
let dlq_size_critical = (avg_queue_size as f64 * 0.5) as usize;
AlertThresholds {
queue_size_warning,
queue_size_critical,
lag_warning_secs,
lag_critical_secs,
error_rate_warning_percent,
error_rate_critical_percent,
dlq_size_warning,
dlq_size_critical,
}
}
pub fn forecast_capacity_needs(
current_load_per_hour: f64,
growth_rate_percent: f64,
forecast_horizon_days: u64,
current_workers: usize,
processing_rate_per_worker: f64,
) -> CapacityForecast {
let current_capacity_per_hour = current_workers as f64 * processing_rate_per_worker;
let growth_multiplier = 1.0 + (growth_rate_percent / 100.0);
let days_factor = forecast_horizon_days as f64 / 30.0; let projected_load_per_hour = current_load_per_hour * growth_multiplier.powf(days_factor);
let utilization_percent = (projected_load_per_hour / current_capacity_per_hour) * 100.0;
let time_to_exhaustion_hours = if projected_load_per_hour > current_capacity_per_hour {
let daily_growth = current_load_per_hour * (growth_multiplier.powf(1.0 / 30.0) - 1.0);
if daily_growth > 0.0 {
Some((current_capacity_per_hour - current_load_per_hour) / daily_growth * 24.0)
} else {
None
}
} else {
None
};
let recommended_additional_workers = if utilization_percent > 80.0 {
let needed_capacity = projected_load_per_hour * 1.2; let total_workers_needed = (needed_capacity / processing_rate_per_worker).ceil() as usize;
total_workers_needed.saturating_sub(current_workers)
} else {
0
};
let status = if utilization_percent > 100.0 {
CapacityStatus::Exceeded
} else if utilization_percent > 80.0 {
CapacityStatus::Critical
} else if utilization_percent > 60.0 {
CapacityStatus::Warning
} else {
CapacityStatus::Sufficient
};
CapacityForecast {
current_capacity_per_hour,
projected_load_per_hour,
utilization_percent,
time_to_exhaustion_hours,
recommended_additional_workers,
status,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_analyze_consumer_lag_optimal() {
let lag = analyze_mysql_consumer_lag(100, 50.0, 10);
assert_eq!(lag.queue_size, 100);
assert_eq!(lag.processing_rate, 50.0);
assert_eq!(lag.lag_seconds, 2.0);
assert!(!lag.is_lagging);
assert_eq!(lag.recommendation, ScalingRecommendation::Optimal);
}
#[test]
fn test_analyze_consumer_lag_needs_scale_up() {
let lag = analyze_mysql_consumer_lag(1000, 5.0, 10);
assert!(lag.is_lagging);
assert!(matches!(
lag.recommendation,
ScalingRecommendation::ScaleUp { .. }
));
}
#[test]
fn test_calculate_message_velocity_growing() {
let velocity = calculate_mysql_message_velocity(1000, 1600, 60.0);
assert_eq!(velocity.velocity, 10.0);
assert_eq!(velocity.trend, QueueTrend::SlowGrowth);
}
#[test]
fn test_calculate_message_velocity_stable() {
let velocity = calculate_mysql_message_velocity(1000, 1010, 60.0);
assert!(velocity.velocity < 1.0);
assert_eq!(velocity.trend, QueueTrend::Stable);
}
#[test]
fn test_suggest_worker_scaling() {
let scaling = suggest_mysql_worker_scaling(2000, 5, 40.0, 100);
assert_eq!(scaling.current_workers, 5);
assert!(scaling.recommended_workers >= 1);
}
#[test]
fn test_message_age_distribution() {
let ages = vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0];
let dist = calculate_mysql_message_age_distribution(&ages, 75.0);
assert_eq!(dist.total_messages, 10);
assert_eq!(dist.min_age_secs, 10.0);
assert_eq!(dist.max_age_secs, 100.0);
assert_eq!(dist.messages_exceeding_sla, 3);
}
#[test]
fn test_message_age_distribution_empty() {
let ages = vec![];
let dist = calculate_mysql_message_age_distribution(&ages, 60.0);
assert_eq!(dist.total_messages, 0);
}
#[test]
fn test_estimate_processing_capacity() {
let capacity = estimate_mysql_processing_capacity(10, 50.0, 5000);
assert_eq!(capacity.workers, 10);
assert_eq!(capacity.total_capacity_per_sec, 500.0);
assert_eq!(capacity.total_capacity_per_min, 30000.0);
assert_eq!(capacity.time_to_clear_backlog_secs, 10.0);
}
#[test]
fn test_calculate_queue_health_score() {
let score = calculate_mysql_queue_health_score(100, 50.0, 1000, 40.0);
assert!(score > 0.5);
assert!(score <= 1.0);
}
#[test]
fn test_analyze_broker_performance() {
let mut metrics = HashMap::new();
metrics.insert("avg_latency_ms".to_string(), 25.0);
metrics.insert("throughput_msg_per_sec".to_string(), 500.0);
metrics.insert("error_rate_percent".to_string(), 0.5);
let analysis = analyze_mysql_broker_performance(&metrics);
assert_eq!(analysis.get("latency_status"), Some(&"good".to_string()));
assert_eq!(
analysis.get("throughput_status"),
Some(&"medium".to_string())
);
assert_eq!(
analysis.get("error_rate_status"),
Some(&"healthy".to_string())
);
}
#[test]
fn test_estimate_operational_cost() {
let cost = estimate_mysql_operational_cost(
100.0, 5000.0, 50.0, 1_000_000, 0.10, 0.10, 0.09, );
assert!(cost.estimated_monthly_cost_usd > 0.0);
assert!(cost.cost_per_1000_messages > 0.0);
assert_eq!(cost.storage_gb, 100.0);
}
#[test]
fn test_estimate_operational_cost_high_storage() {
let cost = estimate_mysql_operational_cost(
1200.0, 10000.0, 50.0, 1_000_000, 0.10, 0.10, 0.09,
);
assert!(!cost.optimization_recommendations.is_empty());
assert!(cost
.optimization_recommendations
.iter()
.any(|r| r.contains("partition")));
}
#[test]
fn test_calculate_sla_compliance_compliant() {
let times = vec![5.0, 10.0, 15.0, 20.0, 25.0];
let report = calculate_sla_compliance(×, 30.0);
assert_eq!(report.total_messages, 5);
assert_eq!(report.messages_within_sla, 5);
assert_eq!(report.messages_exceeding_sla, 0);
assert_eq!(report.compliance_percentage, 100.0);
assert_eq!(report.status, SlaStatus::Compliant);
}
#[test]
fn test_calculate_sla_compliance_violation() {
let times = vec![5.0, 10.0, 15.0, 20.0, 25.0, 100.0, 150.0]; let report = calculate_sla_compliance(×, 30.0);
assert_eq!(report.total_messages, 7);
assert_eq!(report.messages_within_sla, 5);
assert_eq!(report.messages_exceeding_sla, 2);
assert!(report.compliance_percentage < 95.0);
assert_eq!(report.status, SlaStatus::Violation);
}
#[test]
fn test_calculate_sla_compliance_empty() {
let times = vec![];
let report = calculate_sla_compliance(×, 30.0);
assert_eq!(report.total_messages, 0);
assert_eq!(report.status, SlaStatus::Violation);
}
#[test]
fn test_calculate_alert_thresholds() {
let thresholds = calculate_alert_thresholds(100, 1000, 50.0, 60);
assert!(thresholds.queue_size_warning > 0);
assert!(thresholds.queue_size_critical > thresholds.queue_size_warning);
assert_eq!(thresholds.lag_warning_secs, 120);
assert_eq!(thresholds.lag_critical_secs, 300);
assert_eq!(thresholds.error_rate_warning_percent, 1.0);
assert_eq!(thresholds.error_rate_critical_percent, 5.0);
}
#[test]
fn test_forecast_capacity_sufficient() {
let forecast = forecast_capacity_needs(
5000.0, 10.0, 30, 10, 1500.0, );
assert_eq!(forecast.current_capacity_per_hour, 15000.0);
assert!(forecast.projected_load_per_hour > 5000.0);
assert_eq!(forecast.status, CapacityStatus::Sufficient);
assert_eq!(forecast.recommended_additional_workers, 0);
}
#[test]
fn test_forecast_capacity_exceeded() {
let forecast = forecast_capacity_needs(
10000.0, 50.0, 60, 5, 1000.0, );
assert_eq!(forecast.current_capacity_per_hour, 5000.0);
assert!(forecast.projected_load_per_hour > forecast.current_capacity_per_hour);
assert_eq!(forecast.status, CapacityStatus::Exceeded);
assert!(forecast.recommended_additional_workers > 0);
}
#[test]
fn test_forecast_capacity_warning() {
let forecast = forecast_capacity_needs(
8000.0, 20.0, 30, 10, 1200.0, );
assert!(forecast.utilization_percent > 60.0);
assert!(forecast.utilization_percent <= 80.0);
assert_eq!(forecast.status, CapacityStatus::Warning);
}
}