use crate::numa_scheduler::topology::{cores_in_node, node_of_core};
use crate::numa_scheduler::types::NumaTopology;
pub fn load_imbalance(queue_lengths: &[usize]) -> f64 {
if queue_lengths.is_empty() {
return 0.0;
}
let n = queue_lengths.len() as f64;
let mean = queue_lengths.iter().sum::<usize>() as f64 / n;
if mean < f64::EPSILON {
return 0.0;
}
let variance = queue_lengths
.iter()
.map(|&q| {
let diff = q as f64 - mean;
diff * diff
})
.sum::<f64>()
/ n;
variance.sqrt() / mean
}
pub fn should_migrate(imbalance: f64, threshold: f64) -> bool {
imbalance > threshold
}
pub fn migration_cost(
from_worker: usize,
to_worker: usize,
topology: &NumaTopology,
task_count: usize,
) -> f64 {
let node_from = worker_numa_node(from_worker, topology);
let node_to = worker_numa_node(to_worker, topology);
let factor = if node_from == node_to {
1.0_f64
} else {
2.5_f64
};
factor * task_count as f64
}
pub fn migration_plan(
queue_lengths: &[usize],
topology: &NumaTopology,
) -> Vec<(usize, usize, usize)> {
if queue_lengths.is_empty() {
return Vec::new();
}
let n = queue_lengths.len();
let total: usize = queue_lengths.iter().sum();
let mean = total as f64 / n as f64;
let mut lengths: Vec<f64> = queue_lengths.iter().map(|&q| q as f64).collect();
let mut plan: Vec<(usize, usize, usize)> = Vec::new();
let max_rounds = n * 2;
for _ in 0..max_rounds {
let maybe_from = (0..n)
.filter(|&w| lengths[w] > mean + 0.5)
.max_by(|&a, &b| {
lengths[a]
.partial_cmp(&lengths[b])
.unwrap_or(std::cmp::Ordering::Equal)
});
let from = match maybe_from {
Some(w) => w,
None => break,
};
let node_from = worker_numa_node(from, topology);
let maybe_to = find_underloaded_in_node(from, node_from, &lengths, mean, topology)
.or_else(|| find_underloaded_any(from, &lengths, mean));
let to = match maybe_to {
Some(w) => w,
None => break,
};
let surplus = (lengths[from] - mean).floor() as usize;
let n_move = (surplus / 2).max(1);
let cost = migration_cost(from, to, topology, n_move);
let benefit = n_move as f64;
if cost > benefit {
break;
}
lengths[from] -= n_move as f64;
lengths[to] += n_move as f64;
plan.push((from, to, n_move));
}
plan
}
pub fn worker_numa_node(worker_id: usize, topology: &NumaTopology) -> usize {
if topology.n_cores_per_node == 0 {
return 0;
}
let core_id = worker_id % topology.cores.len().max(1);
node_of_core(topology, core_id)
}
fn find_underloaded_in_node(
exclude: usize,
node: usize,
lengths: &[f64],
mean: f64,
topology: &NumaTopology,
) -> Option<usize> {
let cores_on_node = cores_in_node(topology, node);
let n = lengths.len();
let candidates: Vec<usize> = (0..n)
.filter(|&w| {
if w == exclude {
return false;
}
let core_id = w % topology.cores.len().max(1);
cores_on_node.contains(&core_id)
})
.filter(|&w| lengths[w] < mean - 0.5)
.collect();
candidates.into_iter().min_by(|&a, &b| {
lengths[a]
.partial_cmp(&lengths[b])
.unwrap_or(std::cmp::Ordering::Equal)
})
}
fn find_underloaded_any(exclude: usize, lengths: &[f64], mean: f64) -> Option<usize> {
(0..lengths.len())
.filter(|&w| w != exclude && lengths[w] < mean - 0.5)
.min_by(|&a, &b| {
lengths[a]
.partial_cmp(&lengths[b])
.unwrap_or(std::cmp::Ordering::Equal)
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::numa_scheduler::types::NumaTopology;
fn topo() -> NumaTopology {
NumaTopology::from_config(2, 4)
}
#[test]
fn test_load_imbalance_balanced() {
let queues = vec![4, 4, 4, 4];
let cov = load_imbalance(&queues);
assert!(cov < 1e-9, "CoV should be ~0 for equal queues, got {}", cov);
}
#[test]
fn test_load_imbalance_skewed() {
let queues = vec![0, 0, 0, 100];
let cov = load_imbalance(&queues);
assert!(
cov > 0.5,
"CoV should be high for skewed queues, got {}",
cov
);
}
#[test]
fn test_should_migrate_threshold() {
assert!(should_migrate(0.6, 0.5));
assert!(!should_migrate(0.4, 0.5));
assert!(!should_migrate(0.5, 0.5)); }
#[test]
fn test_migration_plan_intra_numa() {
let t = topo();
let queues = vec![20usize, 0, 2, 2, 2, 2, 2, 2];
let plan = migration_plan(&queues, &t);
assert!(!plan.is_empty(), "Expected at least one migration");
let (from, to, _n) = plan[0];
assert_eq!(from, 0);
assert!(to < 4, "Expected intra-NUMA target (0..4), got {}", to);
}
#[test]
fn test_migration_cost_local_cheaper() {
let t = topo();
let cost_local = migration_cost(0, 1, &t, 4);
let cost_remote = migration_cost(0, 4, &t, 4);
assert!(
cost_local < cost_remote,
"Local migration ({}) should be cheaper than remote ({})",
cost_local,
cost_remote
);
}
#[test]
fn test_migration_plan_no_op_when_balanced() {
let t = topo();
let queues = vec![5, 5, 5, 5, 5, 5, 5, 5];
let plan = migration_plan(&queues, &t);
assert!(plan.is_empty(), "No migrations needed for balanced queues");
}
#[test]
fn test_worker_numa_node_mapping() {
let t = topo(); for w in 0..4 {
assert_eq!(
worker_numa_node(w, &t),
0,
"worker {} should be on node 0",
w
);
}
for w in 4..8 {
assert_eq!(
worker_numa_node(w, &t),
1,
"worker {} should be on node 1",
w
);
}
}
}