use super::queue::QueueStats;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct RebalancerConfig {
pub total_budget: u64,
pub epoch_duration: Duration,
pub max_change_fraction: f64,
pub min_per_queue: u64,
}
impl Default for RebalancerConfig {
fn default() -> Self {
Self {
total_budget: 4 * 1024 * 1024 * 1024, epoch_duration: Duration::from_secs(1),
max_change_fraction: 0.2,
min_per_queue: 64 * 1024 * 1024, }
}
}
pub struct DynamicRebalancer {
config: RebalancerConfig,
current_allocations: Vec<u64>,
queue_names: Vec<&'static str>,
}
impl DynamicRebalancer {
#[must_use]
pub fn new(
config: RebalancerConfig,
queue_names: Vec<&'static str>,
initial_allocations: Vec<u64>,
) -> Self {
assert_eq!(queue_names.len(), initial_allocations.len());
Self { config, current_allocations: initial_allocations, queue_names }
}
#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation, clippy::cast_sign_loss)]
pub fn rebalance(&mut self, stats: &[QueueStats]) -> Vec<u64> {
assert_eq!(stats.len(), self.current_allocations.len());
if stats.is_empty() {
return Vec::new();
}
let demands: Vec<f64> = stats
.iter()
.zip(self.current_allocations.iter())
.map(|(s, ¤t_alloc)| {
let base_demand = s.peak_bytes.max(self.config.min_per_queue) as f64;
let starvation_factor = 1.0 + (s.time_blocked_ms as f64 / 1000.0);
let utilization = s.peak_bytes as f64 / current_alloc.max(1) as f64;
let utilization_factor = if utilization > 0.9 { 1.5 } else { 1.0 };
base_demand * starvation_factor * utilization_factor
})
.collect();
let total_demand: f64 = demands.iter().sum();
let ideal: Vec<u64> = demands
.iter()
.map(|&d| {
let fraction = d / total_demand.max(1.0);
(fraction * self.config.total_budget as f64) as u64
})
.collect();
let new_allocations: Vec<u64> = self
.current_allocations
.iter()
.zip(ideal.iter())
.map(|(¤t, &target)| {
let max_change = (current as f64 * self.config.max_change_fraction) as u64;
let max_change = max_change.max(self.config.min_per_queue / 4);
let new = if target > current {
current.saturating_add(max_change.min(target - current))
} else {
current.saturating_sub(max_change.min(current - target))
};
new.max(self.config.min_per_queue)
})
.collect();
let sum: u64 = new_allocations.iter().sum();
let normalized: Vec<u64> = if sum > 0 {
new_allocations
.iter()
.map(|&a| ((a as f64 / sum as f64) * self.config.total_budget as f64) as u64)
.collect()
} else {
let per_queue = self.config.total_budget / new_allocations.len() as u64;
vec![per_queue; new_allocations.len()]
};
self.current_allocations.clone_from(&normalized);
normalized
}
#[must_use]
pub fn current_allocations(&self) -> &[u64] {
&self.current_allocations
}
#[must_use]
pub fn queue_names(&self) -> &[&'static str] {
&self.queue_names
}
pub fn log_state(&self, stats: &[QueueStats]) {
log::debug!("Queue memory rebalance:");
for (i, name) in self.queue_names.iter().enumerate() {
let alloc = self.current_allocations[i];
let stat = &stats[i];
log::debug!(
" {}: {}/{} MB (peak: {} MB, blocked: {}ms)",
name,
stat.peak_bytes / (1024 * 1024),
alloc / (1024 * 1024),
stat.peak_bytes / (1024 * 1024),
stat.time_blocked_ms
);
}
}
}
#[derive(Debug, Clone)]
pub struct InitialAllocation {
pub q2_reorder: u64,
pub q3_reorder: u64,
pub q7_reorder: u64,
pub array_queues: u64,
}
#[must_use]
#[allow(clippy::match_same_arms)]
pub fn initial_allocation_for_command(
command: &str,
strategy: Option<&str>,
total_budget: u64,
) -> InitialAllocation {
let (q2_pct, q3_pct, q7_pct, other_pct) = match (command, strategy) {
("group", Some("identity")) => (20, 50, 15, 15),
("group", Some("edit")) => (25, 45, 15, 15),
("group", Some("adjacency")) => (30, 40, 15, 15),
("group", Some("paired")) => (25, 45, 15, 15),
("group", _) => (25, 45, 15, 15),
("simplex", _) => (25, 45, 15, 15),
("duplex", _) => (25, 45, 15, 15),
("codec", _) => (25, 45, 15, 15),
("filter", _) => (25, 40, 20, 15),
("correct", _) => (30, 40, 15, 15),
("extract", _) => (35, 35, 15, 15),
_ => (25, 40, 15, 20),
};
InitialAllocation {
q2_reorder: total_budget * q2_pct / 100,
q3_reorder: total_budget * q3_pct / 100,
q7_reorder: total_budget * q7_pct / 100,
array_queues: total_budget * other_pct / 100,
}
}
pub fn parse_memory_limit(s: &str) -> Result<u64, String> {
const MIN_BYTES: u64 = 256 * 1024 * 1024;
let bytes = crate::validation::parse_memory_size(s).map_err(|e| e.to_string())?;
if bytes < MIN_BYTES {
return Err(format!(
"Queue memory limit {} is too low (minimum: 256 MiB); \
if you used a decimal suffix like MB, try MiB instead (e.g., '256MiB')",
bytesize::ByteSize(bytes)
));
}
Ok(bytes)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_memory_limit() {
assert_eq!(
parse_memory_limit("4GB").expect("parsing \"4GB\" should succeed"),
4_000_000_000
);
assert_eq!(parse_memory_limit("4G").expect("parsing \"4G\" should succeed"), 4_000_000_000);
assert_eq!(
parse_memory_limit("512MB").expect("parsing \"512MB\" should succeed"),
512_000_000
);
assert_eq!(
parse_memory_limit("512M").expect("parsing \"512M\" should succeed"),
512_000_000
);
assert_eq!(
parse_memory_limit("1gb").expect("parsing \"1gb\" should succeed"),
1_000_000_000
);
assert_eq!(
parse_memory_limit(" 2GB ").expect("parsing \" 2GB \" should succeed"),
2_000_000_000
);
assert_eq!(
parse_memory_limit("1.5GB").expect("parsing \"1.5GB\" should succeed"),
1_500_000_000
);
assert_eq!(
parse_memory_limit("1GiB").expect("parsing \"1GiB\" should succeed"),
1024 * 1024 * 1024
);
}
#[test]
fn test_parse_memory_limit_minimum() {
assert!(parse_memory_limit("100MB").is_err());
assert!(parse_memory_limit("256MiB").is_ok());
assert!(parse_memory_limit("256MB").is_err());
}
#[test]
fn test_initial_allocation() {
let alloc = initial_allocation_for_command("group", Some("adjacency"), 1_000_000_000);
assert_eq!(alloc.q2_reorder, 300_000_000); assert_eq!(alloc.q3_reorder, 400_000_000); assert_eq!(alloc.q7_reorder, 150_000_000); assert_eq!(alloc.array_queues, 150_000_000); }
#[test]
fn test_rebalancer_basic() {
let config = RebalancerConfig {
total_budget: 1_000_000_000, min_per_queue: 100_000_000, ..Default::default()
};
let names = vec!["q1", "q2", "q3"];
let initial = vec![333_333_333, 333_333_333, 333_333_334];
let mut rebalancer = DynamicRebalancer::new(config, names, initial);
let stats = vec![
QueueStats { avg_bytes: 100_000_000, peak_bytes: 200_000_000, time_blocked_ms: 0 },
QueueStats {
avg_bytes: 300_000_000,
peak_bytes: 330_000_000,
time_blocked_ms: 500, },
QueueStats { avg_bytes: 100_000_000, peak_bytes: 150_000_000, time_blocked_ms: 0 },
];
let new_allocs = rebalancer.rebalance(&stats);
assert!(new_allocs[1] > new_allocs[0]);
assert!(new_allocs[1] > new_allocs[2]);
let total: u64 = new_allocs.iter().sum();
let budget: u64 = 1_000_000_000;
assert!(
total >= budget - 10 && total <= budget + 10,
"Total {total} should be close to budget {budget}"
);
}
#[test]
#[allow(clippy::cast_possible_wrap)]
fn test_rebalancer_gradual_change() {
let config = RebalancerConfig {
total_budget: 1_000_000_000,
max_change_fraction: 0.2,
min_per_queue: 100_000_000,
..Default::default()
};
let names = vec!["q1", "q2"];
let initial = vec![500_000_000, 500_000_000];
let mut rebalancer = DynamicRebalancer::new(config, names, initial);
let stats = vec![
QueueStats {
avg_bytes: 450_000_000,
peak_bytes: 500_000_000,
time_blocked_ms: 1000, },
QueueStats { avg_bytes: 50_000_000, peak_bytes: 100_000_000, time_blocked_ms: 0 },
];
let new_allocs = rebalancer.rebalance(&stats);
let change = (new_allocs[0] as i64 - 500_000_000i64).abs();
assert!(change <= 150_000_000); }
}