pub fn analyze_message_flow_pattern(
recent_counts: &[usize],
window_size: usize,
) -> (bool, &'static str, &'static str) {
if recent_counts.is_empty() || window_size == 0 {
return (false, "none", "insufficient_data");
}
let len = recent_counts.len().min(window_size);
let window = &recent_counts[recent_counts.len().saturating_sub(len)..];
if window.len() < 3 {
return (false, "none", "insufficient_data");
}
let mean = window.iter().sum::<usize>() as f64 / window.len() as f64;
let variance = window
.iter()
.map(|&x| {
let diff = x as f64 - mean;
diff * diff
})
.sum::<f64>()
/ window.len() as f64;
let std_dev = variance.sqrt();
let last_value = window[window.len() - 1] as f64;
let deviation = (last_value - mean).abs();
if deviation >= 3.0 * std_dev {
let pattern = if last_value > mean {
"sudden_spike"
} else {
"sudden_drop"
};
(true, "high", pattern)
} else if deviation >= 2.0 * std_dev {
let pattern = if last_value > mean {
"moderate_increase"
} else {
"moderate_decrease"
};
(true, "medium", pattern)
} else {
let is_increasing = window.windows(2).all(|w| w[1] >= w[0]);
let is_decreasing = window.windows(2).all(|w| w[1] <= w[0]);
if is_increasing {
(false, "low", "gradual_increase")
} else if is_decreasing {
(false, "low", "gradual_decrease")
} else {
(false, "none", "normal")
}
}
}
pub fn estimate_optimal_worker_pool(
avg_msg_per_sec: usize,
avg_processing_ms: usize,
target_latency_ms: usize,
worker_overhead_pct: u8,
) -> (usize, f64, f64) {
if avg_msg_per_sec == 0 || avg_processing_ms == 0 {
return (1, 0.0, 0.0);
}
let processing_time_sec = avg_processing_ms as f64 / 1000.0;
let base_workers = (avg_msg_per_sec as f64 * processing_time_sec).ceil() as usize;
let overhead_factor = 1.0 + (worker_overhead_pct as f64 / 100.0);
let adjusted_workers = (base_workers as f64 * overhead_factor).ceil() as usize;
let latency_buffer = if target_latency_ms < avg_processing_ms {
let ratio = avg_processing_ms as f64 / target_latency_ms as f64;
(adjusted_workers as f64 * ratio).ceil() as usize
} else {
adjusted_workers
};
let optimal_workers = latency_buffer.max(1);
let worker_capacity = 1000.0 / avg_processing_ms as f64; let max_throughput = worker_capacity * optimal_workers as f64 / overhead_factor;
let utilization = (avg_msg_per_sec as f64 / max_throughput).min(1.0);
(optimal_workers, max_throughput, utilization)
}
pub fn analyze_compression_benefit(
avg_message_size: usize,
message_count_per_sec: usize,
content_type: &str,
) -> (bool, f64, String) {
let compressible_types = [
"application/json",
"text/plain",
"text/html",
"text/xml",
"application/xml",
];
let is_compressible = compressible_types.iter().any(|&t| content_type.contains(t));
if !is_compressible {
return (false, 1.0, "content_type_not_compressible".to_string());
}
if avg_message_size < 500 {
return (false, 1.0, "message_too_small".to_string());
}
let estimated_ratio = if content_type.contains("json") {
0.3 } else if content_type.contains("xml") {
0.25 } else if content_type.contains("text") {
0.4 } else {
0.5
};
let bytes_per_sec = avg_message_size * message_count_per_sec;
let savings_per_sec = (bytes_per_sec as f64 * (1.0 - estimated_ratio)) as usize;
let should_compress = savings_per_sec > 1_000_000 || avg_message_size > 10_000;
let recommendation = if should_compress {
format!("enable_compression_saves_{}_bytes_per_sec", savings_per_sec)
} else {
"compression_overhead_not_worth_it".to_string()
};
(should_compress, estimated_ratio, recommendation)
}
pub fn calculate_queue_migration_plan(
message_count: usize,
batch_size: usize,
messages_per_sec: usize,
) -> (usize, usize, String) {
if message_count == 0 || batch_size == 0 {
return (0, 0, "no_migration_needed".to_string());
}
let safe_rate = messages_per_sec.max(1);
let batches = (message_count as f64 / batch_size as f64).ceil() as usize;
let total_time_secs = message_count / safe_rate;
let recommendation = if total_time_secs < 60 {
"fast_migration_proceed".to_string()
} else if total_time_secs < 3600 {
format!("moderate_migration_{}_minutes", total_time_secs / 60)
} else {
format!(
"slow_migration_{}_hours_consider_increasing_rate",
total_time_secs / 3600
)
};
(batches, total_time_secs, recommendation)
}
pub fn profile_message_patterns(
message_sizes: &[usize],
arrival_intervals_ms: &[usize],
) -> (String, f64, String) {
if message_sizes.is_empty() || arrival_intervals_ms.is_empty() {
return ("unknown".to_string(), 0.0, "insufficient_data".to_string());
}
let avg_size = message_sizes.iter().sum::<usize>() as f64 / message_sizes.len() as f64;
let size_variance = message_sizes
.iter()
.map(|&s| {
let diff = s as f64 - avg_size;
diff * diff
})
.sum::<f64>()
/ message_sizes.len() as f64;
let size_std_dev = size_variance.sqrt();
let size_cv = if avg_size > 0.0 {
size_std_dev / avg_size
} else {
0.0
};
let avg_interval =
arrival_intervals_ms.iter().sum::<usize>() as f64 / arrival_intervals_ms.len() as f64;
let interval_variance = arrival_intervals_ms
.iter()
.map(|&i| {
let diff = i as f64 - avg_interval;
diff * diff
})
.sum::<f64>()
/ arrival_intervals_ms.len() as f64;
let interval_std_dev = interval_variance.sqrt();
let interval_cv = if avg_interval > 0.0 {
interval_std_dev / avg_interval
} else {
0.0
};
let pattern_type = if size_cv < 0.1 && interval_cv < 0.1 {
"highly_regular".to_string()
} else if size_cv < 0.3 && interval_cv < 0.3 {
"regular".to_string()
} else if size_cv > 0.8 || interval_cv > 0.8 {
"highly_irregular".to_string()
} else {
"moderately_irregular".to_string()
};
let consistency_score = 1.0 - ((size_cv + interval_cv) / 2.0).min(1.0);
let recommendation = if consistency_score > 0.8 {
"predictable_use_static_buffers".to_string()
} else if consistency_score > 0.5 {
"moderate_use_adaptive_buffers".to_string()
} else {
"unpredictable_use_dynamic_scaling".to_string()
};
(pattern_type, consistency_score, recommendation)
}
pub fn calculate_network_efficiency(
bytes_sent: usize,
bytes_received: usize,
max_bandwidth_bytes: usize,
) -> (f64, f64, String) {
if max_bandwidth_bytes == 0 {
return (0.0, 0.0, "invalid_bandwidth".to_string());
}
let total_bytes = bytes_sent + bytes_received;
let bandwidth_util_pct = (total_bytes as f64 / max_bandwidth_bytes as f64 * 100.0).min(100.0);
let send_ratio = if total_bytes > 0 {
bytes_sent as f64 / total_bytes as f64
} else {
0.0
};
let balance_score = 1.0 - (send_ratio - 0.5).abs() * 2.0;
let utilization_score = if bandwidth_util_pct < 70.0 {
bandwidth_util_pct / 70.0 } else if bandwidth_util_pct < 90.0 {
1.0 } else {
(100.0 - bandwidth_util_pct) / 10.0 };
let efficiency_score = (balance_score * 0.4 + utilization_score * 0.6).clamp(0.0, 1.0);
let recommendation = if efficiency_score > 0.8 {
"excellent_efficiency".to_string()
} else if efficiency_score > 0.6 {
"good_efficiency".to_string()
} else if bandwidth_util_pct > 90.0 {
"increase_bandwidth_overutilized".to_string()
} else if bandwidth_util_pct < 30.0 {
"reduce_bandwidth_underutilized".to_string()
} else if (send_ratio - 0.5).abs() > 0.3 {
"imbalanced_traffic_pattern".to_string()
} else {
"optimize_message_patterns".to_string()
};
(efficiency_score, bandwidth_util_pct, recommendation)
}
pub fn detect_message_hotspots(message_counts: &[usize]) -> (bool, usize, f64, String) {
if message_counts.is_empty() {
return (false, 0, 1.0, "no_data".to_string());
}
if message_counts.len() == 1 {
return (false, 0, 1.0, "single_queue".to_string());
}
let max_count = *message_counts.iter().max().unwrap();
let avg_count = message_counts.iter().sum::<usize>() as f64 / message_counts.len() as f64;
let hotspot_index = message_counts.iter().position(|&c| c == max_count).unwrap();
let imbalance_ratio = if avg_count > 0.0 {
max_count as f64 / avg_count
} else {
1.0
};
let has_hotspot = imbalance_ratio > 2.0;
let recommendation = if !has_hotspot {
"balanced_distribution".to_string()
} else if imbalance_ratio > 5.0 {
format!("severe_hotspot_rebalance_queue_{}", hotspot_index)
} else if imbalance_ratio > 3.0 {
format!(
"moderate_hotspot_check_routing_keys_queue_{}",
hotspot_index
)
} else {
format!("minor_hotspot_monitor_queue_{}", hotspot_index)
};
(has_hotspot, hotspot_index, imbalance_ratio, recommendation)
}
pub fn recommend_queue_topology(
messages_per_sec: usize,
avg_processing_ms: usize,
num_consumers: usize,
requires_ordering: bool,
) -> (String, usize, String) {
if messages_per_sec == 0 {
return ("single_queue".to_string(), 1, "low_volume".to_string());
}
let processing_rate = if avg_processing_ms > 0 {
(1000.0 / avg_processing_ms as f64) as usize
} else {
1000
};
let total_capacity = processing_rate * num_consumers;
let load_ratio = messages_per_sec as f64 / total_capacity as f64;
let (topology_type, queue_count) = if requires_ordering {
if load_ratio > 0.8 {
("partitioned".to_string(), num_consumers.max(4))
} else {
("single_queue".to_string(), 1)
}
} else {
if load_ratio > 1.5 {
let recommended = (messages_per_sec as f64 / (processing_rate as f64 * 0.7)) as usize;
("multi_queue".to_string(), recommended.clamp(2, 32))
} else if load_ratio > 0.8 {
("priority_queues".to_string(), 3) } else if messages_per_sec > 1000 {
("worker_pool".to_string(), num_consumers.max(2))
} else {
("single_queue".to_string(), 1)
}
};
let recommendation = if load_ratio > 1.5 {
format!(
"overloaded_increase_consumers_or_queues_load_ratio_{:.2}",
load_ratio
)
} else if load_ratio > 1.0 {
format!("near_capacity_scale_soon_load_ratio_{:.2}", load_ratio)
} else if load_ratio < 0.3 {
format!(
"underutilized_reduce_resources_load_ratio_{:.2}",
load_ratio
)
} else {
format!("optimal_topology_{}", topology_type)
};
(topology_type, queue_count, recommendation)
}
pub fn calculate_message_deduplication_window(
avg_message_interval_ms: usize,
retry_count: usize,
max_delivery_delay_ms: usize,
) -> (usize, usize, String) {
let retry_window_ms = avg_message_interval_ms * retry_count.max(1);
let total_window_ms = retry_window_ms + max_delivery_delay_ms;
let window_secs = ((total_window_ms * 2) / 1000).max(60);
let messages_per_sec = if avg_message_interval_ms > 0 {
(1000.0 / avg_message_interval_ms as f64) as usize
} else {
100 };
let cache_size = (messages_per_sec * window_secs).clamp(1000, 1000000);
let recommendation = if window_secs > 3600 {
format!(
"long_window_{}_secs_consider_persistent_storage",
window_secs
)
} else if window_secs > 600 {
format!("medium_window_{}_secs_cache_{}", window_secs, cache_size)
} else {
format!("short_window_{}_secs_cache_{}", window_secs, cache_size)
};
(window_secs, cache_size, recommendation)
}
pub fn analyze_retry_effectiveness(
total_messages: usize,
failed_messages: usize,
retry_successes: usize,
final_failures: usize,
) -> (f64, f64, String) {
if failed_messages == 0 {
return (100.0, 100.0, "no_failures_retries_not_needed".to_string());
}
let effectiveness = (retry_successes as f64 / failed_messages as f64) * 100.0;
let total_successes = total_messages - final_failures;
let success_rate = (total_successes as f64 / total_messages as f64) * 100.0;
let recommendation = if effectiveness > 80.0 {
format!(
"highly_effective_retries_{:.1}pct_success_keep_current_policy",
effectiveness
)
} else if effectiveness > 50.0 {
format!(
"moderately_effective_retries_{:.1}pct_consider_backoff_tuning",
effectiveness
)
} else if effectiveness > 20.0 {
format!(
"low_effectiveness_{:.1}pct_review_retry_strategy",
effectiveness
)
} else {
format!(
"ineffective_retries_{:.1}pct_investigate_root_cause",
effectiveness
)
};
(effectiveness, success_rate, recommendation)
}
pub fn calculate_queue_overflow_risk(
current_size: usize,
max_size: usize,
enqueue_rate: usize,
dequeue_rate: usize,
) -> (f64, i64, String) {
if max_size == 0 {
return (100.0, 0, "invalid_max_size_queue_misconfigured".to_string());
}
let net_rate = enqueue_rate as i64 - dequeue_rate as i64;
if net_rate <= 0 {
let drain_time_secs = if dequeue_rate > enqueue_rate && current_size > 0 {
(current_size as f64 / (dequeue_rate - enqueue_rate) as f64) as i64
} else {
-1
};
return (
0.0,
drain_time_secs,
"healthy_queue_draining_no_overflow_risk".to_string(),
);
}
let remaining_capacity = max_size.saturating_sub(current_size);
let time_to_full_secs = if net_rate > 0 {
(remaining_capacity as f64 / net_rate as f64) as i64
} else {
i64::MAX
};
let utilization = (current_size as f64 / max_size as f64) * 100.0;
let risk = if utilization >= 90.0 {
95.0 } else if utilization >= 80.0 {
80.0 } else if utilization >= 60.0 {
60.0 } else if time_to_full_secs < 60 {
75.0 } else if time_to_full_secs < 300 {
50.0 } else if time_to_full_secs < 3600 {
30.0 } else {
10.0 };
let recommendation = if risk >= 90.0 {
format!(
"critical_risk_{:.1}pct_utilized_ttf_{}_secs_immediate_action",
utilization, time_to_full_secs
)
} else if risk >= 70.0 {
format!(
"high_risk_{:.1}pct_utilized_ttf_{}_secs_scale_consumers",
utilization, time_to_full_secs
)
} else if risk >= 40.0 {
format!(
"medium_risk_{:.1}pct_utilized_ttf_{}_secs_monitor_closely",
utilization, time_to_full_secs
)
} else {
format!("low_risk_{:.1}pct_utilized_queue_healthy", utilization)
};
(risk, time_to_full_secs, recommendation)
}
pub fn calculate_message_throughput_trend(
throughput_samples: &[f64],
window_size: usize,
) -> (f64, String, f64) {
if throughput_samples.is_empty() {
return (0.0, "no_data".to_string(), 0.0);
}
let actual_window = window_size.min(throughput_samples.len());
let recent_samples =
&throughput_samples[throughput_samples.len().saturating_sub(actual_window)..];
if recent_samples.is_empty() {
return (0.0, "no_data".to_string(), 0.0);
}
let current_avg: f64 = recent_samples.iter().sum::<f64>() / recent_samples.len() as f64;
if recent_samples.len() < 2 {
return (current_avg, "stable".to_string(), 0.0);
}
let mid = recent_samples.len() / 2;
let first_half_avg: f64 = recent_samples[..mid].iter().sum::<f64>() / mid as f64;
let second_half_avg: f64 =
recent_samples[mid..].iter().sum::<f64>() / (recent_samples.len() - mid) as f64;
let change_pct = if first_half_avg > 0.0 {
((second_half_avg - first_half_avg) / first_half_avg) * 100.0
} else {
0.0
};
let trend = if change_pct > 10.0 {
"increasing"
} else if change_pct < -10.0 {
"decreasing"
} else {
"stable"
};
(current_avg, trend.to_string(), change_pct)
}
pub fn detect_queue_starvation(
queue_size: usize,
consumer_count: usize,
min_queue_depth: usize,
) -> (bool, String, String) {
if consumer_count == 0 {
return (
false,
"no_consumers".to_string(),
"no_consumers_to_starve".to_string(),
);
}
let recommended_depth = consumer_count * min_queue_depth;
let depth_ratio = queue_size as f64 / recommended_depth as f64;
let (is_starving, severity) = if depth_ratio < 0.25 {
(true, "high")
} else if depth_ratio < 0.5 {
(true, "medium")
} else if depth_ratio < 0.75 {
(true, "low")
} else {
(false, "healthy")
};
let recommendation = if is_starving {
if consumer_count > queue_size {
format!(
"high_starvation_reduce_consumers_from_{}_to_{}_or_increase_queue_depth",
consumer_count,
queue_size.max(1)
)
} else {
format!(
"{}_starvation_increase_queue_depth_from_{}_to_{}_or_reduce_consumers",
severity, queue_size, recommended_depth
)
}
} else {
format!(
"healthy_queue_depth_{}_for_{}_consumers",
queue_size, consumer_count
)
};
(is_starving, severity.to_string(), recommendation)
}
pub fn estimate_broker_capacity(
avg_processing_ms: usize,
worker_count: usize,
connection_pool_size: usize,
target_utilization: f64,
) -> (f64, f64, String) {
if avg_processing_ms == 0 || worker_count == 0 {
return (
0.0,
0.0,
"invalid_input_processing_time_or_workers_zero".to_string(),
);
}
let msg_per_sec_per_worker = 1000.0 / avg_processing_ms as f64;
let theoretical_max = msg_per_sec_per_worker * worker_count as f64;
let connection_limited = if connection_pool_size < worker_count {
msg_per_sec_per_worker * connection_pool_size as f64
} else {
theoretical_max
};
let safe_max = connection_limited * target_utilization;
let headroom_pct = ((1.0 - target_utilization) * 100.0).max(0.0);
let recommendation = if headroom_pct < 5.0 {
format!(
"critical_capacity_{:.1}_msgs_sec_headroom_{:.1}pct_scale_immediately",
safe_max, headroom_pct
)
} else if headroom_pct < 15.0 {
format!(
"limited_capacity_{:.1}_msgs_sec_headroom_{:.1}pct_plan_scaling",
safe_max, headroom_pct
)
} else {
format!(
"healthy_capacity_{:.1}_msgs_sec_headroom_{:.1}pct",
safe_max, headroom_pct
)
};
(safe_max, headroom_pct, recommendation)
}