use crate::record::distributed_region::ReplicaInfo;
use crate::types::symbol::Symbol;
/// Policy a `SymbolAssigner` uses to decide which symbols each replica receives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentStrategy {
/// Every replica receives every symbol index.
Full,
/// Symbols are dealt round-robin: symbol `i` goes to replica `i % replica_count`.
Striped,
/// Each replica receives at most `k` symbols: a run of consecutive indices
/// that wraps around the end of the symbol list, starting at an offset
/// derived from the replica's position.
MinimumK,
/// Each symbol goes to the replica whose existing `symbol_count` plus the
/// number of symbols assigned to it so far is smallest.
Weighted,
}
/// Distributes symbol indices across replicas according to a fixed
/// `AssignmentStrategy` chosen at construction time.
#[derive(Debug)]
pub struct SymbolAssigner {
/// Strategy applied by every assignment request.
strategy: AssignmentStrategy,
}
impl SymbolAssigner {
#[inline]
#[must_use]
pub const fn new(strategy: AssignmentStrategy) -> Self {
Self { strategy }
}
#[inline]
#[must_use]
pub const fn strategy(&self) -> AssignmentStrategy {
self.strategy
}
#[must_use]
pub fn assign(
&self,
symbols: &[Symbol],
replicas: &[ReplicaInfo],
k: u16,
) -> Vec<ReplicaAssignment> {
if replicas.is_empty() || symbols.is_empty() {
return Vec::new();
}
match self.strategy {
AssignmentStrategy::Full => Self::assign_full(symbols, replicas, k),
AssignmentStrategy::Striped => Self::assign_striped(symbols, replicas, k),
AssignmentStrategy::MinimumK => Self::assign_minimum_k(symbols, replicas, k),
AssignmentStrategy::Weighted => Self::assign_weighted(symbols, replicas, k),
}
}
fn assign_full(symbols: &[Symbol], replicas: &[ReplicaInfo], k: u16) -> Vec<ReplicaAssignment> {
let all_indices: Vec<usize> = (0..symbols.len()).collect();
replicas
.iter()
.map(|r| ReplicaAssignment {
replica_id: r.id.clone(),
symbol_indices: all_indices.clone(),
can_decode: symbols.len() >= k as usize,
})
.collect()
}
fn assign_striped(
symbols: &[Symbol],
replicas: &[ReplicaInfo],
k: u16,
) -> Vec<ReplicaAssignment> {
let n = replicas.len();
let mut assignments: Vec<Vec<usize>> = vec![Vec::new(); n];
for (i, _) in symbols.iter().enumerate() {
assignments[i % n].push(i);
}
replicas
.iter()
.enumerate()
.map(|(i, r)| {
let indices = &assignments[i];
ReplicaAssignment {
replica_id: r.id.clone(),
symbol_indices: indices.clone(),
can_decode: indices.len() >= k as usize,
}
})
.collect()
}
fn assign_minimum_k(
symbols: &[Symbol],
replicas: &[ReplicaInfo],
k: u16,
) -> Vec<ReplicaAssignment> {
let k_usize = k as usize;
replicas
.iter()
.enumerate()
.map(|(replica_idx, r)| {
let mut indices = Vec::with_capacity(k_usize);
for j in 0..std::cmp::min(k_usize, symbols.len()) {
let idx = (replica_idx * symbols.len() / replicas.len() + j) % symbols.len();
if !indices.contains(&idx) {
indices.push(idx);
}
}
let mut fill = 0;
while indices.len() < k_usize && fill < symbols.len() {
if !indices.contains(&fill) {
indices.push(fill);
}
fill += 1;
}
ReplicaAssignment {
replica_id: r.id.clone(),
can_decode: indices.len() >= k_usize,
symbol_indices: indices,
}
})
.collect()
}
fn assign_weighted(
symbols: &[Symbol],
replicas: &[ReplicaInfo],
k: u16,
) -> Vec<ReplicaAssignment> {
let mut assignments: Vec<Vec<usize>> = vec![Vec::new(); replicas.len()];
let mut assigned_counts = vec![0_u64; replicas.len()];
for (symbol_idx, _) in symbols.iter().enumerate() {
let mut best_idx = 0usize;
let mut best_projected_total =
u64::from(replicas[best_idx].symbol_count) + assigned_counts[best_idx];
for candidate_idx in 1..replicas.len() {
let candidate_projected_total = u64::from(replicas[candidate_idx].symbol_count)
+ assigned_counts[candidate_idx];
if candidate_projected_total < best_projected_total
|| (candidate_projected_total == best_projected_total
&& assigned_counts[candidate_idx] < assigned_counts[best_idx])
{
best_idx = candidate_idx;
best_projected_total = candidate_projected_total;
}
}
assignments[best_idx].push(symbol_idx);
assigned_counts[best_idx] += 1;
}
replicas
.iter()
.enumerate()
.map(|(replica_idx, replica)| {
let indices = &assignments[replica_idx];
ReplicaAssignment {
replica_id: replica.id.clone(),
symbol_indices: indices.clone(),
can_decode: indices.len() >= k as usize,
}
})
.collect()
}
}
/// The symbols chosen for a single replica by `SymbolAssigner::assign`.
#[derive(Debug, Clone)]
pub struct ReplicaAssignment {
/// Identifier of the replica, cloned from the `id` of its `ReplicaInfo`.
pub replica_id: String,
/// Indices into the `symbols` slice that was passed to `assign`.
pub symbol_indices: Vec<usize>,
/// `true` when the number of assigned symbols is at least the `k` passed to `assign`.
pub can_decode: bool,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` replicas named `r0`, `r1`, ... with addresses `addr0`, `addr1`, ...
    fn create_test_replicas(count: usize) -> Vec<ReplicaInfo> {
        (0..count)
            .map(|i| ReplicaInfo::new(&format!("r{i}"), &format!("addr{i}")))
            .collect()
    }

    /// Builds one replica per entry of `symbol_counts`, with that value as its `symbol_count`.
    fn create_test_replicas_with_symbol_counts(symbol_counts: &[u32]) -> Vec<ReplicaInfo> {
        symbol_counts
            .iter()
            .enumerate()
            .map(|(i, &symbol_count)| {
                let mut replica = ReplicaInfo::new(&format!("r{i}"), &format!("addr{i}"));
                replica.symbol_count = symbol_count;
                replica
            })
            .collect()
    }

    /// Builds `count` test symbols, each with a 128-byte zeroed payload.
    fn create_test_symbols(count: usize) -> Vec<Symbol> {
        (0..count)
            .map(|i| Symbol::new_for_test(1, 0, i as u32, &[0u8; 128]))
            .collect()
    }

    #[test]
    fn full_assignment_all_replicas_get_all() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Full);
        let symbols = create_test_symbols(10);
        let replicas = create_test_replicas(3);
        let assignments = assigner.assign(&symbols, &replicas, 5);
        assert_eq!(assignments.len(), 3);
        for assignment in &assignments {
            assert_eq!(assignment.symbol_indices.len(), 10);
            assert!(assignment.can_decode);
        }
    }

    #[test]
    fn striped_assignment_distributes_evenly() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Striped);
        let symbols = create_test_symbols(9);
        let replicas = create_test_replicas(3);
        let assignments = assigner.assign(&symbols, &replicas, 5);
        for assignment in &assignments {
            assert_eq!(assignment.symbol_indices.len(), 3);
        }
    }

    #[test]
    fn striped_no_overlap() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Striped);
        let symbols = create_test_symbols(12);
        let replicas = create_test_replicas(3);
        let assignments = assigner.assign(&symbols, &replicas, 4);
        let mut all: Vec<usize> = Vec::new();
        for a in &assignments {
            all.extend_from_slice(&a.symbol_indices);
        }
        all.sort_unstable();
        // Compare without deduplicating: a dedup would hide an index that was
        // handed to more than one replica.
        let expected: Vec<usize> = (0..symbols.len()).collect();
        assert_eq!(all, expected, "all symbols should be assigned exactly once");
    }

    #[test]
    fn minimum_k_assignment() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::MinimumK);
        let symbols = create_test_symbols(15);
        let replicas = create_test_replicas(3);
        let assignments = assigner.assign(&symbols, &replicas, 10);
        for assignment in &assignments {
            assert!(
                assignment.symbol_indices.len() >= 10,
                "replica {} got {} symbols, need >= 10",
                assignment.replica_id,
                assignment.symbol_indices.len()
            );
            assert!(assignment.can_decode);
        }
    }

    #[test]
    fn empty_symbols_returns_empty() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Full);
        let symbols: Vec<Symbol> = vec![];
        let replicas = create_test_replicas(3);
        let assignments = assigner.assign(&symbols, &replicas, 5);
        assert!(assignments.is_empty());
    }

    #[test]
    fn empty_replicas_returns_empty() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Full);
        let symbols = create_test_symbols(10);
        let replicas: Vec<ReplicaInfo> = vec![];
        let assignments = assigner.assign(&symbols, &replicas, 5);
        assert!(assignments.is_empty());
    }

    #[test]
    fn weighted_prefers_less_loaded_replicas() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Weighted);
        let symbols = create_test_symbols(18);
        let replicas = create_test_replicas_with_symbol_counts(&[0, 4, 9]);
        let assignments = assigner.assign(&symbols, &replicas, 3);
        let counts: Vec<_> = assignments
            .iter()
            .map(|assignment| assignment.symbol_indices.len())
            .collect();
        assert_eq!(counts.iter().sum::<usize>(), symbols.len());
        assert!(
            counts[0] > counts[1],
            "lighter replica should get more symbols"
        );
        assert!(
            counts[1] > counts[2],
            "heaviest replica should get the fewest symbols"
        );
        // Together with the sum check above, an unchanged length after dedup
        // shows that no index was assigned twice.
        let mut all_indices: Vec<_> = assignments
            .iter()
            .flat_map(|assignment| assignment.symbol_indices.iter().copied())
            .collect();
        all_indices.sort_unstable();
        all_indices.dedup();
        assert_eq!(
            all_indices.len(),
            symbols.len(),
            "weighted assignment must not duplicate symbols"
        );
    }

    #[test]
    fn weighted_equal_loads_balance_like_striping() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Weighted);
        let symbols = create_test_symbols(10);
        let replicas = create_test_replicas_with_symbol_counts(&[2, 2, 2]);
        let assignments = assigner.assign(&symbols, &replicas, 3);
        let counts: Vec<_> = assignments
            .iter()
            .map(|assignment| assignment.symbol_indices.len())
            .collect();
        let min = counts.iter().copied().min().unwrap_or(0);
        let max = counts.iter().copied().max().unwrap_or(0);
        assert_eq!(counts.iter().sum::<usize>(), symbols.len());
        assert!(
            max - min <= 1,
            "equal loads should distribute nearly evenly, got {counts:?}"
        );
    }

    #[test]
    fn weighted_avoids_heavier_replica_until_projected_loads_match() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Weighted);
        let symbols = create_test_symbols(2);
        let replicas = create_test_replicas_with_symbol_counts(&[0, 100]);
        let assignments = assigner.assign(&symbols, &replicas, 1);
        let counts: Vec<_> = assignments
            .iter()
            .map(|assignment| assignment.symbol_indices.len())
            .collect();
        assert_eq!(counts, vec![2, 0]);
    }

    #[test]
    fn full_more_replicas_than_symbols() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Full);
        let symbols = create_test_symbols(3);
        let replicas = create_test_replicas(10);
        let assignments = assigner.assign(&symbols, &replicas, 2);
        assert_eq!(assignments.len(), 10);
        for a in &assignments {
            assert_eq!(a.symbol_indices.len(), 3);
            assert!(a.can_decode);
        }
    }

    #[test]
    fn full_single_symbol() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Full);
        let symbols = create_test_symbols(1);
        let replicas = create_test_replicas(3);
        let assignments = assigner.assign(&symbols, &replicas, 1);
        for a in &assignments {
            assert_eq!(a.symbol_indices.len(), 1);
            assert!(a.can_decode);
        }
    }

    #[test]
    fn full_k_greater_than_symbol_count() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Full);
        let symbols = create_test_symbols(5);
        let replicas = create_test_replicas(2);
        let assignments = assigner.assign(&symbols, &replicas, 10);
        for a in &assignments {
            assert_eq!(a.symbol_indices.len(), 5);
            assert!(!a.can_decode);
        }
    }

    #[test]
    fn striped_uneven_distribution() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Striped);
        let symbols = create_test_symbols(10);
        let replicas = create_test_replicas(3);
        let assignments = assigner.assign(&symbols, &replicas, 3);
        let total: usize = assignments.iter().map(|a| a.symbol_indices.len()).sum();
        assert_eq!(total, 10, "all symbols assigned");
        for a in &assignments {
            assert!(!a.symbol_indices.is_empty());
            assert!(a.symbol_indices.len() <= 4);
        }
    }

    #[test]
    fn striped_single_replica() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Striped);
        let symbols = create_test_symbols(5);
        let replicas = create_test_replicas(1);
        let assignments = assigner.assign(&symbols, &replicas, 3);
        assert_eq!(assignments.len(), 1);
        assert_eq!(assignments[0].symbol_indices.len(), 5);
        assert!(assignments[0].can_decode);
    }

    #[test]
    fn striped_more_replicas_than_symbols() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Striped);
        let symbols = create_test_symbols(3);
        let replicas = create_test_replicas(5);
        let assignments = assigner.assign(&symbols, &replicas, 2);
        let total: usize = assignments.iter().map(|a| a.symbol_indices.len()).sum();
        assert_eq!(total, 3);
        let nonempty = assignments
            .iter()
            .filter(|a| !a.symbol_indices.is_empty())
            .count();
        assert_eq!(nonempty, 3);
    }

    #[test]
    fn minimum_k_single_replica() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::MinimumK);
        let symbols = create_test_symbols(10);
        let replicas = create_test_replicas(1);
        let assignments = assigner.assign(&symbols, &replicas, 5);
        assert_eq!(assignments.len(), 1);
        assert!(assignments[0].symbol_indices.len() >= 5);
        assert!(assignments[0].can_decode);
    }

    #[test]
    fn minimum_k_k_equals_symbol_count() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::MinimumK);
        let symbols = create_test_symbols(5);
        let replicas = create_test_replicas(3);
        let assignments = assigner.assign(&symbols, &replicas, 5);
        for a in &assignments {
            assert_eq!(a.symbol_indices.len(), 5);
            assert!(a.can_decode);
        }
    }

    #[test]
    fn minimum_k_k_greater_than_symbols() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::MinimumK);
        let symbols = create_test_symbols(5);
        let replicas = create_test_replicas(2);
        let assignments = assigner.assign(&symbols, &replicas, 10);
        for a in &assignments {
            assert!(!a.can_decode);
        }
    }

    #[test]
    fn minimum_k_no_duplicate_indices() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::MinimumK);
        let symbols = create_test_symbols(20);
        let replicas = create_test_replicas(4);
        let assignments = assigner.assign(&symbols, &replicas, 8);
        for a in &assignments {
            let mut sorted = a.symbol_indices.clone();
            sorted.sort_unstable();
            sorted.dedup();
            assert_eq!(
                sorted.len(),
                a.symbol_indices.len(),
                "no duplicate indices for replica {}",
                a.replica_id
            );
        }
    }

    #[test]
    fn strategy_accessor() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Striped);
        assert_eq!(assigner.strategy(), AssignmentStrategy::Striped);
    }

    #[test]
    fn both_empty_returns_empty() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Full);
        let assignments = assigner.assign(&[], &[], 5);
        assert!(assignments.is_empty());
    }

    #[test]
    fn full_k_zero() {
        let assigner = SymbolAssigner::new(AssignmentStrategy::Full);
        let symbols = create_test_symbols(5);
        let replicas = create_test_replicas(2);
        let assignments = assigner.assign(&symbols, &replicas, 0);
        for a in &assignments {
            assert!(a.can_decode);
        }
    }

    #[test]
    fn assignment_strategy_debug_clone_copy_eq() {
        let s = AssignmentStrategy::Striped;
        let dbg = format!("{s:?}");
        assert!(dbg.contains("Striped"), "{dbg}");
        let copied = s;
        let cloned = s;
        assert_eq!(copied, cloned);
        assert_ne!(s, AssignmentStrategy::Full);
    }

    #[test]
    fn replica_assignment_debug_clone() {
        let ra = ReplicaAssignment {
            replica_id: "r0".to_string(),
            symbol_indices: vec![0, 1, 2],
            can_decode: true,
        };
        let dbg = format!("{ra:?}");
        assert!(dbg.contains("ReplicaAssignment"), "{dbg}");
        // Call `clone` explicitly and keep using `ra` afterwards, so the test
        // exercises `Clone` rather than a move.
        let cloned = ra.clone();
        assert_eq!(cloned.replica_id, "r0");
        assert_eq!(cloned.symbol_indices, [0, 1, 2]);
        assert_eq!(cloned.replica_id, ra.replica_id);
        assert_eq!(cloned.symbol_indices, ra.symbol_indices);
        assert_eq!(cloned.can_decode, ra.can_decode);
    }
}