use std::collections::BTreeMap;
use super::identity::NodeIdentity;
use super::membership::MembershipCatalog;
use super::ownership::{CollectionId, RangeId, ShardOwnershipCatalog};
pub const NEUTRAL_OPERATOR_WEIGHT: u32 = 100;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemberCapacity {
pub usable_disk_bytes: u64,
pub operator_weight: u32,
}
impl MemberCapacity {
pub fn new(usable_disk_bytes: u64, operator_weight: u32) -> Self {
Self {
usable_disk_bytes,
operator_weight,
}
}
pub fn with_disk(usable_disk_bytes: u64) -> Self {
Self::new(usable_disk_bytes, NEUTRAL_OPERATOR_WEIGHT)
}
pub fn weighted_capacity(&self) -> u128 {
self.usable_disk_bytes as u128 * self.operator_weight as u128
/ NEUTRAL_OPERATOR_WEIGHT as u128
}
pub fn is_placeable(&self) -> bool {
self.weighted_capacity() > 0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RangeLoad {
pub bytes_used: u64,
pub read_ops: u64,
pub write_ops: u64,
}
impl RangeLoad {
pub fn idle(bytes_used: u64) -> Self {
Self {
bytes_used,
read_ops: 0,
write_ops: 0,
}
}
pub fn traffic(&self) -> u64 {
self.read_ops.saturating_add(self.write_ops)
}
}
pub trait PlacementSignals {
fn member_capacity(&self, member: &NodeIdentity) -> MemberCapacity;
fn range_load(&self, collection: &CollectionId, range_id: RangeId) -> RangeLoad;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MoveReason {
CapacityBalance,
HotspotRelief,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlannedMove {
pub collection: CollectionId,
pub range_id: RangeId,
pub from: NodeIdentity,
pub to: NodeIdentity,
pub bytes: u64,
pub reason: MoveReason,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HotspotRange {
pub collection: CollectionId,
pub range_id: RangeId,
pub owner: NodeIdentity,
pub traffic: u64,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RebalancePlan {
pub moves: Vec<PlannedMove>,
pub hotspots: Vec<HotspotRange>,
}
impl RebalancePlan {
pub fn is_empty(&self) -> bool {
self.moves.is_empty() && self.hotspots.is_empty()
}
pub fn no_moves(&self) -> bool {
self.moves.is_empty()
}
pub fn capacity_moves(&self) -> impl Iterator<Item = &PlannedMove> {
self.moves
.iter()
.filter(|m| m.reason == MoveReason::CapacityBalance)
}
pub fn hotspot_moves(&self) -> impl Iterator<Item = &PlannedMove> {
self.moves
.iter()
.filter(|m| m.reason == MoveReason::HotspotRelief)
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PlacementPolicy {
pub balance_tolerance: f64,
pub hotspot_load_factor: f64,
}
impl Default for PlacementPolicy {
fn default() -> Self {
Self {
balance_tolerance: 0.10,
hotspot_load_factor: 2.0,
}
}
}
fn fair_share(total_bytes: u64, member_capacity: u128, total_capacity: u128) -> u64 {
if total_capacity == 0 {
return 0;
}
let share = total_bytes as u128 * member_capacity / total_capacity;
share.min(u64::MAX as u128) as u64
}
#[derive(Debug, Clone, Default)]
pub struct WeightedPlacementPlanner {
policy: PlacementPolicy,
}
impl WeightedPlacementPlanner {
pub fn new(policy: PlacementPolicy) -> Self {
Self { policy }
}
pub fn policy(&self) -> &PlacementPolicy {
&self.policy
}
pub fn plan_rebalance(
&self,
membership: &MembershipCatalog,
ownership: &ShardOwnershipCatalog,
signals: &impl PlacementSignals,
) -> RebalancePlan {
let mut state = ClusterState::observe(membership, ownership, signals, &self.policy);
let mut moves = state.plan_capacity_moves(&self.policy);
let (hotspots, hotspot_moves) = state.plan_hotspot_moves(&self.policy);
moves.extend(hotspot_moves);
RebalancePlan { moves, hotspots }
}
}
struct ClusterState {
eligible: Vec<NodeIdentity>,
weighted_capacity: BTreeMap<NodeIdentity, u128>,
total_capacity: u128,
total_bytes: u64,
ranges: BTreeMap<(CollectionId, RangeId), RangeFacts>,
owner_of: BTreeMap<(CollectionId, RangeId), NodeIdentity>,
origin_owner: BTreeMap<(CollectionId, RangeId), NodeIdentity>,
used: BTreeMap<NodeIdentity, u64>,
load: BTreeMap<NodeIdentity, u64>,
moved: std::collections::BTreeSet<(CollectionId, RangeId)>,
}
#[derive(Clone, Copy)]
struct RangeFacts {
bytes: u64,
traffic: u64,
}
impl ClusterState {
fn observe(
membership: &MembershipCatalog,
ownership: &ShardOwnershipCatalog,
signals: &impl PlacementSignals,
_policy: &PlacementPolicy,
) -> Self {
let mut weighted_capacity = BTreeMap::new();
let mut eligible = Vec::new();
let mut total_capacity: u128 = 0;
for member in membership.placement_eligible_members() {
let id = member.identity().clone();
let cap = signals.member_capacity(&id).weighted_capacity();
if cap == 0 {
continue;
}
total_capacity += cap;
weighted_capacity.insert(id.clone(), cap);
eligible.push(id);
}
let eligible_set: std::collections::BTreeSet<&NodeIdentity> = eligible.iter().collect();
let mut ranges = BTreeMap::new();
let mut owner_of = BTreeMap::new();
let mut origin_owner = BTreeMap::new();
let mut used: BTreeMap<NodeIdentity, u64> =
eligible.iter().map(|id| (id.clone(), 0)).collect();
let mut load: BTreeMap<NodeIdentity, u64> =
eligible.iter().map(|id| (id.clone(), 0)).collect();
let mut total_bytes: u64 = 0;
for entry in ownership.entries() {
let owner = entry.owner().clone();
if !eligible_set.contains(&owner) {
continue;
}
let key = (entry.collection().clone(), entry.range_id());
let load_facts = signals.range_load(entry.collection(), entry.range_id());
ranges.insert(
key.clone(),
RangeFacts {
bytes: load_facts.bytes_used,
traffic: load_facts.traffic(),
},
);
*used.get_mut(&owner).unwrap() += load_facts.bytes_used;
*load.get_mut(&owner).unwrap() += load_facts.traffic();
total_bytes = total_bytes.saturating_add(load_facts.bytes_used);
owner_of.insert(key.clone(), owner.clone());
origin_owner.insert(key, owner);
}
Self {
eligible,
weighted_capacity,
total_capacity,
total_bytes,
ranges,
owner_of,
origin_owner,
used,
load,
moved: std::collections::BTreeSet::new(),
}
}
fn fair(&self, member: &NodeIdentity) -> u64 {
let cap = self.weighted_capacity.get(member).copied().unwrap_or(0);
fair_share(self.total_bytes, cap, self.total_capacity)
}
fn ranges_owned_by(&self, member: &NodeIdentity) -> Vec<(CollectionId, RangeId)> {
self.owner_of
.iter()
.filter(|(key, owner)| *owner == member && !self.moved.contains(*key))
.map(|(key, _)| key.clone())
.collect()
}
fn apply_move(&mut self, key: &(CollectionId, RangeId), to: &NodeIdentity) {
let facts = self.ranges[key];
let from = self.owner_of[key].clone();
*self.used.get_mut(&from).unwrap() -= facts.bytes;
*self.load.get_mut(&from).unwrap() -= facts.traffic;
*self.used.get_mut(to).unwrap() += facts.bytes;
*self.load.get_mut(to).unwrap() += facts.traffic;
self.owner_of.insert(key.clone(), to.clone());
self.moved.insert(key.clone());
}
fn plan_capacity_moves(&mut self, policy: &PlacementPolicy) -> Vec<PlannedMove> {
let mut planned = Vec::new();
if self.total_capacity == 0 || self.eligible.len() < 2 {
return planned;
}
while let Some(source) = self.most_over(policy) {
let Some(target) = self.most_under(&source) else {
break;
};
let dev_src = self.deviation(&source);
let dev_tgt = self.deviation(&target);
let worst_before = dev_src.abs().max(dev_tgt.abs());
let mut best: Option<((CollectionId, RangeId), f64)> = None;
for key in self.ranges_owned_by(&source) {
let s = self.ranges[&key].bytes as f64;
let after = (dev_src - s).abs().max((dev_tgt + s).abs());
let better = match &best {
None => true,
Some((_, best_after)) => after < *best_after,
};
if better {
best = Some((key, after));
}
}
let Some((key, worst_after)) = best else {
break;
};
if worst_after >= worst_before {
break;
}
let bytes = self.ranges[&key].bytes;
let from = self.origin_owner[&key].clone();
self.apply_move(&key, &target);
planned.push(PlannedMove {
collection: key.0,
range_id: key.1,
from,
to: target,
bytes,
reason: MoveReason::CapacityBalance,
});
}
planned
}
fn deviation(&self, member: &NodeIdentity) -> f64 {
self.used.get(member).copied().unwrap_or(0) as f64 - self.fair(member) as f64
}
fn most_over(&self, policy: &PlacementPolicy) -> Option<NodeIdentity> {
self.eligible
.iter()
.filter(|id| {
let used = self.used.get(*id).copied().unwrap_or(0) as f64;
let fair = self.fair(id) as f64;
used > fair * (1.0 + policy.balance_tolerance) && used > fair
})
.max_by(|a, b| {
self.deviation(a)
.partial_cmp(&self.deviation(b))
.unwrap()
.then_with(|| b.cmp(a))
})
.cloned()
}
fn most_under(&self, source: &NodeIdentity) -> Option<NodeIdentity> {
self.eligible
.iter()
.filter(|id| *id != source && self.deviation(id) < 0.0)
.min_by(|a, b| {
self.deviation(a)
.partial_cmp(&self.deviation(b))
.unwrap()
.then_with(|| a.cmp(b))
})
.cloned()
}
fn plan_hotspot_moves(
&mut self,
policy: &PlacementPolicy,
) -> (Vec<HotspotRange>, Vec<PlannedMove>) {
let mut hotspots = Vec::new();
let mut moves = Vec::new();
let range_count = self.ranges.len();
if range_count == 0 {
return (hotspots, moves);
}
let total_traffic: u64 = self.ranges.values().map(|f| f.traffic).sum();
let mean = total_traffic as f64 / range_count as f64;
let threshold = mean * policy.hotspot_load_factor;
if mean <= 0.0 {
return (hotspots, moves);
}
let mut hot: Vec<((CollectionId, RangeId), u64)> = self
.ranges
.iter()
.filter(|(_, f)| f.traffic as f64 > threshold)
.map(|(key, f)| (key.clone(), f.traffic))
.collect();
hot.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
for (key, traffic) in hot {
let owner = self.owner_of[&key].clone();
hotspots.push(HotspotRange {
collection: key.0.clone(),
range_id: key.1,
owner: self.origin_owner[&key].clone(),
traffic,
});
if self.moved.contains(&key) || self.ranges_owned_by(&owner).len() < 2 {
continue;
}
let facts = self.ranges[&key];
let owner_load = self.load.get(&owner).copied().unwrap_or(0);
let target = self
.eligible
.iter()
.filter(|id| **id != owner)
.filter(|id| {
let used = self.used.get(*id).copied().unwrap_or(0);
let fair = self.fair(id) as f64;
(used + facts.bytes) as f64 <= fair * (1.0 + policy.balance_tolerance)
})
.filter(|id| {
let tgt_load = self.load.get(*id).copied().unwrap_or(0);
tgt_load + facts.traffic < owner_load
})
.min_by(|a, b| {
let la = self.load.get(*a).copied().unwrap_or(0);
let lb = self.load.get(*b).copied().unwrap_or(0);
la.cmp(&lb).then_with(|| a.cmp(b))
})
.cloned();
if let Some(target) = target {
let from = self.origin_owner[&key].clone();
self.apply_move(&key, &target);
moves.push(PlannedMove {
collection: key.0,
range_id: key.1,
from,
to: target,
bytes: facts.bytes,
reason: MoveReason::HotspotRelief,
});
}
}
(hotspots, moves)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cluster::membership::{ClusterId, ClusterMember, MemberKind};
use crate::cluster::ownership::{PlacementMetadata, RangeBounds, RangeOwnership, ShardKeyMode};
use std::collections::HashMap;
fn ident(cn: &str) -> NodeIdentity {
NodeIdentity::from_certificate_subject(cn).unwrap()
}
fn collection(name: &str) -> CollectionId {
CollectionId::new(name).unwrap()
}
fn data_member(cn: &str) -> ClusterMember {
ClusterMember::joined_empty(ident(cn), MemberKind::Data)
}
fn membership(members: &[&str]) -> MembershipCatalog {
MembershipCatalog::new(
ClusterId::new("cluster-x").unwrap(),
members.iter().map(|m| data_member(m)),
)
}
fn catalog(owners: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
let orders = collection("orders");
let mut catalog = ShardOwnershipCatalog::new();
for (i, owner) in owners.iter().enumerate() {
let lower = vec![i as u8];
let upper = vec![i as u8 + 1];
let bounds = RangeBounds::new(
crate::cluster::ownership::RangeBound::key(lower),
crate::cluster::ownership::RangeBound::key(upper),
)
.unwrap();
catalog
.apply_update(RangeOwnership::establish(
orders.clone(),
RangeId::new(i as u64 + 1),
ShardKeyMode::Hash,
bounds,
ident(owner),
Vec::<NodeIdentity>::new(),
PlacementMetadata::with_replication_factor(1),
))
.unwrap();
}
(catalog, orders)
}
struct FakeSignals {
default_capacity: MemberCapacity,
capacity: HashMap<NodeIdentity, MemberCapacity>,
load: HashMap<u64, RangeLoad>,
default_bytes: u64,
}
impl FakeSignals {
fn uniform(disk: u64, default_bytes: u64) -> Self {
Self {
default_capacity: MemberCapacity::with_disk(disk),
capacity: HashMap::new(),
load: HashMap::new(),
default_bytes,
}
}
fn with_capacity(mut self, cn: &str, cap: MemberCapacity) -> Self {
self.capacity.insert(ident(cn), cap);
self
}
fn with_load(mut self, range_id: u64, load: RangeLoad) -> Self {
self.load.insert(range_id, load);
self
}
}
impl PlacementSignals for FakeSignals {
fn member_capacity(&self, member: &NodeIdentity) -> MemberCapacity {
self.capacity
.get(member)
.copied()
.unwrap_or(self.default_capacity)
}
fn range_load(&self, _collection: &CollectionId, range_id: RangeId) -> RangeLoad {
self.load
.get(&range_id.value())
.copied()
.unwrap_or_else(|| RangeLoad::idle(self.default_bytes))
}
}
#[test]
fn weighted_capacity_scales_disk_by_operator_weight() {
assert_eq!(MemberCapacity::with_disk(1_000).weighted_capacity(), 1_000);
assert_eq!(MemberCapacity::new(1_000, 200).weighted_capacity(), 2_000);
assert_eq!(MemberCapacity::new(1_000, 50).weighted_capacity(), 500);
assert!(!MemberCapacity::with_disk(0).is_placeable());
assert!(MemberCapacity::with_disk(1).is_placeable());
}
#[test]
fn homogeneous_cluster_is_balanced_and_plans_nothing() {
let planner = WeightedPlacementPlanner::default();
let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let signals = FakeSignals::uniform(1_000_000, 100);
let plan = planner.plan_rebalance(&members, &catalog, &signals);
assert!(plan.is_empty(), "balanced homogeneous cluster is a no-op");
}
#[test]
fn homogeneous_cluster_with_skew_spreads_ranges() {
let planner = WeightedPlacementPlanner::default();
let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-a"]);
let signals = FakeSignals::uniform(1_000_000, 100);
let plan = planner.plan_rebalance(&members, &catalog, &signals);
assert_eq!(
plan.capacity_moves().count(),
2,
"two ranges move off node-a"
);
for mv in plan.capacity_moves() {
assert_eq!(mv.from, ident("CN=node-a"));
assert_ne!(mv.to, ident("CN=node-a"));
assert_eq!(mv.reason, MoveReason::CapacityBalance);
}
let targets: std::collections::BTreeSet<_> =
plan.capacity_moves().map(|m| m.to.clone()).collect();
assert_eq!(targets.len(), 2);
}
#[test]
fn heterogeneous_disk_weights_apportion_by_capacity() {
let planner = WeightedPlacementPlanner::default();
let members = membership(&["CN=node-big", "CN=node-small"]);
let (catalog, _orders) = catalog(&[
"CN=node-small",
"CN=node-small",
"CN=node-small",
"CN=node-small",
"CN=node-small",
"CN=node-small",
]);
let signals = FakeSignals::uniform(1_000, 100)
.with_capacity("CN=node-big", MemberCapacity::with_disk(4_000))
.with_capacity("CN=node-small", MemberCapacity::with_disk(1_000));
let plan = planner.plan_rebalance(&members, &catalog, &signals);
assert!(!plan.no_moves(), "imbalanced cluster must plan moves");
let to_big = plan
.capacity_moves()
.filter(|m| m.to == ident("CN=node-big"))
.count();
assert!(
(4..=5).contains(&to_big),
"node-big should receive ~4/5 of 6 ranges, got {to_big}"
);
for mv in plan.capacity_moves() {
assert_eq!(mv.from, ident("CN=node-small"));
assert_eq!(mv.to, ident("CN=node-big"));
}
}
#[test]
fn operator_weight_biases_placement_without_more_disk() {
let planner = WeightedPlacementPlanner::default();
let members = membership(&["CN=node-pref", "CN=node-plain"]);
let (catalog, _orders) = catalog(&[
"CN=node-plain",
"CN=node-plain",
"CN=node-plain",
"CN=node-plain",
]);
let signals = FakeSignals::uniform(1_000, 100)
.with_capacity("CN=node-pref", MemberCapacity::new(1_000, 300));
let plan = planner.plan_rebalance(&members, &catalog, &signals);
let to_pref = plan
.capacity_moves()
.filter(|m| m.to == ident("CN=node-pref"))
.count();
assert!(
to_pref >= 2,
"higher operator weight pulls more ranges, got {to_pref}"
);
}
#[test]
fn expanding_disk_changes_weight_and_next_plan_without_moving_data() {
let planner = WeightedPlacementPlanner::default();
let members = membership(&["CN=node-a", "CN=node-b"]);
let (catalog, orders) = catalog(&[
"CN=node-a",
"CN=node-a",
"CN=node-a",
"CN=node-a",
"CN=node-a",
"CN=node-a",
]);
let before_signals = FakeSignals::uniform(1_000, 100)
.with_capacity("CN=node-a", MemberCapacity::with_disk(3_000))
.with_capacity("CN=node-b", MemberCapacity::with_disk(1_000));
let before = planner.plan_rebalance(&members, &catalog, &before_signals);
let before_to_b = before
.capacity_moves()
.filter(|m| m.to == ident("CN=node-b"))
.count();
let small = MemberCapacity::with_disk(1_000);
let expanded = MemberCapacity::with_disk(8_000);
assert!(
expanded.weighted_capacity() > small.weighted_capacity(),
"expanding disk raises placement weight",
);
let after_signals = FakeSignals::uniform(1_000, 100)
.with_capacity("CN=node-a", MemberCapacity::with_disk(3_000))
.with_capacity("CN=node-b", expanded);
let after = planner.plan_rebalance(&members, &catalog, &after_signals);
let after_to_b = after
.capacity_moves()
.filter(|m| m.to == ident("CN=node-b"))
.count();
assert!(
after_to_b > before_to_b,
"expanded disk pulls more ranges on the next plan ({before_to_b} -> {after_to_b})",
);
for i in 1..=6 {
let range = catalog.range(&orders, RangeId::new(i)).unwrap();
assert_eq!(
range.owner(),
&ident("CN=node-a"),
"range {i} stayed on node-a; planning moved nothing",
);
}
}
#[test]
fn hotspot_traffic_identifies_secondary_candidate() {
let planner = WeightedPlacementPlanner::default();
let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-b", "CN=node-c"]);
let signals = FakeSignals::uniform(0, 0)
.with_capacity("CN=node-a", MemberCapacity::with_disk(2_000))
.with_capacity("CN=node-b", MemberCapacity::with_disk(1_000))
.with_capacity("CN=node-c", MemberCapacity::with_disk(1_000))
.with_load(
1,
RangeLoad {
bytes_used: 2,
read_ops: 1_000,
write_ops: 1_000,
},
)
.with_load(
2,
RangeLoad {
bytes_used: 38,
read_ops: 300,
write_ops: 0,
},
)
.with_load(
3,
RangeLoad {
bytes_used: 20,
read_ops: 100,
write_ops: 0,
},
)
.with_load(
4,
RangeLoad {
bytes_used: 20,
read_ops: 100,
write_ops: 0,
},
);
let plan = planner.plan_rebalance(&members, &catalog, &signals);
assert_eq!(plan.capacity_moves().count(), 0, "capacity is balanced");
assert_eq!(plan.hotspots.len(), 1, "the hot range is surfaced");
assert_eq!(plan.hotspots[0].range_id, RangeId::new(1));
assert_eq!(plan.hotspots[0].owner, ident("CN=node-a"));
assert_eq!(plan.hotspots[0].traffic, 2_000);
let relief: Vec<_> = plan.hotspot_moves().collect();
assert_eq!(relief.len(), 1, "a relief move is planned");
assert_eq!(relief[0].range_id, RangeId::new(1));
assert_eq!(relief[0].from, ident("CN=node-a"));
assert_eq!(
relief[0].to,
ident("CN=node-b"),
"quietest target, tie -> lowest id"
);
assert_eq!(relief[0].reason, MoveReason::HotspotRelief);
}
#[test]
fn no_hotspot_when_traffic_is_even() {
let planner = WeightedPlacementPlanner::default();
let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let signals = FakeSignals::uniform(1_000_000, 100)
.with_load(
1,
RangeLoad {
bytes_used: 10,
read_ops: 100,
write_ops: 100,
},
)
.with_load(
2,
RangeLoad {
bytes_used: 10,
read_ops: 100,
write_ops: 100,
},
)
.with_load(
3,
RangeLoad {
bytes_used: 10,
read_ops: 100,
write_ops: 100,
},
);
let plan = planner.plan_rebalance(&members, &catalog, &signals);
assert!(plan.is_empty(), "balanced, even-traffic cluster is a no-op");
}
#[test]
fn planning_never_mutates_the_catalog() {
let planner = WeightedPlacementPlanner::default();
let members = membership(&["CN=node-a", "CN=node-b"]);
let (catalog, orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-a", "CN=node-a"]);
let signals = FakeSignals::uniform(1_000, 100);
let before: Vec<_> = (1..=4)
.map(|i| {
let r = catalog.range(&orders, RangeId::new(i)).unwrap();
(r.owner().clone(), r.epoch(), r.version())
})
.collect();
let plan = planner.plan_rebalance(&members, &catalog, &signals);
assert!(!plan.no_moves(), "skewed cluster does plan moves");
for (i, snap) in before.iter().enumerate() {
let r = catalog.range(&orders, RangeId::new(i as u64 + 1)).unwrap();
assert_eq!(&(r.owner().clone(), r.epoch(), r.version()), snap);
}
}
#[test]
fn draining_owner_ranges_are_left_to_the_drain_flow() {
let planner = WeightedPlacementPlanner::default();
let mut members = membership(&["CN=node-a", "CN=node-b"]);
members.begin_drain(&ident("CN=node-a"));
let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-a"]);
let signals = FakeSignals::uniform(1_000, 100);
let plan = planner.plan_rebalance(&members, &catalog, &signals);
assert!(
plan.no_moves(),
"draining owner's ranges are not rebalanced"
);
}
#[test]
fn single_member_cluster_plans_nothing() {
let planner = WeightedPlacementPlanner::default();
let members = membership(&["CN=node-a"]);
let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-a"]);
let signals = FakeSignals::uniform(1_000, 100);
let plan = planner.plan_rebalance(&members, &catalog, &signals);
assert!(
plan.no_moves(),
"nowhere to move ranges in a one-member cluster"
);
}
}