/// Suggests connection-pool sizing from observed request concurrency.
///
/// Returns `(min_connections, recommended_max, recommended_initial)`:
///
/// * `min_connections` is one tenth of `avg_concurrent_requests`, kept within
///   `1..=5` and never above `max_allowed_connections`.
/// * `recommended_max` is 1.5x `peak_concurrent_requests` (rounded up), at
///   least one above the minimum when the hard limit leaves room for it, and
///   never above `max_allowed_connections`.
/// * `recommended_initial` is 1.2x `avg_concurrent_requests` (rounded up),
///   kept between `min_connections` and `recommended_max`.
///
/// A `max_allowed_connections` of zero yields `(0, 0, 0)`.
pub fn suggest_connection_pool_size(
    peak_concurrent_requests: usize,
    avg_concurrent_requests: usize,
    max_allowed_connections: usize,
) -> (usize, usize, usize) {
    if max_allowed_connections == 0 {
        return (0, 0, 0);
    }

    let min_connections = (avg_concurrent_requests / 10)
        .clamp(1, 5)
        .min(max_allowed_connections);

    // `clamp` panics when its lower bound exceeds its upper bound. That would
    // happen whenever the minimum already equals the hard limit, so the
    // "one above the minimum" floor is itself capped at the limit.
    let max_floor = (min_connections + 1).min(max_allowed_connections);
    let recommended_max = ((peak_concurrent_requests as f64 * 1.5).ceil() as usize)
        .clamp(max_floor, max_allowed_connections);

    let recommended_initial = ((avg_concurrent_requests as f64 * 1.2).ceil() as usize)
        .clamp(min_connections, recommended_max);

    (min_connections, recommended_max, recommended_initial)
}
/// Classifies the trend of a series of processing times using a least-squares
/// line fitted against the sample index.
///
/// Returns `(direction, strength, recommendation)`:
///
/// * `direction` is `"improving"` when the fitted slope is below `-1.0`,
///   `"degrading"` when it is above `1.0`, and `"stable"` otherwise.
/// * `strength` is `|slope / mean|`, capped at `1.0` (and `0.0` when the mean
///   is not positive).
/// * `recommendation` is a fixed hint string paired with the direction.
///
/// Fewer than three samples yields `("stable", 0.0, "insufficient_data")`.
pub fn calculate_message_processing_trend(
    processing_times_ms: &[u64],
) -> (&'static str, f64, &'static str) {
    let sample_count = processing_times_ms.len();
    if sample_count < 3 {
        return ("stable", 0.0, "insufficient_data");
    }

    // Accumulate every regression sum in one pass, in index order.
    let mut sum_x = 0.0_f64;
    let mut sum_y = 0.0_f64;
    let mut sum_xy = 0.0_f64;
    let mut sum_xx = 0.0_f64;
    for (index, &elapsed) in processing_times_ms.iter().enumerate() {
        let x = index as f64;
        let y = elapsed as f64;
        sum_x += x;
        sum_y += y;
        sum_xy += x * y;
        sum_xx += x * x;
    }

    let n = sample_count as f64;
    let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x);

    let mean = sum_y / n;
    let relative_slope = if mean > 0.0 {
        (slope / mean).abs()
    } else {
        0.0
    };
    let strength = relative_slope.min(1.0);

    if slope < -1.0 {
        ("improving", strength, "maintain_current_optimizations")
    } else if slope > 1.0 {
        ("degrading", strength, "investigate_performance_issues")
    } else {
        ("stable", strength, "monitor_continuously")
    }
}
/// Suggests a prefetch count from per-message processing time and the number
/// of concurrent workers.
///
/// The per-worker rate is `1000 / avg_processing_time_ms` messages per second
/// (taken as `10` when the processing time is zero). It selects a base
/// prefetch of `20` (rate >= 10), `10` (rate >= 1) or `5` (slower). The base
/// is multiplied by `concurrent_workers` and capped at `max_prefetch`.
///
/// Returns `1` when `concurrent_workers` or `max_prefetch` is zero.
pub fn suggest_prefetch_count(
    avg_processing_time_ms: u64,
    concurrent_workers: usize,
    max_prefetch: usize,
) -> usize {
    if concurrent_workers == 0 || max_prefetch == 0 {
        return 1;
    }

    let msgs_per_sec_per_worker = if avg_processing_time_ms > 0 {
        1000.0 / avg_processing_time_ms as f64
    } else {
        10.0
    };

    let base_prefetch: usize = if msgs_per_sec_per_worker >= 10.0 {
        20
    } else if msgs_per_sec_per_worker >= 1.0 {
        10
    } else {
        5
    };

    // Saturate instead of overflowing for very large worker counts; the
    // product is capped at `max_prefetch` right after, so saturation never
    // changes a result that would have fit.
    base_prefetch
        .saturating_mul(concurrent_workers)
        .min(max_prefetch)
}
/// Grades the state of a dead-letter queue.
///
/// Returns `(severity, primary_issue, recommendation)`. The failure rate is
/// `dlq_size / total_processed`. The checks below run in order and the first
/// match wins:
///
/// 1. `total_processed == 0`: `("low", "no_data", "monitor")`
/// 2. failure rate above 10%: `"critical"`
/// 3. failure rate above 5%: `"high"`
/// 4. `dlq_growth_per_hour` above 100: `"high"`
/// 5. failure rate above 1%: `"medium"`
/// 6. non-empty DLQ: `"low"` / `"normal_failures"`
/// 7. otherwise: `"low"` / `"healthy"`
pub fn analyze_dead_letter_queue(
    dlq_size: usize,
    total_processed: u64,
    dlq_growth_per_hour: usize,
) -> (&'static str, &'static str, &'static str) {
    if total_processed == 0 {
        return ("low", "no_data", "monitor");
    }

    let failure_rate = dlq_size as f64 / total_processed as f64;

    if failure_rate > 0.1 {
        return (
            "critical",
            "high_failure_rate",
            "immediate_investigation_required",
        );
    }
    if failure_rate > 0.05 {
        return (
            "high",
            "elevated_failure_rate",
            "investigate_error_patterns",
        );
    }
    // Rapid growth outranks a merely moderate failure rate.
    if dlq_growth_per_hour > 100 {
        return (
            "high",
            "rapid_dlq_growth",
            "monitor_closely_and_investigate",
        );
    }
    if failure_rate > 0.01 {
        return ("medium", "moderate_failures", "review_recent_failures");
    }
    if dlq_size > 0 {
        return ("low", "normal_failures", "periodic_dlq_review");
    }
    ("low", "healthy", "continue_monitoring")
}
/// Forecasts a future queue size by fitting a least-squares line through
/// `historical_sizes`, using the sample index as the x-axis.
///
/// Returns `(forecast, slope, confidence)`:
///
/// * `forecast` is the fitted line evaluated `forecast_hours` index steps past
///   the last sample, floored at zero. The parameter name suggests one sample
///   per hour; the code only relies on the samples being evenly spaced.
/// * `slope` is the fitted change in size per index step.
/// * `confidence` is `"high"`, `"medium"` or `"low"`, from the residual
///   standard deviation divided by the mean size (below 0.1, below 0.3, or
///   anything else).
///
/// With fewer than three samples the last sample (or `0` for an empty slice)
/// is returned with slope `0.0` and confidence `"insufficient_data"`.
pub fn forecast_queue_capacity_ml(
    historical_sizes: &[usize],
    forecast_hours: usize,
) -> (usize, f64, &'static str) {
    if historical_sizes.len() < 3 {
        return (
            historical_sizes.last().copied().unwrap_or(0),
            0.0,
            "insufficient_data",
        );
    }

    let n = historical_sizes.len() as f64;
    let x_mean = (n - 1.0) / 2.0;
    // Sum in f64 so a very large history cannot overflow `usize`.
    let y_mean = historical_sizes.iter().map(|&size| size as f64).sum::<f64>() / n;

    let (numerator, denominator) = historical_sizes.iter().enumerate().fold(
        (0.0_f64, 0.0_f64),
        |(num, den), (i, &size)| {
            let dx = i as f64 - x_mean;
            (num + dx * (size as f64 - y_mean), den + dx * dx)
        },
    );
    let slope = if denominator > 0.0 {
        numerator / denominator
    } else {
        0.0
    };

    let future_x = (historical_sizes.len() - 1) as f64 + forecast_hours as f64;
    let forecast = (y_mean + slope * (future_x - x_mean)).max(0.0) as usize;

    // Each residual must be taken at the sample's own index. Looking the index
    // up by value would resolve duplicate sizes to their first occurrence and
    // distort the variance.
    let variance = historical_sizes
        .iter()
        .enumerate()
        .map(|(i, &size)| {
            let predicted = y_mean + slope * (i as f64 - x_mean);
            (size as f64 - predicted).powi(2)
        })
        .sum::<f64>()
        / n;
    let std_dev = variance.sqrt();

    // A zero mean means every sample is zero, so the fit is exact. Treat the
    // ratio as 0 instead of letting 0/0 produce NaN (which would read as "low").
    let coefficient_of_variation = if y_mean > 0.0 {
        std_dev / y_mean
    } else {
        0.0
    };

    let confidence = if coefficient_of_variation < 0.1 {
        "high"
    } else if coefficient_of_variation < 0.3 {
        "medium"
    } else {
        "low"
    };

    (forecast, slope, confidence)
}
/// Picks a batch size, a maximum batch wait, an estimated throughput and a
/// strategy label for batched message handling.
///
/// Returns `(batch_size, max_wait_ms, estimated_throughput, strategy)`:
///
/// * `batch_size` is `100` when network latency is more than half of
///   `network_latency_ms + processing_time_ms`, otherwise `20`. It is capped
///   by how many messages of `avg_message_size` bytes fit in 4 MiB (at most
///   1000) and is never below 1.
/// * `max_wait_ms` is the time to fill one batch at `throughput_target`
///   messages per second, kept within `10..=5000` (100 when the target is 0).
/// * `estimated_throughput` is messages per second, assuming each batch costs
///   `network_latency_ms + 10` ms plus `processing_time_ms` per message.
/// * `strategy` is `"throughput_optimized"` for batches above 50 messages,
///   else `"latency_optimized"` when network latency exceeds 100 ms, else
///   `"balanced"`.
pub fn optimize_batch_strategy(
    avg_message_size: usize,
    network_latency_ms: u64,
    processing_time_ms: u64,
    throughput_target: u64,
) -> (usize, u64, u64, &'static str) {
    const MAX_BATCH_BYTES: usize = 4 * 1024 * 1024;
    const MAX_BATCH_MESSAGES: usize = 1000;
    const BATCH_FIXED_OVERHEAD_MS: u64 = 10;

    // Widen before adding so two large u64 inputs cannot overflow, and treat
    // the 0/0 case explicitly instead of relying on NaN comparing false.
    let total_time_ms = network_latency_ms as u128 + processing_time_ms as u128;
    let latency_weight = if total_time_ms > 0 {
        network_latency_ms as f64 / total_time_ms as f64
    } else {
        0.0
    };
    let base_batch_size: usize = if latency_weight > 0.5 { 100 } else { 20 };

    // A single message larger than the byte budget must still yield a batch
    // of one message, not an empty batch.
    let size_limited_batch = (MAX_BATCH_BYTES / avg_message_size.max(1))
        .min(MAX_BATCH_MESSAGES)
        .max(1);
    let batch_size = base_batch_size.min(size_limited_batch);

    let messages_per_ms = throughput_target as f64 / 1000.0;
    let fill_time_ms = if messages_per_ms > 0.0 {
        (batch_size as f64 / messages_per_ms) as u64
    } else {
        100
    };
    let max_wait_ms = fill_time_ms.clamp(10, 5000);

    let batch_overhead = network_latency_ms.saturating_add(BATCH_FIXED_OVERHEAD_MS);
    let time_per_batch =
        batch_overhead.saturating_add((batch_size as u64).saturating_mul(processing_time_ms));
    // Multiply before dividing. Computing whole batches per second first
    // would truncate to 0 for any batch that takes longer than one second.
    // `time_per_batch` is at least `BATCH_FIXED_OVERHEAD_MS`, so never zero.
    let estimated_throughput = (batch_size as u64 * 1000) / time_per_batch;

    let strategy = if batch_size > 50 {
        "throughput_optimized"
    } else if network_latency_ms > 100 {
        "latency_optimized"
    } else {
        "balanced"
    };

    (batch_size, max_wait_ms, estimated_throughput, strategy)
}
/// Scores how evenly a set of queues is being processed.
///
/// `queue_sizes[i]` and `processing_rates[i]` describe the same queue.
/// Returns `(efficiency, load_balance_score, recommendation)`:
///
/// * `efficiency` is the mean of each rate divided by the highest rate
///   (`0.0` when every rate is zero).
/// * `load_balance_score` is `1 / (1 + cv)`, where `cv` is the coefficient of
///   variation of the drain times (`size / rate`) over queues with a non-zero
///   rate. `cv` is taken as `0.0` when the mean drain time is not positive.
/// * `recommendation` is `"rebalance_workers"`, `"increase_capacity"`,
///   `"optimal"` or `"monitor"`, checked in that order.
///
/// Empty input or slices of different lengths yield
/// `(0.0, 0.0, "invalid_input")`.
pub fn calculate_multi_queue_efficiency(
    queue_sizes: &[usize],
    processing_rates: &[u64],
) -> (f64, f64, &'static str) {
    let queue_count = queue_sizes.len();
    if queue_count == 0 || queue_count != processing_rates.len() {
        return (0.0, 0.0, "invalid_input");
    }

    // Only queues with a non-zero rate have a finite drain time; stalled
    // queues are left out of the drain-time statistics.
    let finite_drain_times: Vec<f64> = queue_sizes
        .iter()
        .zip(processing_rates)
        .filter(|&(_, &rate)| rate > 0)
        .map(|(&size, &rate)| size as f64 / rate as f64)
        .collect();
    let finite_count = finite_drain_times.len() as f64;

    let fastest_rate = processing_rates.iter().copied().max().unwrap_or(1);
    let efficiency = if fastest_rate > 0 {
        let relative_rate_total: f64 = processing_rates
            .iter()
            .map(|&rate| rate as f64 / fastest_rate as f64)
            .sum();
        relative_rate_total / processing_rates.len() as f64
    } else {
        0.0
    };

    // With no finite drain times these divisions produce NaN; the
    // `mean_drain > 0.0` test below is then false and the ratio becomes 0.
    let mean_drain = finite_drain_times.iter().sum::<f64>() / finite_count;
    let drain_variance = finite_drain_times
        .iter()
        .map(|&t| (t - mean_drain).powi(2))
        .sum::<f64>()
        / finite_count;

    let coefficient_of_variation = if mean_drain > 0.0 {
        drain_variance.sqrt() / mean_drain
    } else {
        0.0
    };
    let load_balance_score = (1.0 / (1.0 + coefficient_of_variation)).min(1.0);

    let recommendation = if load_balance_score < 0.5 {
        "rebalance_workers"
    } else if efficiency < 0.6 {
        "increase_capacity"
    } else if load_balance_score > 0.8 && efficiency > 0.8 {
        "optimal"
    } else {
        "monitor"
    };

    (efficiency, load_balance_score, recommendation)
}
/// Estimates how many hours remain before a resource reaches capacity.
///
/// Returns `(hours_until_exhausted, severity, action)`:
///
/// * When `current_usage >= max_capacity` the resource is already exhausted:
///   `(0, "critical", "immediate_action_required")`, whatever the growth rate.
/// * When `growth_rate_per_hour` is zero and capacity remains:
///   `(usize::MAX, "healthy", "monitor")`.
/// * Otherwise the hours are `remaining / growth_rate_per_hour` (integer
///   division), graded as `"critical"` (< 1), `"high"` (< 4), `"warning"`
///   (< 24), `"low"` (< 168) or `"healthy"`.
pub fn predict_resource_exhaustion(
    current_usage: usize,
    max_capacity: usize,
    growth_rate_per_hour: usize,
) -> (usize, &'static str, &'static str) {
    // Check for exhaustion first. A resource that is already full is not
    // healthy just because it has stopped growing.
    if current_usage >= max_capacity {
        return (0, "critical", "immediate_action_required");
    }
    if growth_rate_per_hour == 0 {
        return (usize::MAX, "healthy", "monitor");
    }

    // Both guards above make the subtraction and the division safe.
    let remaining = max_capacity - current_usage;
    let hours_until_exhausted = remaining / growth_rate_per_hour;

    let (severity, action) = if hours_until_exhausted < 1 {
        ("critical", "immediate_scaling_required")
    } else if hours_until_exhausted < 4 {
        ("high", "scale_within_hour")
    } else if hours_until_exhausted < 24 {
        ("warning", "plan_scaling")
    } else if hours_until_exhausted < 168 {
        ("low", "monitor_trends")
    } else {
        ("healthy", "normal_monitoring")
    };

    (hours_until_exhausted, severity, action)
}
/// Derives worker bounds and scaling thresholds from load statistics.
///
/// Returns `(min_workers, max_workers, scale_up_threshold,
/// scale_down_threshold, policy_type)`. Each worker is assumed to handle 100
/// load units:
///
/// * `min_workers` covers `min_load` with 20% headroom, and is at least 2.
/// * `max_workers` covers `peak_load` with 50% headroom, and is at least
///   twice `min_workers`.
/// * The thresholds and policy depend on `load_volatility / average_load`
///   (`0.0` when `average_load` is zero): above 0.5 gives
///   `(0.6, 0.3, "aggressive")`, above 0.2 gives `(0.7, 0.4, "balanced")`,
///   otherwise `(0.8, 0.5, "conservative")`.
pub fn suggest_autoscaling_policy(
    peak_load: u64,
    average_load: u64,
    min_load: u64,
    load_volatility: u64,
) -> (usize, usize, f64, f64, &'static str) {
    const WORKER_CAPACITY: f64 = 100.0;

    // Workers needed for `load`, padded by `headroom` and rounded up.
    let workers_for =
        |load: u64, headroom: f64| ((load as f64 / WORKER_CAPACITY) * headroom).ceil() as usize;

    let min_workers = workers_for(min_load, 1.2).max(2);
    let max_workers = workers_for(peak_load, 1.5).max(min_workers * 2);

    let volatility_ratio = match average_load {
        0 => 0.0,
        average => load_volatility as f64 / average as f64,
    };

    let (scale_up_threshold, scale_down_threshold, policy_type) = if volatility_ratio > 0.5 {
        (0.6, 0.3, "aggressive")
    } else if volatility_ratio > 0.2 {
        (0.7, 0.4, "balanced")
    } else {
        (0.8, 0.5, "conservative")
    };

    (
        min_workers,
        max_workers,
        scale_up_threshold,
        scale_down_threshold,
        policy_type,
    )
}
/// Maps a message key to a worker index in `0..num_workers`.
///
/// The key's bytes are folded into a 64-bit hash (`hash * 31 + byte`, with
/// wrapping arithmetic) and reduced modulo `num_workers`, so equal keys always
/// map to the same index. Returns `0` when `num_workers` is zero.
pub fn calculate_message_affinity(message_key: &str, num_workers: usize) -> usize {
    if num_workers == 0 {
        return 0;
    }

    let digest = message_key.bytes().fold(0u64, |acc, byte| {
        acc.wrapping_mul(31).wrapping_add(u64::from(byte))
    });

    (digest % num_workers as u64) as usize
}
/// Labels a queue by how active it is.
///
/// Returns `(temperature, recommendation)`. The first matching rule wins:
///
/// * `"hot"`: more than 50 messages/min and average age under 60 s
/// * `"warm"`: more than 10 messages/min and average age under 300 s
/// * `"cold"`: fewer than 5 messages/min or average age over 600 s
/// * `"lukewarm"`: everything else
///
/// `"hot"` recommends `"maintain_resources"`, `"cold"` recommends
/// `"consider_scaling_down"`, and the other two recommend `"monitor"`.
pub fn analyze_queue_temperature(
    messages_per_min: usize,
    avg_age_secs: usize,
) -> (&'static str, &'static str) {
    let is_hot = messages_per_min > 50 && avg_age_secs < 60;
    let is_warm = messages_per_min > 10 && avg_age_secs < 300;
    let is_cold = messages_per_min < 5 || avg_age_secs > 600;

    let temperature = if is_hot {
        "hot"
    } else if is_warm {
        "warm"
    } else if is_cold {
        "cold"
    } else {
        "lukewarm"
    };

    let recommendation = match temperature {
        "hot" => "maintain_resources",
        "cold" => "consider_scaling_down",
        _ => "monitor",
    };

    (temperature, recommendation)
}
/// Identifies the most likely bottleneck in a publish/consume pipeline.
///
/// Returns `(bottleneck, severity, recommendation)`. The publish/consume rate
/// ratio is treated as infinite when `consume_rate` is zero. The first
/// matching rule wins:
///
/// 1. ratio above 1.5 and more than 1000 queued: `"consumer"` / `"high"`
/// 2. processing time above 1000 ms: `"processing"` / `"medium"`
/// 3. more than 5000 queued: `"queue"` / `"medium"`
/// 4. ratio below 0.5 and fewer than 100 queued: `"publisher"` / `"low"`
/// 5. otherwise: `("none", "low", "system_healthy")`
pub fn detect_processing_bottleneck(
    publish_rate: usize,
    consume_rate: usize,
    queue_size: usize,
    processing_time_ms: usize,
) -> (&'static str, &'static str, &'static str) {
    let rate_ratio = match consume_rate {
        0 => f64::INFINITY,
        rate => publish_rate as f64 / rate as f64,
    };

    let consumers_falling_behind = rate_ratio > 1.5 && queue_size > 1000;
    let consumers_mostly_idle = rate_ratio < 0.5 && queue_size < 100;

    if consumers_falling_behind {
        (
            "consumer",
            "high",
            "scale_up_consumers_or_optimize_processing",
        )
    } else if processing_time_ms > 1000 {
        ("processing", "medium", "optimize_task_logic_or_add_caching")
    } else if queue_size > 5000 {
        ("queue", "medium", "increase_consumer_capacity")
    } else if consumers_mostly_idle {
        (
            "publisher",
            "low",
            "consider_scaling_down_consumers_to_save_cost",
        )
    } else {
        ("none", "low", "system_healthy")
    }
}
/// Computes a prefetch multiplier in the range `1.0..=10.0`.
///
/// A base of `5.0` (processing under 50 ms), `3.0` (under 200 ms) or `2.0`
/// (slower) is scaled up by a latency factor of `1.5` (latency above 100 ms),
/// `1.2` (above 50 ms) or `1.0`, and then divided by the square root of
/// `concurrency`.
///
/// Returns `1.0` when `avg_processing_time_ms` or `concurrency` is zero.
pub fn calculate_optimal_prefetch_multiplier(
    avg_processing_time_ms: usize,
    network_latency_ms: usize,
    concurrency: usize,
) -> f64 {
    if avg_processing_time_ms == 0 || concurrency == 0 {
        return 1.0;
    }

    let base_multiplier: f64 = match avg_processing_time_ms {
        0..=49 => 5.0,
        50..=199 => 3.0,
        _ => 2.0,
    };

    let latency_factor: f64 = match network_latency_ms {
        0..=50 => 1.0,
        51..=100 => 1.2,
        _ => 1.5,
    };

    let concurrency_factor = (concurrency as f64).sqrt();

    (base_multiplier * latency_factor / concurrency_factor).clamp(1.0, 10.0)
}
/// Decides whether a set of queues should be consolidated.
///
/// `queue_sizes[i]` and `queue_rates[i]` describe the same queue. Returns
/// `(should_consolidate, reason, recommendation)`:
///
/// * Either slice empty: `(false, "no_data", "n/a")`.
/// * Different lengths: `(false, "invalid_input", ...)`.
/// * More than three queues whose integer-mean size is under 20 and
///   integer-mean rate is under 5: `(true, "overhead", ...)`.
/// * More than 60% of queues with size under 50 and rate under 10:
///   `(true, "low_throughput", ...)`.
/// * Otherwise: `(false, "efficient", "maintain_current_structure")`.
pub fn suggest_queue_consolidation(
    queue_sizes: &[usize],
    queue_rates: &[usize],
) -> (bool, &'static str, &'static str) {
    let queue_count = queue_sizes.len();
    if queue_count == 0 || queue_rates.is_empty() {
        return (false, "no_data", "n/a");
    }
    if queue_count != queue_rates.len() {
        return (false, "invalid_input", "ensure_matching_array_lengths");
    }

    let mean_size = queue_sizes.iter().sum::<usize>() / queue_count;
    let mean_rate = queue_rates.iter().sum::<usize>() / queue_count;
    let many_tiny_queues = queue_count > 3 && mean_size < 20 && mean_rate < 5;
    if many_tiny_queues {
        return (
            true,
            "overhead",
            "reduce_queue_count_to_minimize_management_overhead",
        );
    }

    let mut underutilized = 0usize;
    for (&size, &rate) in queue_sizes.iter().zip(queue_rates) {
        if size < 50 && rate < 10 {
            underutilized += 1;
        }
    }
    let underutilized_share = underutilized as f64 / queue_count as f64;

    if underutilized_share > 0.6 {
        (
            true,
            "low_throughput",
            "consolidate_into_fewer_queues_with_routing",
        )
    } else {
        (false, "efficient", "maintain_current_structure")
    }
}
/// Scores how well a queue's capacity and throughput are being used.
///
/// Returns `(size_utilization, efficiency_score, recommendation)`:
///
/// * `size_utilization` is `current_size / capacity`, capped at `1.0`.
/// * `efficiency_score` is `0.4 * size_utilization + 0.6 * throughput`, where
///   `throughput` is `messages_per_sec / peak_messages_per_sec` capped at
///   `1.0` (`0.0` when the peak is zero).
/// * `recommendation` grades the score: above 0.8, above 0.6, above 0.4, or
///   lower.
///
/// A zero `capacity` yields `(0.0, 0.0, "invalid_capacity")`.
pub fn calculate_queue_utilization_efficiency(
    current_size: usize,
    capacity: usize,
    messages_per_sec: usize,
    peak_messages_per_sec: usize,
) -> (f64, f64, &'static str) {
    if capacity == 0 {
        return (0.0, 0.0, "invalid_capacity");
    }

    let fill_ratio = (current_size as f64 / capacity as f64).min(1.0);

    let throughput_ratio = match peak_messages_per_sec {
        0 => 0.0,
        peak => (messages_per_sec as f64 / peak as f64).min(1.0),
    };

    let score = (fill_ratio * 0.4 + throughput_ratio * 0.6).min(1.0);

    let verdict = if score > 0.8 {
        "excellent_utilization"
    } else if score > 0.6 {
        "good_utilization"
    } else if score > 0.4 {
        "moderate_utilization_consider_optimization"
    } else {
        "poor_utilization_needs_attention"
    };

    (fill_ratio, score, verdict)
}