Skip to main content

reddb_server/cluster/
placement.rs

1//! Weighted placement and the multi-signal rebalancer planner (issue #1003,
2//! PRD #987, ADR 0037).
3//!
4//! Where the [`supervisor`](super::supervisor) reacts to a *failed* owner, this
5//! module is the proactive counterpart: it decides where ranges *should* live so
6//! the cluster's storage and traffic stay balanced as members come, go, and grow
7//! their disks. It is the glossary's **weighted placement** policy
8//! (`clustering.md`) — *"shard/range placement policy that accounts for advertised
9//! node capacity such as usable disk … and operator weights. Expanding a node's
10//! disk changes its placement weight; data moves only through explicit rebalancing
11//! transitions"* — driven by the **multi-signal rebalancer** — *"Cluster
12//! Supervisor policy that plans ownership transitions using bytes-used versus
13//! weighted capacity as the primary safety signal and read/write load as a
14//! secondary hotspot signal."*
15//!
16//! ## Two signals, one a safety floor and one a hint
17//!
18//! * **Primary — bytes-used vs weighted capacity.** Every member advertises its
19//!   usable disk and an operator weight ([`MemberCapacity`]); the product is its
20//!   [`weighted_capacity`](MemberCapacity::weighted_capacity), the member's share
21//!   of the cluster it is *meant* to hold. The planner compares each member's
22//!   bytes-used against its **fair share** (cluster bytes apportioned by weighted
23//!   capacity) and proposes moving ranges off members that are over their share
24//!   onto members that are under it. This is the safety signal: a member running
25//!   out of disk is an availability risk, so capacity balance is what the planner
26//!   acts on.
27//! * **Secondary — read/write load.** A range can be perfectly placed by bytes
28//!   yet still be a **hotspot**: it absorbs a disproportionate share of the
29//!   cluster's read/write traffic. The planner surfaces hotspots
30//!   ([`HotspotRange`]) and, when capacity allows, proposes spreading them off
31//!   their over-loaded owner. This is a hint layered on top of the capacity
32//!   floor, never in place of it — a hotspot move is only taken when it does not
33//!   itself create a capacity problem.
34//!
35//! ## Planning, not moving
36//!
37//! [`WeightedPlacementPlanner::plan_rebalance`] reads the membership catalog, the
38//! ownership catalog, and the live signals, and returns a [`RebalancePlan`] of
39//! [`PlannedMove`]s. It takes the ownership catalog by shared reference and
40//! **never mutates it** — *nothing moves implicitly*. Each [`PlannedMove`] is the
41//! intent for one rebalancing transition; executing it (copy the range to the
42//! target, let it catch up to the range commit watermark, then cut over through
43//! the fenced [`Handoff`](super::ownership_transition::TransitionKind::Handoff)
44//! transition machine) is a separate, explicit step. This is why *expanding a
45//! member's disk changes its placement weight but moves no data*: the new weight
46//! changes what the *next* plan proposes, and data only relocates when that plan
47//! is run.
48//!
49//! ## Purity
50//!
51//! All live state — per-member advertised capacity and per-range bytes/traffic —
52//! is read through the [`PlacementSignals`] trait, injected by the caller.
53//! Production backs it onto the disk-usage reporter and the per-range traffic
54//! counters; tests back it onto a scripted fake. The planner itself is a pure
55//! policy over the two catalogs plus those signals, so the whole weighting,
56//! balancing, and hotspot story is exercised deterministically — no disk, no
57//! clock, no network.
58
59use std::collections::BTreeMap;
60
61use super::identity::NodeIdentity;
62use super::membership::MembershipCatalog;
63use super::ownership::{CollectionId, RangeId, ShardOwnershipCatalog};
64
/// Operator weight that leaves a member's usable disk unscaled — a 1.0×
/// multiplier.
///
/// Operator weights are fixed-point hundredths: `100` is 1.0×, `200` doubles a
/// member's placement weight and `50` halves it. Adjusting the weight lets an
/// operator steer placement on or off a member without misreporting its disk.
pub const NEUTRAL_OPERATOR_WEIGHT: u32 = 100;

/// What a member advertises for placement: its usable disk and the operator's
/// weight multiplier on top of it.
///
/// Together they yield [`weighted_capacity`](Self::weighted_capacity), the
/// member's intended share of the cluster. Raising either one enlarges that
/// share, and the planner shifts ranges toward the member on the *next* plan.
/// This type only records advertised state; it never moves anything itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemberCapacity {
    /// Bytes of disk the member offers for user ranges.
    pub usable_disk_bytes: u64,
    /// Operator multiplier in hundredths ([`NEUTRAL_OPERATOR_WEIGHT`] is 1.0×),
    /// for biasing placement on or off a member without lying about its disk.
    pub operator_weight: u32,
}

impl MemberCapacity {
    /// Build a capacity from an explicit usable disk and operator weight.
    pub fn new(usable_disk_bytes: u64, operator_weight: u32) -> Self {
        MemberCapacity {
            usable_disk_bytes,
            operator_weight,
        }
    }

    /// Build a capacity from usable disk alone, at [`NEUTRAL_OPERATOR_WEIGHT`] —
    /// the usual case when the operator has expressed no preference.
    pub fn with_disk(usable_disk_bytes: u64) -> Self {
        MemberCapacity {
            usable_disk_bytes,
            operator_weight: NEUTRAL_OPERATOR_WEIGHT,
        }
    }

    /// The member's **placement weight**:
    /// `usable_disk_bytes * operator_weight / NEUTRAL_OPERATOR_WEIGHT`,
    /// truncated toward zero.
    ///
    /// This is the quantity the rebalancer apportions cluster bytes by, and the
    /// thing that growing a disk (or raising the weight) changes: a bigger value
    /// means a bigger fair share on the next plan. The arithmetic is done in
    /// `u128`, where a `u64` times a `u32` always fits.
    pub fn weighted_capacity(&self) -> u128 {
        let disk = u128::from(self.usable_disk_bytes);
        let weight = u128::from(self.operator_weight);
        let neutral = u128::from(NEUTRAL_OPERATOR_WEIGHT);
        disk * weight / neutral
    }

    /// `true` when the member can be given ranges at all. Zero usable disk, a
    /// zero operator weight, or a product that truncates to zero all leave it
    /// with no weighted capacity, so it is never a placement target.
    pub fn is_placeable(&self) -> bool {
        self.weighted_capacity() != 0
    }
}
121
/// Live load on a single range: how much disk it occupies and how much
/// read/write traffic it has served recently.
///
/// `bytes_used` drives the **primary** capacity signal (a member's bytes-used is
/// the sum of its ranges' `bytes_used`), while `read_ops` and `write_ops` drive
/// the **secondary** hotspot signal. Bundling them means a single
/// [`PlacementSignals::range_load`] call answers everything the planner asks
/// about a range.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RangeLoad {
    /// On-disk size of the range in bytes — the primary capacity signal.
    pub bytes_used: u64,
    /// Reads served during the recent observation window.
    pub read_ops: u64,
    /// Writes served during the recent observation window.
    pub write_ops: u64,
}

impl RangeLoad {
    /// A range of `bytes_used` bytes that serves no traffic — convenient when only
    /// the capacity signal matters.
    pub fn idle(bytes_used: u64) -> Self {
        RangeLoad {
            bytes_used,
            ..Default::default()
        }
    }

    /// Reads plus writes, clamped at `u64::MAX` instead of overflowing. This is
    /// the per-range figure the hotspot signal compares against the cluster mean,
    /// independent of the range's size.
    pub fn traffic(&self) -> u64 {
        self.read_ops
            .checked_add(self.write_ops)
            .unwrap_or(u64::MAX)
    }
}
157
158/// The live cluster state the planner reads but does not own: each member's
159/// advertised capacity and each range's bytes/traffic.
160///
161/// Production backs this onto the disk-usage reporter and the per-range traffic
162/// counters; tests back it onto a scripted fake. Keeping it behind a trait is what
163/// makes the planner a pure policy.
164pub trait PlacementSignals {
165    /// The capacity `member` currently advertises. A member that advertises
166    /// nothing (or is unknown) should report a zero-disk [`MemberCapacity`], which
167    /// makes it un-placeable rather than a div-by-zero hazard.
168    fn member_capacity(&self, member: &NodeIdentity) -> MemberCapacity;
169
170    /// The current load on `(collection, range_id)` — its bytes and its recent
171    /// read/write traffic.
172    fn range_load(&self, collection: &CollectionId, range_id: RangeId) -> RangeLoad;
173}
174
/// The signal that led the planner to propose a move.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MoveReason {
    /// **Primary** signal. The source holds more than its weighted-capacity fair
    /// share and the target holds less than its own, so the move eases a
    /// disk-pressure (availability) risk.
    CapacityBalance,
    /// **Secondary** signal. The range is a read/write hotspot on a heavily
    /// loaded owner, and the target has both load and capacity headroom to take
    /// it. Proposed only when it does not create a capacity problem.
    HotspotRelief,
}
187
188/// One proposed rebalancing transition: move authority for a range from its
189/// current owner to a target member.
190///
191/// A [`PlannedMove`] is *intent*, not an executed transition. Carrying it out
192/// means copying the range to `to`, letting it catch up to the range commit
193/// watermark, and then cutting over through the fenced
194/// [`Handoff`](super::ownership_transition::TransitionKind::Handoff) machine — a
195/// separate, explicit step. The planner only ever produces these; it moves no
196/// data.
197#[derive(Debug, Clone, PartialEq, Eq)]
198pub struct PlannedMove {
199    pub collection: CollectionId,
200    pub range_id: RangeId,
201    /// The range's current owner in the catalog — the move's source.
202    pub from: NodeIdentity,
203    /// The proposed new owner — an active data member with capacity headroom.
204    pub to: NodeIdentity,
205    /// The range's size in bytes at planning time (what the move relocates).
206    pub bytes: u64,
207    pub reason: MoveReason,
208}
209
210/// A range the **secondary** signal flagged as a read/write hotspot: it serves
211/// traffic well above the cluster mean. Surfaced whether or not a relief move was
212/// possible, so an operator can see a hotspot even when there is no headroom to
213/// relieve it.
214#[derive(Debug, Clone, PartialEq, Eq)]
215pub struct HotspotRange {
216    pub collection: CollectionId,
217    pub range_id: RangeId,
218    /// The range's current owner — the member bearing the hot traffic.
219    pub owner: NodeIdentity,
220    /// The range's read + write traffic in the observation window.
221    pub traffic: u64,
222}
223
224/// The planner's decision for one pass: the moves to schedule and the hotspots it
225/// observed.
226///
227/// A cluster already balanced by capacity, with no hotspot, yields an empty plan
228/// ([`is_empty`](Self::is_empty)) — the no-op a stable cluster must produce.
229#[derive(Debug, Clone, Default, PartialEq, Eq)]
230pub struct RebalancePlan {
231    /// Proposed moves, in a deterministic order (capacity moves first, then
232    /// hotspot-relief moves, each in `(collection, range_id)` order).
233    pub moves: Vec<PlannedMove>,
234    /// Ranges observed to be hotspots this pass, hottest first.
235    pub hotspots: Vec<HotspotRange>,
236}
237
238impl RebalancePlan {
239    /// Nothing to schedule *and* nothing hot — a fully balanced, evenly-loaded
240    /// cluster. Distinct from [`no_moves`](Self::no_moves): a balanced cluster can
241    /// still have an *observed* hotspot it cannot relieve.
242    pub fn is_empty(&self) -> bool {
243        self.moves.is_empty() && self.hotspots.is_empty()
244    }
245
246    /// Whether the plan proposes any actual range movement. False on a cluster
247    /// that is balanced by capacity and has no relievable hotspot, even if a
248    /// hotspot was *observed*.
249    pub fn no_moves(&self) -> bool {
250        self.moves.is_empty()
251    }
252
253    /// The capacity-balance moves only (the primary signal).
254    pub fn capacity_moves(&self) -> impl Iterator<Item = &PlannedMove> {
255        self.moves
256            .iter()
257            .filter(|m| m.reason == MoveReason::CapacityBalance)
258    }
259
260    /// The hotspot-relief moves only (the secondary signal).
261    pub fn hotspot_moves(&self) -> impl Iterator<Item = &PlannedMove> {
262        self.moves
263            .iter()
264            .filter(|m| m.reason == MoveReason::HotspotRelief)
265    }
266}
267
/// The tunables that decide when imbalance or traffic is worth a move.
///
/// The defaults are deliberately slack. A member up to 10% above its fair share
/// is "balanced enough" not to churn ownership. A range must serve more than 2×
/// the cluster's mean per-range traffic before it counts as a hotspot. Tight
/// thresholds would make the planner thrash on noise.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PlacementPolicy {
    /// Fractional tolerance above a member's fair share (`0.10` means 10%).
    ///
    /// * The capacity pass treats a member as over-full only when its bytes-used
    ///   exceeds `fair * (1 + balance_tolerance)`. Its targets are members below
    ///   their fair share. It does not cap a target at this band; instead it only
    ///   takes a move that strictly shrinks the larger of the source's and
    ///   target's deviations from fair share.
    /// * The hotspot pass only moves a range onto a member whose bytes-used,
    ///   including that range, stays at or below `fair * (1 + balance_tolerance)`.
    ///
    /// Larger values tolerate more imbalance in exchange for less churn.
    pub balance_tolerance: f64,
    /// Multiple of the cluster's mean per-range traffic that a range must
    /// strictly exceed to count as a hotspot. `2.0` means "more than twice the
    /// average".
    pub hotspot_load_factor: f64,
}

impl Default for PlacementPolicy {
    /// 10% balance tolerance and a 2× hotspot factor.
    fn default() -> Self {
        Self {
            balance_tolerance: 0.10,
            hotspot_load_factor: 2.0,
        }
    }
}
294
/// The fair share of `total_bytes` owed to a member with `member_capacity`, out
/// of the cluster's `total_capacity`.
///
/// The result is `total_bytes * member_capacity / total_capacity`, truncated and
/// clamped to `u64::MAX`; bytes are apportioned strictly by weighted capacity.
/// Returns `0` when `total_capacity` is zero (no placeable capacity at all).
///
/// The product is formed in `u128`, which is exact for any realistic cluster.
/// A `u64` byte count times a weighted capacity (nearly 2^90 with an extreme
/// operator weight) can still exceed `u128`. Only in that case are both
/// capacities shifted right by just enough bits for the product to fit. This
/// preserves their ratio up to rounding, instead of wrapping (release) or
/// panicking (debug).
fn fair_share(total_bytes: u64, member_capacity: u128, total_capacity: u128) -> u64 {
    if total_capacity == 0 {
        return 0;
    }
    let bytes = u128::from(total_bytes);
    let share = match bytes.checked_mul(member_capacity) {
        Some(product) => product / total_capacity,
        None => {
            // An overflow implies bit_len(bytes) + bit_len(member) >= 129, so the
            // shift is at least 1. After it, the product has at most 128 bits.
            let bytes_bits = 128 - bytes.leading_zeros();
            let member_bits = 128 - member_capacity.leading_zeros();
            let shift = bytes_bits + member_bits - 128;
            let scaled_total = total_capacity >> shift;
            if scaled_total == 0 {
                // total_capacity < 2^shift while the true product is at least
                // 2^(126 + shift), so the true share exceeds 2^126, far beyond
                // u64. Clamp, as the exact path does.
                return u64::MAX;
            }
            bytes * (member_capacity >> shift) / scaled_total
        }
    };
    share.min(u64::MAX as u128) as u64
}
305
306/// The weighted-placement, multi-signal rebalancer planner.
307///
308/// Holds only the [`PlacementPolicy`]; all live state is read through
309/// [`PlacementSignals`] at plan time, so one planner instance serves the whole
310/// cluster lifetime.
311#[derive(Debug, Clone, Default)]
312pub struct WeightedPlacementPlanner {
313    policy: PlacementPolicy,
314}
315
316impl WeightedPlacementPlanner {
317    /// A planner with the given policy.
318    pub fn new(policy: PlacementPolicy) -> Self {
319        Self { policy }
320    }
321
322    pub fn policy(&self) -> &PlacementPolicy {
323        &self.policy
324    }
325
326    /// Plan a rebalance across the whole ownership catalog **without** mutating
327    /// it. Runs the primary capacity-balance pass, then the secondary
328    /// hotspot-relief pass on top, and returns the combined [`RebalancePlan`].
329    /// Ranges owned by members that are not placement-eligible (draining members,
330    /// witnesses) are left to the drain flow and never moved here.
331    pub fn plan_rebalance(
332        &self,
333        membership: &MembershipCatalog,
334        ownership: &ShardOwnershipCatalog,
335        signals: &impl PlacementSignals,
336    ) -> RebalancePlan {
337        let mut state = ClusterState::observe(membership, ownership, signals, &self.policy);
338        let mut moves = state.plan_capacity_moves(&self.policy);
339        let (hotspots, hotspot_moves) = state.plan_hotspot_moves(&self.policy);
340        moves.extend(hotspot_moves);
341        RebalancePlan { moves, hotspots }
342    }
343}
344
345/// The mutable simulation the planner balances against. Built once from the live
346/// catalogs and signals, then evolved as moves are chosen so each successive move
347/// sees the effect of the ones before it. Crucially this is a *copy* of the live
348/// state — evolving it changes nothing in the real catalog, which is what makes
349/// planning side-effect free.
350struct ClusterState {
351    /// Placement-eligible members (active data members) with non-zero weighted
352    /// capacity, in stable identity order.
353    eligible: Vec<NodeIdentity>,
354    weighted_capacity: BTreeMap<NodeIdentity, u128>,
355    /// Total weighted capacity across `eligible` — the denominator of fair share.
356    total_capacity: u128,
357    /// Total bytes across all movable ranges — the numerator of fair share.
358    total_bytes: u64,
359    /// Per-range size and traffic, keyed in `(collection, range_id)` order.
360    ranges: BTreeMap<(CollectionId, RangeId), RangeFacts>,
361    /// Simulated current owner of each movable range (evolves as moves are taken).
362    owner_of: BTreeMap<(CollectionId, RangeId), NodeIdentity>,
363    /// The range's true catalog owner — the `from` every move records, even if the
364    /// simulation has since reassigned it (a range moves at most once per plan).
365    origin_owner: BTreeMap<(CollectionId, RangeId), NodeIdentity>,
366    /// Simulated bytes-used per member.
367    used: BTreeMap<NodeIdentity, u64>,
368    /// Simulated read/write load per member.
369    load: BTreeMap<NodeIdentity, u64>,
370    /// Ranges already scheduled to move — never moved twice in one plan.
371    moved: std::collections::BTreeSet<(CollectionId, RangeId)>,
372}
373
/// The two per-range figures the simulation tracks, captured from a
/// `RangeLoad` when the cluster is observed.
#[derive(Clone, Copy)]
struct RangeFacts {
    /// On-disk size of the range in bytes.
    bytes: u64,
    /// Read + write traffic of the range in the observation window.
    traffic: u64,
}
379
380impl ClusterState {
381    fn observe(
382        membership: &MembershipCatalog,
383        ownership: &ShardOwnershipCatalog,
384        signals: &impl PlacementSignals,
385        _policy: &PlacementPolicy,
386    ) -> Self {
387        let mut weighted_capacity = BTreeMap::new();
388        let mut eligible = Vec::new();
389        let mut total_capacity: u128 = 0;
390        for member in membership.placement_eligible_members() {
391            let id = member.identity().clone();
392            let cap = signals.member_capacity(&id).weighted_capacity();
393            if cap == 0 {
394                // A placement-eligible member advertising no usable disk is not a
395                // valid target; exclude it so it is never apportioned bytes.
396                continue;
397            }
398            total_capacity += cap;
399            weighted_capacity.insert(id.clone(), cap);
400            eligible.push(id);
401        }
402
403        let eligible_set: std::collections::BTreeSet<&NodeIdentity> = eligible.iter().collect();
404
405        let mut ranges = BTreeMap::new();
406        let mut owner_of = BTreeMap::new();
407        let mut origin_owner = BTreeMap::new();
408        let mut used: BTreeMap<NodeIdentity, u64> =
409            eligible.iter().map(|id| (id.clone(), 0)).collect();
410        let mut load: BTreeMap<NodeIdentity, u64> =
411            eligible.iter().map(|id| (id.clone(), 0)).collect();
412        let mut total_bytes: u64 = 0;
413
414        for entry in ownership.entries() {
415            let owner = entry.owner().clone();
416            // Only ranges owned by an eligible member are movable here; a draining
417            // owner's ranges belong to the drain flow.
418            if !eligible_set.contains(&owner) {
419                continue;
420            }
421            let key = (entry.collection().clone(), entry.range_id());
422            let load_facts = signals.range_load(entry.collection(), entry.range_id());
423            ranges.insert(
424                key.clone(),
425                RangeFacts {
426                    bytes: load_facts.bytes_used,
427                    traffic: load_facts.traffic(),
428                },
429            );
430            *used.get_mut(&owner).unwrap() += load_facts.bytes_used;
431            *load.get_mut(&owner).unwrap() += load_facts.traffic();
432            total_bytes = total_bytes.saturating_add(load_facts.bytes_used);
433            owner_of.insert(key.clone(), owner.clone());
434            origin_owner.insert(key, owner);
435        }
436
437        Self {
438            eligible,
439            weighted_capacity,
440            total_capacity,
441            total_bytes,
442            ranges,
443            owner_of,
444            origin_owner,
445            used,
446            load,
447            moved: std::collections::BTreeSet::new(),
448        }
449    }
450
451    fn fair(&self, member: &NodeIdentity) -> u64 {
452        let cap = self.weighted_capacity.get(member).copied().unwrap_or(0);
453        fair_share(self.total_bytes, cap, self.total_capacity)
454    }
455
456    /// Ranges currently owned by `member` in the simulation, in `(collection,
457    /// range_id)` order, that have not already been moved this plan.
458    fn ranges_owned_by(&self, member: &NodeIdentity) -> Vec<(CollectionId, RangeId)> {
459        self.owner_of
460            .iter()
461            .filter(|(key, owner)| *owner == member && !self.moved.contains(*key))
462            .map(|(key, _)| key.clone())
463            .collect()
464    }
465
466    fn apply_move(&mut self, key: &(CollectionId, RangeId), to: &NodeIdentity) {
467        let facts = self.ranges[key];
468        let from = self.owner_of[key].clone();
469        *self.used.get_mut(&from).unwrap() -= facts.bytes;
470        *self.load.get_mut(&from).unwrap() -= facts.traffic;
471        *self.used.get_mut(to).unwrap() += facts.bytes;
472        *self.load.get_mut(to).unwrap() += facts.traffic;
473        self.owner_of.insert(key.clone(), to.clone());
474        self.moved.insert(key.clone());
475    }
476
477    /// The **primary** pass: greedily move ranges off members over their
478    /// weighted-capacity fair share onto members under theirs, until no member is
479    /// over tolerance or no move strictly improves the worst imbalance.
480    fn plan_capacity_moves(&mut self, policy: &PlacementPolicy) -> Vec<PlannedMove> {
481        let mut planned = Vec::new();
482        if self.total_capacity == 0 || self.eligible.len() < 2 {
483            return planned;
484        }
485
486        // Each range moves at most once, so the loop is bounded by the range count.
487        // Pick the member most over its fair share (beyond tolerance) each round,
488        // then the member most under its own — the pair whose rebalance helps most.
489        while let Some(source) = self.most_over(policy) {
490            let Some(target) = self.most_under(&source) else {
491                break;
492            };
493
494            let dev_src = self.deviation(&source);
495            let dev_tgt = self.deviation(&target);
496            let worst_before = dev_src.abs().max(dev_tgt.abs());
497
498            // Among the source's still-movable ranges, choose the one that most
499            // reduces the worse of the two deviations after the move.
500            let mut best: Option<((CollectionId, RangeId), f64)> = None;
501            for key in self.ranges_owned_by(&source) {
502                let s = self.ranges[&key].bytes as f64;
503                let after = (dev_src - s).abs().max((dev_tgt + s).abs());
504                let better = match &best {
505                    None => true,
506                    Some((_, best_after)) => after < *best_after,
507                };
508                if better {
509                    best = Some((key, after));
510                }
511            }
512
513            let Some((key, worst_after)) = best else {
514                break;
515            };
516            // Only take the move if it strictly improves the worst imbalance —
517            // otherwise we would churn ownership for nothing.
518            if worst_after >= worst_before {
519                break;
520            }
521
522            let bytes = self.ranges[&key].bytes;
523            let from = self.origin_owner[&key].clone();
524            self.apply_move(&key, &target);
525            planned.push(PlannedMove {
526                collection: key.0,
527                range_id: key.1,
528                from,
529                to: target,
530                bytes,
531                reason: MoveReason::CapacityBalance,
532            });
533        }
534
535        planned
536    }
537
538    /// A member's deviation from its fair share in bytes: positive is over-full,
539    /// negative is under-full.
540    fn deviation(&self, member: &NodeIdentity) -> f64 {
541        self.used.get(member).copied().unwrap_or(0) as f64 - self.fair(member) as f64
542    }
543
544    /// The eligible member furthest over its fair share, beyond the tolerance
545    /// band, or `None` if everyone is within tolerance.
546    fn most_over(&self, policy: &PlacementPolicy) -> Option<NodeIdentity> {
547        self.eligible
548            .iter()
549            .filter(|id| {
550                let used = self.used.get(*id).copied().unwrap_or(0) as f64;
551                let fair = self.fair(id) as f64;
552                used > fair * (1.0 + policy.balance_tolerance) && used > fair
553            })
554            .max_by(|a, b| {
555                self.deviation(a)
556                    .partial_cmp(&self.deviation(b))
557                    .unwrap()
558                    // Tie-break by identity so the plan is deterministic.
559                    .then_with(|| b.cmp(a))
560            })
561            .cloned()
562    }
563
564    /// The eligible member furthest *under* its fair share (the best target),
565    /// excluding `source`.
566    fn most_under(&self, source: &NodeIdentity) -> Option<NodeIdentity> {
567        self.eligible
568            .iter()
569            .filter(|id| *id != source && self.deviation(id) < 0.0)
570            .min_by(|a, b| {
571                self.deviation(a)
572                    .partial_cmp(&self.deviation(b))
573                    .unwrap()
574                    // Tie-break by identity so the plan is deterministic.
575                    .then_with(|| a.cmp(b))
576            })
577            .cloned()
578    }
579
580    /// The **secondary** pass: identify hotspot ranges (traffic well above the
581    /// cluster mean) and, for each, propose spreading it to a member with both
582    /// load and capacity headroom — but only when that strictly lowers the owner's
583    /// load concentration and respects the capacity tolerance. Returns the
584    /// observed hotspots (hottest first) and any relief moves.
585    fn plan_hotspot_moves(
586        &mut self,
587        policy: &PlacementPolicy,
588    ) -> (Vec<HotspotRange>, Vec<PlannedMove>) {
589        let mut hotspots = Vec::new();
590        let mut moves = Vec::new();
591
592        let range_count = self.ranges.len();
593        if range_count == 0 {
594            return (hotspots, moves);
595        }
596        let total_traffic: u64 = self.ranges.values().map(|f| f.traffic).sum();
597        let mean = total_traffic as f64 / range_count as f64;
598        let threshold = mean * policy.hotspot_load_factor;
599        if mean <= 0.0 {
600            return (hotspots, moves);
601        }
602
603        // Collect hotspots, hottest first; tie-break by key for determinism.
604        let mut hot: Vec<((CollectionId, RangeId), u64)> = self
605            .ranges
606            .iter()
607            .filter(|(_, f)| f.traffic as f64 > threshold)
608            .map(|(key, f)| (key.clone(), f.traffic))
609            .collect();
610        hot.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
611
612        for (key, traffic) in hot {
613            let owner = self.owner_of[&key].clone();
614            hotspots.push(HotspotRange {
615                collection: key.0.clone(),
616                range_id: key.1,
617                // Report the range's true catalog owner — the member actually
618                // bearing the hot traffic — independent of any simulated move.
619                owner: self.origin_owner[&key].clone(),
620                traffic,
621            });
622
623            // A hotspot already scheduled to move (by the capacity pass) needs no
624            // second move, and an owner holding only this one range cannot be
625            // relieved by moving it — that just relocates the hotspot.
626            if self.moved.contains(&key) || self.ranges_owned_by(&owner).len() < 2 {
627                continue;
628            }
629
630            let facts = self.ranges[&key];
631            let owner_load = self.load.get(&owner).copied().unwrap_or(0);
632
633            // Pick the eligible member with the least load that can take the range
634            // without breaching its capacity tolerance and ends up less loaded than
635            // the owner is now — otherwise the move does not spread load.
636            let target = self
637                .eligible
638                .iter()
639                .filter(|id| **id != owner)
640                .filter(|id| {
641                    let used = self.used.get(*id).copied().unwrap_or(0);
642                    let fair = self.fair(id) as f64;
643                    (used + facts.bytes) as f64 <= fair * (1.0 + policy.balance_tolerance)
644                })
645                .filter(|id| {
646                    let tgt_load = self.load.get(*id).copied().unwrap_or(0);
647                    tgt_load + facts.traffic < owner_load
648                })
649                .min_by(|a, b| {
650                    let la = self.load.get(*a).copied().unwrap_or(0);
651                    let lb = self.load.get(*b).copied().unwrap_or(0);
652                    la.cmp(&lb).then_with(|| a.cmp(b))
653                })
654                .cloned();
655
656            if let Some(target) = target {
657                let from = self.origin_owner[&key].clone();
658                self.apply_move(&key, &target);
659                moves.push(PlannedMove {
660                    collection: key.0,
661                    range_id: key.1,
662                    from,
663                    to: target,
664                    bytes: facts.bytes,
665                    reason: MoveReason::HotspotRelief,
666                });
667            }
668        }
669
670        (hotspots, moves)
671    }
672}
673
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::membership::{ClusterId, ClusterMember, MemberKind};
    use crate::cluster::ownership::{PlacementMetadata, RangeBounds, RangeOwnership, ShardKeyMode};
    use std::collections::{BTreeSet, HashMap};

    /// Parses a certificate subject such as `"CN=node-a"` into a [`NodeIdentity`],
    /// panicking if it is rejected.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Builds a [`CollectionId`] for `name`, panicking if it is rejected.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// A freshly joined, empty data member identified by `cn`.
    fn data_member(cn: &str) -> ClusterMember {
        ClusterMember::joined_empty(ident(cn), MemberKind::Data)
    }

    /// A membership catalog for `cluster-x` with one data member per entry in
    /// `members`.
    fn membership(members: &[&str]) -> MembershipCatalog {
        MembershipCatalog::new(
            ClusterId::new("cluster-x").unwrap(),
            members.iter().map(|m| data_member(m)),
        )
    }

    /// A catalog of single-owner ranges in `orders`, one per entry in `owners`:
    /// range id `i + 1` spans key `[i]` to key `[i + 1]`, is owned by `owners[i]`
    /// and has replication factor 1. Each range is a distinct hash partition so
    /// they never overlap.
    ///
    /// # Panics
    ///
    /// If `owners` has more than 255 entries: bounds are single bytes, so a 256th
    /// range's upper bound would overflow `u8`.
    fn catalog(owners: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        // Guard the `as u8` casts below; without this, larger inputs would
        // silently truncate into overlapping bounds (or overflow `i + 1`).
        assert!(
            owners.len() <= usize::from(u8::MAX),
            "test catalog supports at most {} single-byte ranges",
            u8::MAX
        );
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        for (i, owner) in owners.iter().enumerate() {
            let lower = vec![i as u8];
            let upper = vec![i as u8 + 1];
            let bounds = RangeBounds::new(
                crate::cluster::ownership::RangeBound::key(lower),
                crate::cluster::ownership::RangeBound::key(upper),
            )
            .unwrap();
            catalog
                .apply_update(RangeOwnership::establish(
                    orders.clone(),
                    RangeId::new(i as u64 + 1),
                    ShardKeyMode::Hash,
                    bounds,
                    ident(owner),
                    Vec::<NodeIdentity>::new(),
                    PlacementMetadata::with_replication_factor(1),
                ))
                .unwrap();
        }
        (catalog, orders)
    }

    /// A scripted [`PlacementSignals`].
    ///
    /// - Capacity is per member, falling back to `default_capacity`.
    /// - Load is per range id, falling back to `RangeLoad::idle(default_bytes)`.
    struct FakeSignals {
        /// Capacity reported for members without an explicit override.
        default_capacity: MemberCapacity,
        /// Per-member capacity overrides.
        capacity: HashMap<NodeIdentity, MemberCapacity>,
        /// Scripted load, keyed by the raw range id value.
        load: HashMap<u64, RangeLoad>,
        /// Bytes reported for any range without scripted load.
        default_bytes: u64,
    }

    impl FakeSignals {
        /// Every member advertises `disk` and every unscripted range holds
        /// `default_bytes` with idle load.
        fn uniform(disk: u64, default_bytes: u64) -> Self {
            Self {
                default_capacity: MemberCapacity::with_disk(disk),
                capacity: HashMap::new(),
                load: HashMap::new(),
                default_bytes,
            }
        }

        /// Overrides the capacity advertised by member `cn`.
        fn with_capacity(mut self, cn: &str, cap: MemberCapacity) -> Self {
            self.capacity.insert(ident(cn), cap);
            self
        }

        /// Scripts the load reported for range `range_id`.
        fn with_load(mut self, range_id: u64, load: RangeLoad) -> Self {
            self.load.insert(range_id, load);
            self
        }
    }

    impl PlacementSignals for FakeSignals {
        fn member_capacity(&self, member: &NodeIdentity) -> MemberCapacity {
            self.capacity
                .get(member)
                .copied()
                .unwrap_or(self.default_capacity)
        }

        fn range_load(&self, _collection: &CollectionId, range_id: RangeId) -> RangeLoad {
            self.load
                .get(&range_id.value())
                .copied()
                .unwrap_or_else(|| RangeLoad::idle(self.default_bytes))
        }
    }

    // --- weighted capacity model -----------------------------------------

    #[test]
    fn weighted_capacity_scales_disk_by_operator_weight() {
        // Neutral weight places strictly by disk.
        assert_eq!(MemberCapacity::with_disk(1_000).weighted_capacity(), 1_000);
        // A 2.0x operator weight doubles the placement weight; 0.5x halves it.
        assert_eq!(MemberCapacity::new(1_000, 200).weighted_capacity(), 2_000);
        assert_eq!(MemberCapacity::new(1_000, 50).weighted_capacity(), 500);
        // No disk -> not placeable.
        assert!(!MemberCapacity::with_disk(0).is_placeable());
        assert!(MemberCapacity::with_disk(1).is_placeable());
    }

    // --- acceptance scenario: homogeneous placement ----------------------

    #[test]
    fn homogeneous_cluster_is_balanced_and_plans_nothing() {
        // Three members, equal disk, three equal-sized ranges one each: already
        // perfectly balanced, so the planner proposes no move.
        let planner = WeightedPlacementPlanner::default();
        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let signals = FakeSignals::uniform(1_000_000, 100);

        let plan = planner.plan_rebalance(&members, &catalog, &signals);
        assert!(plan.is_empty(), "balanced homogeneous cluster is a no-op");
    }

    #[test]
    fn homogeneous_cluster_with_skew_spreads_ranges() {
        // All three ranges sit on node-a while node-b and node-c are empty. With
        // equal capacity the fair share is one range each, so the planner moves two
        // ranges off node-a.
        let planner = WeightedPlacementPlanner::default();
        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-a"]);
        let signals = FakeSignals::uniform(1_000_000, 100);

        let plan = planner.plan_rebalance(&members, &catalog, &signals);
        assert_eq!(
            plan.capacity_moves().count(),
            2,
            "two ranges move off node-a"
        );
        for mv in plan.capacity_moves() {
            assert_eq!(mv.from, ident("CN=node-a"));
            assert_ne!(mv.to, ident("CN=node-a"));
            assert_eq!(mv.reason, MoveReason::CapacityBalance);
        }
        // node-b and node-c each receive exactly one range: two moves, and the
        // set of targets is exactly {node-b, node-c}.
        let targets: BTreeSet<_> = plan.capacity_moves().map(|m| m.to.clone()).collect();
        assert_eq!(
            targets,
            BTreeSet::from([ident("CN=node-b"), ident("CN=node-c")]),
            "one range lands on each empty member"
        );
    }

    // --- acceptance scenario: heterogeneous disk weights -----------------

    #[test]
    fn heterogeneous_disk_weights_apportion_by_capacity() {
        // node-big advertises 4x the disk of node-small. Six equal ranges all start
        // on node-small; fair shares are big≈4.8, small≈1.2 ranges, so the planner
        // moves the bulk onto node-big.
        let planner = WeightedPlacementPlanner::default();
        let members = membership(&["CN=node-big", "CN=node-small"]);
        let (catalog, _orders) = catalog(&[
            "CN=node-small",
            "CN=node-small",
            "CN=node-small",
            "CN=node-small",
            "CN=node-small",
            "CN=node-small",
        ]);
        let signals = FakeSignals::uniform(1_000, 100)
            .with_capacity("CN=node-big", MemberCapacity::with_disk(4_000))
            .with_capacity("CN=node-small", MemberCapacity::with_disk(1_000));

        let plan = planner.plan_rebalance(&members, &catalog, &signals);
        assert!(!plan.no_moves(), "imbalanced cluster must plan moves");
        // Every move goes from small to big, and big ends with ~4-5 of the 6 ranges.
        let to_big = plan
            .capacity_moves()
            .filter(|m| m.to == ident("CN=node-big"))
            .count();
        assert!(
            (4..=5).contains(&to_big),
            "node-big should receive ~4/5 of 6 ranges, got {to_big}"
        );
        for mv in plan.capacity_moves() {
            assert_eq!(mv.from, ident("CN=node-small"));
            assert_eq!(mv.to, ident("CN=node-big"));
        }
    }

    #[test]
    fn operator_weight_biases_placement_without_more_disk() {
        // Same disk on both, but node-pref carries a 3x operator weight, so it
        // deserves the larger share of four ranges that all start on node-plain.
        let planner = WeightedPlacementPlanner::default();
        let members = membership(&["CN=node-pref", "CN=node-plain"]);
        let (catalog, _orders) = catalog(&[
            "CN=node-plain",
            "CN=node-plain",
            "CN=node-plain",
            "CN=node-plain",
        ]);
        let signals = FakeSignals::uniform(1_000, 100)
            .with_capacity("CN=node-pref", MemberCapacity::new(1_000, 300));

        let plan = planner.plan_rebalance(&members, &catalog, &signals);
        let to_pref = plan
            .capacity_moves()
            .filter(|m| m.to == ident("CN=node-pref"))
            .count();
        assert!(
            to_pref >= 2,
            "higher operator weight pulls more ranges, got {to_pref}"
        );
    }

    // --- acceptance scenario: capacity expansion -------------------------

    #[test]
    fn expanding_disk_changes_weight_and_next_plan_without_moving_data() {
        // Start heterogeneous: node-a large (3x node-b's disk), node-b small, and
        // all six ranges on node-a.
        let planner = WeightedPlacementPlanner::default();
        let members = membership(&["CN=node-a", "CN=node-b"]);
        let (catalog, orders) = catalog(&[
            "CN=node-a",
            "CN=node-a",
            "CN=node-a",
            "CN=node-a",
            "CN=node-a",
            "CN=node-a",
        ]);
        let large = MemberCapacity::with_disk(3_000);
        let small = MemberCapacity::with_disk(1_000);

        // Before expansion: node-b has only modest disk, so it receives a modest
        // share.
        let before_signals = FakeSignals::uniform(1_000, 100)
            .with_capacity("CN=node-a", large)
            .with_capacity("CN=node-b", small);
        let before = planner.plan_rebalance(&members, &catalog, &before_signals);
        let before_to_b = before
            .capacity_moves()
            .filter(|m| m.to == ident("CN=node-b"))
            .count();

        // Operator expands node-b's disk 8x. Its placement weight jumps...
        let expanded = MemberCapacity::with_disk(8_000);
        assert!(
            expanded.weighted_capacity() > small.weighted_capacity(),
            "expanding disk raises placement weight",
        );
        let after_signals = FakeSignals::uniform(1_000, 100)
            .with_capacity("CN=node-a", large)
            .with_capacity("CN=node-b", expanded);
        let after = planner.plan_rebalance(&members, &catalog, &after_signals);
        let after_to_b = after
            .capacity_moves()
            .filter(|m| m.to == ident("CN=node-b"))
            .count();

        // ...so the *next* plan apportions more ranges to node-b than before.
        assert!(
            after_to_b > before_to_b,
            "expanded disk pulls more ranges on the next plan ({before_to_b} -> {after_to_b})",
        );

        // But planning never moved data: the catalog still shows all six ranges on
        // node-a. Data only relocates when a transition plan is executed.
        for i in 1..=6 {
            let range = catalog.range(&orders, RangeId::new(i)).unwrap();
            assert_eq!(
                range.owner(),
                &ident("CN=node-a"),
                "range {i} stayed on node-a; planning moved nothing",
            );
        }
    }

    // --- acceptance scenario: hotspot signal influence -------------------

    #[test]
    fn hotspot_traffic_identifies_secondary_candidate() {
        // Capacity is *balanced* — node-a owns two ranges but has twice the disk,
        // so every member sits exactly on its fair share and the primary pass
        // proposes nothing. Yet range 1 on node-a serves a huge read/write load (a
        // small, read-hammered range) while the others are quiet. The secondary
        // signal flags it as a hotspot and, because node-a also carries other
        // traffic and a quiet member has both load and capacity headroom, proposes
        // spreading it.
        let planner = WeightedPlacementPlanner::default();
        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        // node-a owns ranges 1 and 2; node-b owns 3; node-c owns 4.
        let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-b", "CN=node-c"]);
        // node-a has 2x disk so its fair share covers both its ranges (40 bytes);
        // node-b and node-c each match their single 20-byte range. Every member's
        // capacity and every range's load is scripted below, so the `uniform(0, 0)`
        // defaults are never consulted.
        let signals = FakeSignals::uniform(0, 0)
            .with_capacity("CN=node-a", MemberCapacity::with_disk(2_000))
            .with_capacity("CN=node-b", MemberCapacity::with_disk(1_000))
            .with_capacity("CN=node-c", MemberCapacity::with_disk(1_000))
            // The hot range is tiny on disk but hammered; node-a keeps real
            // residual traffic on range 2.
            .with_load(
                1,
                RangeLoad {
                    bytes_used: 2,
                    read_ops: 1_000,
                    write_ops: 1_000,
                },
            )
            .with_load(
                2,
                RangeLoad {
                    bytes_used: 38,
                    read_ops: 300,
                    write_ops: 0,
                },
            )
            .with_load(
                3,
                RangeLoad {
                    bytes_used: 20,
                    read_ops: 100,
                    write_ops: 0,
                },
            )
            .with_load(
                4,
                RangeLoad {
                    bytes_used: 20,
                    read_ops: 100,
                    write_ops: 0,
                },
            );

        let plan = planner.plan_rebalance(&members, &catalog, &signals);
        // Capacity is balanced, so no capacity-balance move is proposed.
        assert_eq!(plan.capacity_moves().count(), 0, "capacity is balanced");
        // Range 1 is identified as a hotspot, attributed to its real owner.
        assert_eq!(plan.hotspots.len(), 1, "the hot range is surfaced");
        assert_eq!(plan.hotspots[0].range_id, RangeId::new(1));
        assert_eq!(plan.hotspots[0].owner, ident("CN=node-a"));
        assert_eq!(plan.hotspots[0].traffic, 2_000);
        // And a hotspot-relief move spreads it off node-a onto the quietest member
        // (node-b and node-c tie at 100 ops; the lower id wins).
        let relief: Vec<_> = plan.hotspot_moves().collect();
        assert_eq!(relief.len(), 1, "a relief move is planned");
        assert_eq!(relief[0].range_id, RangeId::new(1));
        assert_eq!(relief[0].from, ident("CN=node-a"));
        assert_eq!(
            relief[0].to,
            ident("CN=node-b"),
            "quietest target, tie -> lowest id"
        );
        assert_eq!(relief[0].reason, MoveReason::HotspotRelief);
    }

    #[test]
    fn no_hotspot_when_traffic_is_even() {
        // Balanced capacity (one range each, equal disk) and equal traffic: nothing
        // is a hotspot and the secondary signal proposes nothing.
        let planner = WeightedPlacementPlanner::default();
        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let signals = FakeSignals::uniform(1_000_000, 100)
            .with_load(
                1,
                RangeLoad {
                    bytes_used: 10,
                    read_ops: 100,
                    write_ops: 100,
                },
            )
            .with_load(
                2,
                RangeLoad {
                    bytes_used: 10,
                    read_ops: 100,
                    write_ops: 100,
                },
            )
            .with_load(
                3,
                RangeLoad {
                    bytes_used: 10,
                    read_ops: 100,
                    write_ops: 100,
                },
            );

        let plan = planner.plan_rebalance(&members, &catalog, &signals);
        assert!(plan.is_empty(), "balanced, even-traffic cluster is a no-op");
    }

    // --- acceptance scenario: no implicit data movement ------------------

    #[test]
    fn planning_never_mutates_the_catalog() {
        // A deliberately skewed cluster yields a non-empty plan, yet every range
        // in the ownership catalog keeps the owner, epoch and version it had before
        // planning: the planner only *describes* moves, it never performs them.
        let planner = WeightedPlacementPlanner::default();
        let members = membership(&["CN=node-a", "CN=node-b"]);
        let (catalog, orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-a", "CN=node-a"]);
        let signals = FakeSignals::uniform(1_000, 100);

        // Snapshot every range's owner/epoch/version before planning.
        let before: Vec<_> = (1..=4)
            .map(|i| {
                let r = catalog.range(&orders, RangeId::new(i)).unwrap();
                (r.owner().clone(), r.epoch(), r.version())
            })
            .collect();

        let plan = planner.plan_rebalance(&members, &catalog, &signals);
        assert!(!plan.no_moves(), "skewed cluster does plan moves");

        // The catalog is unchanged: same owners, same epochs, same versions.
        for (i, snap) in before.iter().enumerate() {
            let r = catalog.range(&orders, RangeId::new(i as u64 + 1)).unwrap();
            assert_eq!(&(r.owner().clone(), r.epoch(), r.version()), snap);
        }
    }

    #[test]
    fn draining_owner_ranges_are_left_to_the_drain_flow() {
        // node-a is draining (not placement-eligible). Its ranges are not moved by
        // the rebalancer — drain owns evacuating them — so no plan targets or
        // sources it for placement balancing.
        let planner = WeightedPlacementPlanner::default();
        let mut members = membership(&["CN=node-a", "CN=node-b"]);
        members.begin_drain(&ident("CN=node-a"));
        let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-a", "CN=node-a"]);
        let signals = FakeSignals::uniform(1_000, 100);

        let plan = planner.plan_rebalance(&members, &catalog, &signals);
        // node-a's ranges are not movable here, and node-b is the only eligible
        // member, so there is nothing to balance.
        assert!(
            plan.no_moves(),
            "draining owner's ranges are not rebalanced"
        );
    }

    #[test]
    fn single_member_cluster_plans_nothing() {
        let planner = WeightedPlacementPlanner::default();
        let members = membership(&["CN=node-a"]);
        let (catalog, _orders) = catalog(&["CN=node-a", "CN=node-a"]);
        let signals = FakeSignals::uniform(1_000, 100);

        let plan = planner.plan_rebalance(&members, &catalog, &signals);
        assert!(
            plan.no_moves(),
            "nowhere to move ranges in a one-member cluster"
        );
    }
}
1132}