reddb_server/cluster/placement.rs
1//! Weighted placement and the multi-signal rebalancer planner (issue #1003,
2//! PRD #987, ADR 0037).
3//!
4//! Where the [`supervisor`](super::supervisor) reacts to a *failed* owner, this
5//! module is the proactive counterpart: it decides where ranges *should* live so
6//! the cluster's storage and traffic stay balanced as members come, go, and grow
7//! their disks. It is the glossary's **weighted placement** policy
8//! (`clustering.md`) — *"shard/range placement policy that accounts for advertised
9//! node capacity such as usable disk … and operator weights. Expanding a node's
10//! disk changes its placement weight; data moves only through explicit rebalancing
11//! transitions"* — driven by the **multi-signal rebalancer** — *"Cluster
12//! Supervisor policy that plans ownership transitions using bytes-used versus
13//! weighted capacity as the primary safety signal and read/write load as a
14//! secondary hotspot signal."*
15//!
16//! ## Two signals, one a safety floor and one a hint
17//!
18//! * **Primary — bytes-used vs weighted capacity.** Every member advertises its
19//! usable disk and an operator weight ([`MemberCapacity`]); the product is its
20//! [`weighted_capacity`](MemberCapacity::weighted_capacity), the member's share
21//! of the cluster it is *meant* to hold. The planner compares each member's
22//! bytes-used against its **fair share** (cluster bytes apportioned by weighted
23//! capacity) and proposes moving ranges off members that are over their share
24//! onto members that are under it. This is the safety signal: a member running
25//! out of disk is an availability risk, so capacity balance is what the planner
26//! acts on.
27//! * **Secondary — read/write load.** A range can be perfectly placed by bytes
28//! yet still be a **hotspot**: it absorbs a disproportionate share of the
29//! cluster's read/write traffic. The planner surfaces hotspots
30//! ([`HotspotRange`]) and, when capacity allows, proposes spreading them off
31//! their over-loaded owner. This is a hint layered on top of the capacity
32//! floor, never in place of it — a hotspot move is only taken when it does not
33//! itself create a capacity problem.
34//!
35//! ## Planning, not moving
36//!
37//! [`WeightedPlacementPlanner::plan_rebalance`] reads the membership catalog, the
38//! ownership catalog, and the live signals, and returns a [`RebalancePlan`] of
39//! [`PlannedMove`]s. It takes the ownership catalog by shared reference and
40//! **never mutates it** — *nothing moves implicitly*. Each [`PlannedMove`] is the
41//! intent for one rebalancing transition; executing it (copy the range to the
42//! target, let it catch up to the range commit watermark, then cut over through
43//! the fenced [`Handoff`](super::ownership_transition::TransitionKind::Handoff)
44//! transition machine) is a separate, explicit step. This is why *expanding a
45//! member's disk changes its placement weight but moves no data*: the new weight
46//! changes what the *next* plan proposes, and data only relocates when that plan
47//! is run.
48//!
49//! ## Purity
50//!
51//! All live state — per-member advertised capacity and per-range bytes/traffic —
52//! is read through the [`PlacementSignals`] trait, injected by the caller.
53//! Production backs it onto the disk-usage reporter and the per-range traffic
54//! counters; tests back it onto a scripted fake. The planner itself is a pure
55//! policy over the two catalogs plus those signals, so the whole weighting,
56//! balancing, and hotspot story is exercised deterministically — no disk, no
57//! clock, no network.
58
59use std::collections::BTreeMap;
60
61use super::identity::NodeIdentity;
62use super::membership::MembershipCatalog;
63use super::ownership::{CollectionId, RangeId, ShardOwnershipCatalog};
64
/// Operator weight that leaves placement untouched.
///
/// Weights are expressed in hundredths: `100` is a 1.0× multiplier, `200`
/// doubles a member's placement weight and `50` halves it. Tuning the weight is
/// how an operator nudges placement without misreporting disk.
pub const NEUTRAL_OPERATOR_WEIGHT: u32 = 100;

/// What a member advertises for placement: the usable disk it offers and the
/// operator's multiplier on top of it.
///
/// Together they yield the member's [`weighted_capacity`](Self::weighted_capacity).
/// The struct is plain advertised state — it holds two numbers and moves
/// nothing by itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemberCapacity {
    /// Usable disk the member advertises for user ranges, in bytes.
    pub usable_disk_bytes: u64,
    /// Operator weight in hundredths ([`NEUTRAL_OPERATOR_WEIGHT`] = 1.0×).
    pub operator_weight: u32,
}

impl MemberCapacity {
    /// Builds a capacity from an explicit usable disk and operator weight.
    pub fn new(usable_disk_bytes: u64, operator_weight: u32) -> Self {
        MemberCapacity {
            usable_disk_bytes,
            operator_weight,
        }
    }

    /// Builds a capacity from usable disk only, using
    /// [`NEUTRAL_OPERATOR_WEIGHT`] as the operator weight.
    pub fn with_disk(usable_disk_bytes: u64) -> Self {
        MemberCapacity {
            usable_disk_bytes,
            operator_weight: NEUTRAL_OPERATOR_WEIGHT,
        }
    }

    /// The member's **placement weight**: usable disk multiplied by the operator
    /// weight and divided by [`NEUTRAL_OPERATOR_WEIGHT`].
    ///
    /// The arithmetic is widened to `u128`, so the `u64 × u32` product cannot
    /// overflow. The final division is integer division and truncates toward
    /// zero.
    pub fn weighted_capacity(&self) -> u128 {
        let disk = u128::from(self.usable_disk_bytes);
        let weight = u128::from(self.operator_weight);
        let neutral = u128::from(NEUTRAL_OPERATOR_WEIGHT);
        disk * weight / neutral
    }

    /// Whether the member has a non-zero weighted capacity.
    ///
    /// Zero usable disk or a zero operator weight both give a weighted capacity
    /// of zero, as does any disk × weight product smaller than
    /// [`NEUTRAL_OPERATOR_WEIGHT`] because of the truncating division.
    pub fn is_placeable(&self) -> bool {
        self.weighted_capacity() != 0
    }
}
121
/// A snapshot of one range's load: how many bytes it occupies on disk and how
/// many reads and writes it recently served.
///
/// `bytes_used` is the input to the **primary** capacity signal, while
/// `read_ops` and `write_ops` drive the **secondary** hotspot signal. Bundling
/// them means one [`PlacementSignals::range_load`] call tells the planner
/// everything it needs about a range.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RangeLoad {
    /// On-disk size of the range in bytes — the primary capacity signal.
    pub bytes_used: u64,
    /// Reads served during the recent observation window.
    pub read_ops: u64,
    /// Writes served during the recent observation window.
    pub write_ops: u64,
}

impl RangeLoad {
    /// A range that takes up `bytes_used` on disk and has zero read and write
    /// operations — convenient when only the capacity signal is of interest.
    pub fn idle(bytes_used: u64) -> Self {
        RangeLoad {
            bytes_used,
            ..RangeLoad::default()
        }
    }

    /// Reads plus writes — the hotspot signal. The sum saturates at
    /// `u64::MAX` instead of overflowing.
    pub fn traffic(&self) -> u64 {
        let RangeLoad {
            read_ops,
            write_ops,
            ..
        } = *self;
        read_ops.saturating_add(write_ops)
    }
}
157
158/// The live cluster state the planner reads but does not own: each member's
159/// advertised capacity and each range's bytes/traffic.
160///
161/// Production backs this onto the disk-usage reporter and the per-range traffic
162/// counters; tests back it onto a scripted fake. Keeping it behind a trait is what
163/// makes the planner a pure policy.
164pub trait PlacementSignals {
165 /// The capacity `member` currently advertises. A member that advertises
166 /// nothing (or is unknown) should report a zero-disk [`MemberCapacity`], which
167 /// makes it un-placeable rather than a div-by-zero hazard.
168 fn member_capacity(&self, member: &NodeIdentity) -> MemberCapacity;
169
170 /// The current load on `(collection, range_id)` — its bytes and its recent
171 /// read/write traffic.
172 fn range_load(&self, collection: &CollectionId, range_id: RangeId) -> RangeLoad;
173}
174
/// The signal that motivated a proposed move.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MoveReason {
    /// **Primary** signal. The source holds more than its weighted-capacity
    /// fair share while the target holds less than its own, so relocating the
    /// range eases a disk-pressure (availability) risk.
    CapacityBalance,
    /// **Secondary** signal. The range is a read/write hotspot on an
    /// over-loaded owner and some target has both load and capacity headroom
    /// for it. Only taken when the move does not cause a capacity problem.
    HotspotRelief,
}
187
188/// One proposed rebalancing transition: move authority for a range from its
189/// current owner to a target member.
190///
191/// A [`PlannedMove`] is *intent*, not an executed transition. Carrying it out
192/// means copying the range to `to`, letting it catch up to the range commit
193/// watermark, and then cutting over through the fenced
194/// [`Handoff`](super::ownership_transition::TransitionKind::Handoff) machine — a
195/// separate, explicit step. The planner only ever produces these; it moves no
196/// data.
197#[derive(Debug, Clone, PartialEq, Eq)]
198pub struct PlannedMove {
199 pub collection: CollectionId,
200 pub range_id: RangeId,
201 /// The range's current owner in the catalog — the move's source.
202 pub from: NodeIdentity,
203 /// The proposed new owner — an active data member with capacity headroom.
204 pub to: NodeIdentity,
205 /// The range's size in bytes at planning time (what the move relocates).
206 pub bytes: u64,
207 pub reason: MoveReason,
208}
209
210/// A range the **secondary** signal flagged as a read/write hotspot: it serves
211/// traffic well above the cluster mean. Surfaced whether or not a relief move was
212/// possible, so an operator can see a hotspot even when there is no headroom to
213/// relieve it.
214#[derive(Debug, Clone, PartialEq, Eq)]
215pub struct HotspotRange {
216 pub collection: CollectionId,
217 pub range_id: RangeId,
218 /// The range's current owner — the member bearing the hot traffic.
219 pub owner: NodeIdentity,
220 /// The range's read + write traffic in the observation window.
221 pub traffic: u64,
222}
223
224/// The planner's decision for one pass: the moves to schedule and the hotspots it
225/// observed.
226///
227/// A cluster already balanced by capacity, with no hotspot, yields an empty plan
228/// ([`is_empty`](Self::is_empty)) — the no-op a stable cluster must produce.
229#[derive(Debug, Clone, Default, PartialEq, Eq)]
230pub struct RebalancePlan {
231 /// Proposed moves, in a deterministic order (capacity moves first, then
232 /// hotspot-relief moves, each in `(collection, range_id)` order).
233 pub moves: Vec<PlannedMove>,
234 /// Ranges observed to be hotspots this pass, hottest first.
235 pub hotspots: Vec<HotspotRange>,
236}
237
238impl RebalancePlan {
239 /// Nothing to schedule *and* nothing hot — a fully balanced, evenly-loaded
240 /// cluster. Distinct from [`no_moves`](Self::no_moves): a balanced cluster can
241 /// still have an *observed* hotspot it cannot relieve.
242 pub fn is_empty(&self) -> bool {
243 self.moves.is_empty() && self.hotspots.is_empty()
244 }
245
246 /// Whether the plan proposes any actual range movement. False on a cluster
247 /// that is balanced by capacity and has no relievable hotspot, even if a
248 /// hotspot was *observed*.
249 pub fn no_moves(&self) -> bool {
250 self.moves.is_empty()
251 }
252
253 /// The capacity-balance moves only (the primary signal).
254 pub fn capacity_moves(&self) -> impl Iterator<Item = &PlannedMove> {
255 self.moves
256 .iter()
257 .filter(|m| m.reason == MoveReason::CapacityBalance)
258 }
259
260 /// The hotspot-relief moves only (the secondary signal).
261 pub fn hotspot_moves(&self) -> impl Iterator<Item = &PlannedMove> {
262 self.moves
263 .iter()
264 .filter(|m| m.reason == MoveReason::HotspotRelief)
265 }
266}
267
/// Thresholds deciding when an imbalance or a traffic spike justifies a move.
///
/// The defaults are intentionally loose. A member within 10% of its fair share
/// counts as "balanced enough" not to churn ownership, and a range has to serve
/// 2× the cluster-mean traffic before spreading it is considered worthwhile.
/// Tighter thresholds would make the planner thrash on noise.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PlacementPolicy {
    /// Fractional slack around a member's fair share. A member only counts as
    /// "over" once its bytes-used exceeds `fair * (1 + balance_tolerance)`, and
    /// a move's target must fit under `fair * (1 + balance_tolerance)`. Raising
    /// it trades more imbalance for less churn.
    pub balance_tolerance: f64,
    /// Multiple of the cluster-mean range traffic a range must serve to be
    /// treated as a hotspot; `2.0` reads as "twice the average".
    pub hotspot_load_factor: f64,
}

impl Default for PlacementPolicy {
    /// The slack defaults: 10% balance tolerance and a 2× hotspot factor.
    fn default() -> Self {
        let balance_tolerance = 0.10;
        let hotspot_load_factor = 2.0;
        PlacementPolicy {
            balance_tolerance,
            hotspot_load_factor,
        }
    }
}
294
/// Apportions `total_bytes` by weighted capacity: returns the portion owed to a
/// member whose weighted capacity is `member_capacity` when the cluster's
/// weighted capacities sum to `total_capacity`.
///
/// A `total_capacity` of zero yields `0`. The product is computed in `u128`, the
/// division truncates, and a quotient too large for `u64` is clamped to
/// `u64::MAX`.
fn fair_share(total_bytes: u64, member_capacity: u128, total_capacity: u128) -> u64 {
    match total_capacity {
        0 => 0,
        denominator => {
            let apportioned = u128::from(total_bytes) * member_capacity / denominator;
            u64::try_from(apportioned).unwrap_or(u64::MAX)
        }
    }
}
305
306/// The weighted-placement, multi-signal rebalancer planner.
307///
308/// Holds only the [`PlacementPolicy`]; all live state is read through
309/// [`PlacementSignals`] at plan time, so one planner instance serves the whole
310/// cluster lifetime.
311#[derive(Debug, Clone, Default)]
312pub struct WeightedPlacementPlanner {
313 policy: PlacementPolicy,
314}
315
316impl WeightedPlacementPlanner {
317 /// A planner with the given policy.
318 pub fn new(policy: PlacementPolicy) -> Self {
319 Self { policy }
320 }
321
322 pub fn policy(&self) -> &PlacementPolicy {
323 &self.policy
324 }
325
326 /// Plan a rebalance across the whole ownership catalog **without** mutating
327 /// it. Runs the primary capacity-balance pass, then the secondary
328 /// hotspot-relief pass on top, and returns the combined [`RebalancePlan`].
329 /// Ranges owned by members that are not placement-eligible (draining members,
330 /// witnesses) are left to the drain flow and never moved here.
331 pub fn plan_rebalance(
332 &self,
333 membership: &MembershipCatalog,
334 ownership: &ShardOwnershipCatalog,
335 signals: &impl PlacementSignals,
336 ) -> RebalancePlan {
337 let mut state = ClusterState::observe(membership, ownership, signals, &self.policy);
338 let mut moves = state.plan_capacity_moves(&self.policy);
339 let (hotspots, hotspot_moves) = state.plan_hotspot_moves(&self.policy);
340 moves.extend(hotspot_moves);
341 RebalancePlan { moves, hotspots }
342 }
343}
344
345/// The mutable simulation the planner balances against. Built once from the live
346/// catalogs and signals, then evolved as moves are chosen so each successive move
347/// sees the effect of the ones before it. Crucially this is a *copy* of the live
348/// state — evolving it changes nothing in the real catalog, which is what makes
349/// planning side-effect free.
350struct ClusterState {
351 /// Placement-eligible members (active data members) with non-zero weighted
352 /// capacity, in stable identity order.
353 eligible: Vec<NodeIdentity>,
354 weighted_capacity: BTreeMap<NodeIdentity, u128>,
355 /// Total weighted capacity across `eligible` — the denominator of fair share.
356 total_capacity: u128,
357 /// Total bytes across all movable ranges — the numerator of fair share.
358 total_bytes: u64,
359 /// Per-range size and traffic, keyed in `(collection, range_id)` order.
360 ranges: BTreeMap<(CollectionId, RangeId), RangeFacts>,
361 /// Simulated current owner of each movable range (evolves as moves are taken).
362 owner_of: BTreeMap<(CollectionId, RangeId), NodeIdentity>,
363 /// The range's true catalog owner — the `from` every move records, even if the
364 /// simulation has since reassigned it (a range moves at most once per plan).
365 origin_owner: BTreeMap<(CollectionId, RangeId), NodeIdentity>,
366 /// Simulated bytes-used per member.
367 used: BTreeMap<NodeIdentity, u64>,
368 /// Simulated read/write load per member.
369 load: BTreeMap<NodeIdentity, u64>,
370 /// Ranges already scheduled to move — never moved twice in one plan.
371 moved: std::collections::BTreeSet<(CollectionId, RangeId)>,
372}
373
374#[derive(Clone, Copy)]
375struct RangeFacts {
376 bytes: u64,
377 traffic: u64,
378}
379
380impl ClusterState {
381 fn observe(
382 membership: &MembershipCatalog,
383 ownership: &ShardOwnershipCatalog,
384 signals: &impl PlacementSignals,
385 _policy: &PlacementPolicy,
386 ) -> Self {
387 let mut weighted_capacity = BTreeMap::new();
388 let mut eligible = Vec::new();
389 let mut total_capacity: u128 = 0;
390 for member in membership.placement_eligible_members() {
391 let id = member.identity().clone();
392 let cap = signals.member_capacity(&id).weighted_capacity();
393 if cap == 0 {
394 // A placement-eligible member advertising no usable disk is not a
395 // valid target; exclude it so it is never apportioned bytes.
396 continue;
397 }
398 total_capacity += cap;
399 weighted_capacity.insert(id.clone(), cap);
400 eligible.push(id);
401 }
402
403 let eligible_set: std::collections::BTreeSet<&NodeIdentity> = eligible.iter().collect();
404
405 let mut ranges = BTreeMap::new();
406 let mut owner_of = BTreeMap::new();
407 let mut origin_owner = BTreeMap::new();
408 let mut used: BTreeMap<NodeIdentity, u64> =
409 eligible.iter().map(|id| (id.clone(), 0)).collect();
410 let mut load: BTreeMap<NodeIdentity, u64> =
411 eligible.iter().map(|id| (id.clone(), 0)).collect();
412 let mut total_bytes: u64 = 0;
413
414 for entry in ownership.entries() {
415 let owner = entry.owner().clone();
416 // Only ranges owned by an eligible member are movable here; a draining
417 // owner's ranges belong to the drain flow.
418 if !eligible_set.contains(&owner) {
419 continue;
420 }
421 let key = (entry.collection().clone(), entry.range_id());
422 let load_facts = signals.range_load(entry.collection(), entry.range_id());
423 ranges.insert(
424 key.clone(),
425 RangeFacts {
426 bytes: load_facts.bytes_used,
427 traffic: load_facts.traffic(),
428 },
429 );
430 *used.get_mut(&owner).unwrap() += load_facts.bytes_used;
431 *load.get_mut(&owner).unwrap() += load_facts.traffic();
432 total_bytes = total_bytes.saturating_add(load_facts.bytes_used);
433 owner_of.insert(key.clone(), owner.clone());
434 origin_owner.insert(key, owner);
435 }
436
437 Self {
438 eligible,
439 weighted_capacity,
440 total_capacity,
441 total_bytes,
442 ranges,
443 owner_of,
444 origin_owner,
445 used,
446 load,
447 moved: std::collections::BTreeSet::new(),
448 }
449 }
450
451 fn fair(&self, member: &NodeIdentity) -> u64 {
452 let cap = self.weighted_capacity.get(member).copied().unwrap_or(0);
453 fair_share(self.total_bytes, cap, self.total_capacity)
454 }
455
456 /// Ranges currently owned by `member` in the simulation, in `(collection,
457 /// range_id)` order, that have not already been moved this plan.
458 fn ranges_owned_by(&self, member: &NodeIdentity) -> Vec<(CollectionId, RangeId)> {
459 self.owner_of
460 .iter()
461 .filter(|(key, owner)| *owner == member && !self.moved.contains(*key))
462 .map(|(key, _)| key.clone())
463 .collect()
464 }
465
466 fn apply_move(&mut self, key: &(CollectionId, RangeId), to: &NodeIdentity) {
467 let facts = self.ranges[key];
468 let from = self.owner_of[key].clone();
469 *self.used.get_mut(&from).unwrap() -= facts.bytes;
470 *self.load.get_mut(&from).unwrap() -= facts.traffic;
471 *self.used.get_mut(to).unwrap() += facts.bytes;
472 *self.load.get_mut(to).unwrap() += facts.traffic;
473 self.owner_of.insert(key.clone(), to.clone());
474 self.moved.insert(key.clone());
475 }
476
477 /// The **primary** pass: greedily move ranges off members over their
478 /// weighted-capacity fair share onto members under theirs, until no member is
479 /// over tolerance or no move strictly improves the worst imbalance.
480 fn plan_capacity_moves(&mut self, policy: &PlacementPolicy) -> Vec<PlannedMove> {
481 let mut planned = Vec::new();
482 if self.total_capacity == 0 || self.eligible.len() < 2 {
483 return planned;
484 }
485
486 // Each range moves at most once, so the loop is bounded by the range count.
487 // Pick the member most over its fair share (beyond tolerance) each round,
488 // then the member most under its own — the pair whose rebalance helps most.
489 while let Some(source) = self.most_over(policy) {
490 let Some(target) = self.most_under(&source) else {
491 break;
492 };
493
494 let dev_src = self.deviation(&source);
495 let dev_tgt = self.deviation(&target);
496 let worst_before = dev_src.abs().max(dev_tgt.abs());
497
498 // Among the source's still-movable ranges, choose the one that most
499 // reduces the worse of the two deviations after the move.
500 let mut best: Option<((CollectionId, RangeId), f64)> = None;
501 for key in self.ranges_owned_by(&source) {
502 let s = self.ranges[&key].bytes as f64;
503 let after = (dev_src - s).abs().max((dev_tgt + s).abs());
504 let better = match &best {
505 None => true,
506 Some((_, best_after)) => after < *best_after,
507 };
508 if better {
509 best = Some((key, after));
510 }
511 }
512
513 let Some((key, worst_after)) = best else {
514 break;
515 };
516 // Only take the move if it strictly improves the worst imbalance —
517 // otherwise we would churn ownership for nothing.
518 if worst_after >= worst_before {
519 break;
520 }
521
522 let bytes = self.ranges[&key].bytes;
523 let from = self.origin_owner[&key].clone();
524 self.apply_move(&key, &target);
525 planned.push(PlannedMove {
526 collection: key.0,
527 range_id: key.1,
528 from,
529 to: target,
530 bytes,
531 reason: MoveReason::CapacityBalance,
532 });
533 }
534
535 planned
536 }
537
538 /// A member's deviation from its fair share in bytes: positive is over-full,
539 /// negative is under-full.
540 fn deviation(&self, member: &NodeIdentity) -> f64 {
541 self.used.get(member).copied().unwrap_or(0) as f64 - self.fair(member) as f64
542 }
543
544 /// The eligible member furthest over its fair share, beyond the tolerance
545 /// band, or `None` if everyone is within tolerance.
546 fn most_over(&self, policy: &PlacementPolicy) -> Option<NodeIdentity> {
547 self.eligible
548 .iter()
549 .filter(|id| {
550 let used = self.used.get(*id).copied().unwrap_or(0) as f64;
551 let fair = self.fair(id) as f64;
552 used > fair * (1.0 + policy.balance_tolerance) && used > fair
553 })
554 .max_by(|a, b| {
555 self.deviation(a)
556 .partial_cmp(&self.deviation(b))
557 .unwrap()
558 // Tie-break by identity so the plan is deterministic.
559 .then_with(|| b.cmp(a))
560 })
561 .cloned()
562 }
563
564 /// The eligible member furthest *under* its fair share (the best target),
565 /// excluding `source`.
566 fn most_under(&self, source: &NodeIdentity) -> Option<NodeIdentity> {
567 self.eligible
568 .iter()
569 .filter(|id| *id != source && self.deviation(id) < 0.0)
570 .min_by(|a, b| {
571 self.deviation(a)
572 .partial_cmp(&self.deviation(b))
573 .unwrap()
574 // Tie-break by identity so the plan is deterministic.
575 .then_with(|| a.cmp(b))
576 })
577 .cloned()
578 }
579
580 /// The **secondary** pass: identify hotspot ranges (traffic well above the
581 /// cluster mean) and, for each, propose spreading it to a member with both
582 /// load and capacity headroom — but only when that strictly lowers the owner's
583 /// load concentration and respects the capacity tolerance. Returns the
584 /// observed hotspots (hottest first) and any relief moves.
585 fn plan_hotspot_moves(
586 &mut self,
587 policy: &PlacementPolicy,
588 ) -> (Vec<HotspotRange>, Vec<PlannedMove>) {
589 let mut hotspots = Vec::new();
590 let mut moves = Vec::new();
591
592 let range_count = self.ranges.len();
593 if range_count == 0 {
594 return (hotspots, moves);
595 }
596 let total_traffic: u64 = self.ranges.values().map(|f| f.traffic).sum();
597 let mean = total_traffic as f64 / range_count as f64;
598 let threshold = mean * policy.hotspot_load_factor;
599 if mean <= 0.0 {
600 return (hotspots, moves);
601 }
602
603 // Collect hotspots, hottest first; tie-break by key for determinism.
604 let mut hot: Vec<((CollectionId, RangeId), u64)> = self
605 .ranges
606 .iter()
607 .filter(|(_, f)| f.traffic as f64 > threshold)
608 .map(|(key, f)| (key.clone(), f.traffic))
609 .collect();
610 hot.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
611
612 for (key, traffic) in hot {
613 let owner = self.owner_of[&key].clone();
614 hotspots.push(HotspotRange {
615 collection: key.0.clone(),
616 range_id: key.1,
617 // Report the range's true catalog owner — the member actually
618 // bearing the hot traffic — independent of any simulated move.
619 owner: self.origin_owner[&key].clone(),
620 traffic,
621 });
622
623 // A hotspot already scheduled to move (by the capacity pass) needs no
624 // second move, and an owner holding only this one range cannot be
625 // relieved by moving it — that just relocates the hotspot.
626 if self.moved.contains(&key) || self.ranges_owned_by(&owner).len() < 2 {
627 continue;
628 }
629
630 let facts = self.ranges[&key];
631 let owner_load = self.load.get(&owner).copied().unwrap_or(0);
632
633 // Pick the eligible member with the least load that can take the range
634 // without breaching its capacity tolerance and ends up less loaded than
635 // the owner is now — otherwise the move does not spread load.
636 let target = self
637 .eligible
638 .iter()
639 .filter(|id| **id != owner)
640 .filter(|id| {
641 let used = self.used.get(*id).copied().unwrap_or(0);
642 let fair = self.fair(id) as f64;
643 (used + facts.bytes) as f64 <= fair * (1.0 + policy.balance_tolerance)
644 })
645 .filter(|id| {
646 let tgt_load = self.load.get(*id).copied().unwrap_or(0);
647 tgt_load + facts.traffic < owner_load
648 })
649 .min_by(|a, b| {
650 let la = self.load.get(*a).copied().unwrap_or(0);
651 let lb = self.load.get(*b).copied().unwrap_or(0);
652 la.cmp(&lb).then_with(|| a.cmp(b))
653 })
654 .cloned();
655
656 if let Some(target) = target {
657 let from = self.origin_owner[&key].clone();
658 self.apply_move(&key, &target);
659 moves.push(PlannedMove {
660 collection: key.0,
661 range_id: key.1,
662 from,
663 to: target,
664 bytes: facts.bytes,
665 reason: MoveReason::HotspotRelief,
666 });
667 }
668 }
669
670 (hotspots, moves)
671 }
672}
673
674#[cfg(test)]
675mod tests {
676 use super::*;
677 use crate::cluster::membership::{ClusterId, ClusterMember, MemberKind};
678 use crate::cluster::ownership::{PlacementMetadata, RangeBounds, RangeOwnership, ShardKeyMode};
679 use std::collections::HashMap;
680
681 fn ident(cn: &str) -> NodeIdentity {
682 NodeIdentity::from_certificate_subject(cn).unwrap()
683 }
684
685 fn collection(name: &str) -> CollectionId {
686 CollectionId::new(name).unwrap()
687 }
688
689 fn data_member(cn: &str) -> ClusterMember {
690 ClusterMember::joined_empty(ident(cn), MemberKind::Data)
691 }
692
693 fn membership(members: &[&str]) -> MembershipCatalog {
694 MembershipCatalog::new(
695 ClusterId::new("cluster-x").unwrap(),
696 members.iter().map(|m| data_member(m)),
697 )
698 }
699
700 /// A catalog of `n` single-owner ranges in `orders`, assigning range `i` to
701 /// `owners[i]`. Each range is a distinct hash partition so they never overlap.
702 fn catalog(owners: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
703 let orders = collection("orders");
704 let mut catalog = ShardOwnershipCatalog::new();
705 for (i, owner) in owners.iter().enumerate() {
706 let lower = vec![i as u8];
707 let upper = vec![i as u8 + 1];
708 let bounds = RangeBounds::new(
709 crate::cluster::ownership::RangeBound::key(lower),
710 crate::cluster::ownership::RangeBound::key(upper),
711 )
712 .unwrap();
713 catalog
714 .apply_update(RangeOwnership::establish(
715 orders.clone(),
716 RangeId::new(i as u64 + 1),
717 ShardKeyMode::Hash,
718 bounds,
719 ident(owner),
720 Vec::<NodeIdentity>::new(),
721 PlacementMetadata::with_replication_factor(1),
722 ))
723 .unwrap();
724 }
725 (catalog, orders)
726 }
727
728 /// A scripted [`PlacementSignals`]: per-member capacity (defaulting to a
729 /// uniform disk) and per-range load keyed by range id.
730 struct FakeSignals {
731 default_capacity: MemberCapacity,
732 capacity: HashMap<NodeIdentity, MemberCapacity>,
733 load: HashMap<u64, RangeLoad>,
734 default_bytes: u64,
735 }
736
737 impl FakeSignals {
738 fn uniform(disk: u64, default_bytes: u64) -> Self {
739 Self {
740 default_capacity: MemberCapacity::with_disk(disk),
741 capacity: HashMap::new(),
742 load: HashMap::new(),
743 default_bytes,
744 }
745 }
746
747 fn with_capacity(mut self, cn: &str, cap: MemberCapacity) -> Self {
748 self.capacity.insert(ident(cn), cap);
749 self
750 }
751
752 fn with_load(mut self, range_id: u64, load: RangeLoad) -> Self {
753 self.load.insert(range_id, load);
754 self
755 }
756 }
757
758 impl PlacementSignals for FakeSignals {
759 fn member_capacity(&self, member: &NodeIdentity) -> MemberCapacity {
760 self.capacity
761 .get(member)
762 .copied()
763 .unwrap_or(self.default_capacity)
764 }
765
766 fn range_load(&self, _collection: &CollectionId, range_id: RangeId) -> RangeLoad {
767 self.load
768 .get(&range_id.value())
769 .copied()
770 .unwrap_or_else(|| RangeLoad::idle(self.default_bytes))
771 }
772 }
773
774 // --- weighted capacity model -----------------------------------------
775
#[test]
fn weighted_capacity_scales_disk_by_operator_weight() {
    // At the neutral weight the placement weight is just the disk size.
    let neutral = MemberCapacity::with_disk(1_000);
    assert_eq!(neutral.weighted_capacity(), 1_000);

    // Operator weights are hundredths: 200 doubles, 50 halves.
    let doubled = MemberCapacity::new(1_000, 200);
    let halved = MemberCapacity::new(1_000, 50);
    assert_eq!(doubled.weighted_capacity(), 2_000);
    assert_eq!(halved.weighted_capacity(), 500);

    // Zero disk cannot hold ranges; a single byte can.
    let empty = MemberCapacity::with_disk(0);
    let tiny = MemberCapacity::with_disk(1);
    assert!(!empty.is_placeable());
    assert!(tiny.is_placeable());
}
787
788 // --- acceptance scenario: homogeneous placement ----------------------
789
#[test]
fn homogeneous_cluster_is_balanced_and_plans_nothing() {
    // Equal disks, and each of the three members owns exactly one of three
    // equal-sized ranges: nothing is over its share, so nothing should move.
    let nodes = ["CN=node-a", "CN=node-b", "CN=node-c"];
    let members = membership(&nodes);
    let (catalog, _orders) = catalog(&nodes);
    let signals = FakeSignals::uniform(1_000_000, 100);

    let plan = WeightedPlacementPlanner::default().plan_rebalance(&members, &catalog, &signals);

    assert!(plan.is_empty(), "balanced homogeneous cluster is a no-op");
}
802
#[test]
fn homogeneous_cluster_with_skew_spreads_ranges() {
    // node-a starts with all three ranges; node-b and node-c start empty. Equal
    // capacity means a fair share of one range each, so two ranges must leave
    // node-a.
    let node_a = ident("CN=node-a");
    let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
    let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-a"]);
    let signals = FakeSignals::uniform(1_000_000, 100);

    let plan = WeightedPlacementPlanner::default().plan_rebalance(&members, &catalog, &signals);

    assert_eq!(
        plan.capacity_moves().count(),
        2,
        "two ranges move off node-a"
    );
    for mv in plan.capacity_moves() {
        assert_eq!(mv.from, node_a);
        assert_ne!(mv.to, node_a);
        assert_eq!(mv.reason, MoveReason::CapacityBalance);
    }

    // The two moves land on two different members: one range each for node-b
    // and node-c.
    let targets: std::collections::BTreeSet<_> =
        plan.capacity_moves().map(|m| m.to.clone()).collect();
    assert_eq!(targets.len(), 2);
}
829
830 // --- acceptance scenario: heterogeneous disk weights -----------------
831
#[test]
fn heterogeneous_disk_weights_apportion_by_capacity() {
    // node-big advertises 4x the disk of node-small, and all six equal-sized
    // ranges start on node-small. Fair shares work out to roughly 4.8 ranges
    // for big and 1.2 for small, so most ranges should be planned onto big.
    let planner = WeightedPlacementPlanner::default();
    let big = ident("CN=node-big");
    let small = ident("CN=node-small");
    let members = membership(&["CN=node-big", "CN=node-small"]);
    let (ownership, _orders) = catalog(&["CN=node-small"; 6]);
    let signals = FakeSignals::uniform(1_000, 100)
        .with_capacity("CN=node-big", MemberCapacity::with_disk(4_000))
        .with_capacity("CN=node-small", MemberCapacity::with_disk(1_000));

    let plan = planner.plan_rebalance(&members, &ownership, &signals);
    assert!(!plan.no_moves(), "imbalanced cluster must plan moves");

    // node-big should end up receiving 4 or 5 of the 6 ranges.
    let to_big = plan.capacity_moves().filter(|m| m.to == big).count();
    assert!(
        (4..=5).contains(&to_big),
        "node-big should receive ~4/5 of 6 ranges, got {to_big}"
    );

    // And no move goes in any direction other than small -> big.
    for mv in plan.capacity_moves() {
        assert_eq!(mv.from, small);
        assert_eq!(mv.to, big);
    }
}
867
#[test]
fn operator_weight_biases_placement_without_more_disk() {
    // Both members advertise the same disk; only node-pref's operator weight
    // (300, i.e. 3x) differs. All four ranges start on node-plain, so the
    // weighted member is expected to pull at least half of them.
    let planner = WeightedPlacementPlanner::default();
    let preferred = ident("CN=node-pref");
    let members = membership(&["CN=node-pref", "CN=node-plain"]);
    let (ownership, _orders) = catalog(&["CN=node-plain"; 4]);
    let signals = FakeSignals::uniform(1_000, 100)
        .with_capacity("CN=node-pref", MemberCapacity::new(1_000, 300));

    let plan = planner.plan_rebalance(&members, &ownership, &signals);
    let to_pref = plan
        .capacity_moves()
        .filter(|m| m.to == preferred)
        .count();
    assert!(
        to_pref >= 2,
        "higher operator weight pulls more ranges, got {to_pref}"
    );
}
893
894 // --- acceptance scenario: capacity expansion -------------------------
895
#[test]
fn expanding_disk_changes_weight_and_next_plan_without_moving_data() {
    // Two members; all six ranges start on node-a. node-a keeps 3_000 units of
    // disk throughout, while node-b starts at 1_000 and is later expanded to
    // 8_000.
    let planner = WeightedPlacementPlanner::default();
    let node_a = ident("CN=node-a");
    let node_b = ident("CN=node-b");
    let members = membership(&["CN=node-a", "CN=node-b"]);
    let (ownership, orders) = catalog(&["CN=node-a"; 6]);

    let small = MemberCapacity::with_disk(1_000);
    let expanded = MemberCapacity::with_disk(8_000);

    // Before expansion node-b is the smaller member, so it should be planned
    // only a modest share of the ranges.
    let before_signals = FakeSignals::uniform(1_000, 100)
        .with_capacity("CN=node-a", MemberCapacity::with_disk(3_000))
        .with_capacity("CN=node-b", small);
    let before = planner.plan_rebalance(&members, &ownership, &before_signals);
    let before_to_b = before.capacity_moves().filter(|m| m.to == node_b).count();

    // The operator grows node-b's disk 8x, which raises its placement weight...
    assert!(
        expanded.weighted_capacity() > small.weighted_capacity(),
        "expanding disk raises placement weight",
    );
    let after_signals = FakeSignals::uniform(1_000, 100)
        .with_capacity("CN=node-a", MemberCapacity::with_disk(3_000))
        .with_capacity("CN=node-b", expanded);
    let after = planner.plan_rebalance(&members, &ownership, &after_signals);
    let after_to_b = after.capacity_moves().filter(|m| m.to == node_b).count();

    // ...so the plan computed afterwards sends more ranges to node-b.
    assert!(
        after_to_b > before_to_b,
        "expanded disk pulls more ranges on the next plan ({before_to_b} -> {after_to_b})",
    );

    // Neither planning call touched ownership: every range is still on node-a.
    // Relocation only happens when a transition plan is executed.
    for i in 1..=6 {
        let range = ownership.range(&orders, RangeId::new(i)).unwrap();
        assert_eq!(
            range.owner(),
            &node_a,
            "range {i} stayed on node-a; planning moved nothing",
        );
    }
}
954
955 // --- acceptance scenario: hotspot signal influence -------------------
956
#[test]
fn hotspot_traffic_identifies_secondary_candidate() {
    // Scenario: bytes are balanced against capacity, so the primary
    // (capacity) pass should stay silent, but range 1 carries far more traffic
    // than any other range. The test expects it to be surfaced as a hotspot
    // and a single relief move to be planned off node-a.
    let planner = WeightedPlacementPlanner::default();
    let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
    // Ownership: ranges 1 and 2 on node-a, range 3 on node-b, range 4 on node-c.
    let (ownership, _orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-b", "CN=node-c"]);

    // Range 1 is tiny on disk but hammered with reads and writes.
    let hammered = RangeLoad {
        bytes_used: 2,
        read_ops: 1_000,
        write_ops: 1_000,
    };
    // Range 2 holds most of node-a's bytes and some residual read traffic.
    let residual = RangeLoad {
        bytes_used: 38,
        read_ops: 300,
        write_ops: 0,
    };
    // Ranges 3 and 4 are identical and comparatively quiet.
    let quiet = RangeLoad {
        bytes_used: 20,
        read_ops: 100,
        write_ops: 0,
    };

    // node-a has 2x the disk of the others, matching its 40 bytes (2 + 38)
    // against 20 bytes each on node-b and node-c.
    let signals = FakeSignals::uniform(0, 0)
        .with_capacity("CN=node-a", MemberCapacity::with_disk(2_000))
        .with_capacity("CN=node-b", MemberCapacity::with_disk(1_000))
        .with_capacity("CN=node-c", MemberCapacity::with_disk(1_000))
        .with_load(1, hammered)
        .with_load(2, residual)
        .with_load(3, quiet)
        .with_load(4, quiet);

    let plan = planner.plan_rebalance(&members, &ownership, &signals);
    let node_a = ident("CN=node-a");
    let hot_range = RangeId::new(1);

    // No capacity-balance move, because capacity is already balanced.
    assert_eq!(plan.capacity_moves().count(), 0, "capacity is balanced");

    // Exactly one hotspot: range 1, attributed to node-a, with its combined
    // read + write traffic.
    assert_eq!(plan.hotspots.len(), 1, "the hot range is surfaced");
    let hotspot = &plan.hotspots[0];
    assert_eq!(hotspot.range_id, hot_range);
    assert_eq!(hotspot.owner, node_a);
    assert_eq!(hotspot.traffic, 2_000);

    // Exactly one relief move: range 1 from node-a to node-b (node-b and node-c
    // are equally quiet; the lower id is expected to win the tie).
    let relief: Vec<_> = plan.hotspot_moves().collect();
    assert_eq!(relief.len(), 1, "a relief move is planned");
    let mv = &relief[0];
    assert_eq!(mv.range_id, hot_range);
    assert_eq!(mv.from, node_a);
    assert_eq!(
        mv.to,
        ident("CN=node-b"),
        "quietest target, tie -> lowest id"
    );
    assert_eq!(mv.reason, MoveReason::HotspotRelief);
}
1031
#[test]
fn no_hotspot_when_traffic_is_even() {
    // One range per member, equal disk, and the very same load on every range:
    // there is nothing to balance and nothing that stands out as a hotspot.
    let planner = WeightedPlacementPlanner::default();
    let nodes = ["CN=node-a", "CN=node-b", "CN=node-c"];
    let members = membership(&nodes);
    let (ownership, _orders) = catalog(&nodes);

    // A single load value shared by ranges 1, 2 and 3.
    let even = RangeLoad {
        bytes_used: 10,
        read_ops: 100,
        write_ops: 100,
    };
    let signals = FakeSignals::uniform(1_000_000, 100)
        .with_load(1, even)
        .with_load(2, even)
        .with_load(3, even);

    let plan = planner.plan_rebalance(&members, &ownership, &signals);
    assert!(plan.is_empty(), "balanced, even-traffic cluster is a no-op");
}
1068
1069 // --- acceptance scenario: no implicit data movement ------------------
1070
#[test]
fn planning_never_mutates_the_catalog() {
    // All four ranges sit on node-a of a two-member cluster, so the planner is
    // expected to propose moves. The catalog's owner/epoch/version for each
    // range must nevertheless read the same after planning as before: a plan
    // describes moves, it does not perform them.
    let planner = WeightedPlacementPlanner::default();
    let members = membership(&["CN=node-a", "CN=node-b"]);
    let (ownership, orders) = catalog(&["CN=node-a"; 4]);
    let signals = FakeSignals::uniform(1_000, 100);

    // Record (owner, epoch, version) for ranges 1..=4 before planning.
    let before: Vec<_> = (1..=4)
        .map(|id| {
            let entry = ownership.range(&orders, RangeId::new(id)).unwrap();
            (entry.owner().clone(), entry.epoch(), entry.version())
        })
        .collect();

    let plan = planner.plan_rebalance(&members, &ownership, &signals);
    assert!(!plan.no_moves(), "skewed cluster does plan moves");

    // Re-read each range and compare it with its recorded snapshot.
    for (id, expected) in (1..=4).zip(&before) {
        let entry = ownership.range(&orders, RangeId::new(id)).unwrap();
        let current = (entry.owner().clone(), entry.epoch(), entry.version());
        assert_eq!(&current, expected);
    }
}
1098
#[test]
fn draining_owner_ranges_are_left_to_the_drain_flow() {
    // node-a owns every range but has begun draining. The rebalancer is
    // expected to leave evacuation to the drain flow, and node-b is the only
    // other member, so the plan should contain no moves at all.
    let planner = WeightedPlacementPlanner::default();
    let draining = ident("CN=node-a");
    let mut members = membership(&["CN=node-a", "CN=node-b"]);
    members.begin_drain(&draining);
    let (ownership, _orders) = catalog(&["CN=node-a"; 3]);
    let signals = FakeSignals::uniform(1_000, 100);

    let plan = planner.plan_rebalance(&members, &ownership, &signals);
    assert!(
        plan.no_moves(),
        "draining owner's ranges are not rebalanced"
    );
}
1118
#[test]
fn single_member_cluster_plans_nothing() {
    // With node-a as the only member there is no other place a range could
    // go, so the plan is expected to contain no moves.
    let members = membership(&["CN=node-a"]);
    let (ownership, _orders) = catalog(&["CN=node-a"; 2]);
    let signals = FakeSignals::uniform(1_000, 100);

    let plan = WeightedPlacementPlanner::default().plan_rebalance(&members, &ownership, &signals);
    assert!(
        plan.no_moves(),
        "nowhere to move ranges in a one-member cluster"
    );
}
1132}