Skip to main content

reddb_server/cluster/
move_range.rs

1//! Split-and-move planning and the move-range cutover state machine
2//! (issue #1004, PRD #987, ADR 0037).
3//!
4//! The [`WeightedPlacementPlanner`](super::placement::WeightedPlacementPlanner)
5//! decides *that* a range should move; this module decides *how* it moves and
6//! drives the move safely to completion. It is the glossary's **split-and-move**
7//! — *"rebalancing transition that first divides a large or hot shard/range, then
8//! moves only the selected subrange to a different writer. Small ranges may move
9//! whole without splitting"* — riding the glossary's **move range cutover** —
10//! *"the old owner continues serving writes while the target first copies a
11//! physical checkpoint/snapshot of the range directory, then catches up through
12//! the logical range-indexed stream; only after catch-up does the catalog epoch
13//! move write authority to the target."*
14//!
15//! ## Whole-range vs split-and-move
16//!
17//! [`classify_move`] is the small/large-or-hot decision: a range whose bytes and
18//! traffic both sit under the [`SplitPolicy`] thresholds moves whole
19//! ([`MoveKind::Whole`]); a range over either threshold is split first so the
20//! move sheds only part of the load ([`MoveKind::Split`]). [`split_range`] then
21//! carves the range at a chosen key into a retained child (the keys the owner
22//! keeps) and a moved child (a fresh range id the move hands off), tiling the
23//! original keyspace with no gap or overlap.
24//!
25//! ## The cutover, fenced and gated
26//!
27//! [`MoveRange`] is the state machine for one move. It encodes the move-range
28//! invariant directly:
29//!
30//! 1. **[`CopyingSnapshot`](MovePhase::CopyingSnapshot)** — the target copies a
31//!    consistent physical snapshot of the range. Throughout, the catalog still
32//!    names the old owner, so the old owner *keeps serving writes*.
33//! 2. **[`CatchingUp`](MovePhase::CatchingUp)** — the snapshot is installed at a
34//!    consistent [`CommitWatermark`]; the target replays **range-indexed WAL
35//!    records** (issue #992) from that point to close the gap to the live commit
36//!    watermark, which keeps advancing because the old owner is still writing.
37//! 3. **[`cut_over`](MoveRange::cut_over)** — only when the target's applied log
38//!    covers the live commit watermark does the fenced
39//!    [`Handoff`](super::ownership_transition::TransitionKind::Handoff) transition
40//!    move the catalog epoch. The epoch bump fences the old owner (its writes now
41//!    carry a stale epoch and [`admit_public_write`] rejects them) and makes the
42//!    target authoritative. *The target accepts no public write until this
43//!    instant* — before it, the target is a replica and the ownership gate
44//!    rejects it.
45//!
46//! ## Interrupted moves fail safe
47//!
48//! A move can be interrupted at any point — a supervisor restart, a crashed
49//! target. [`recover_interrupted_move`] resumes from the target's persisted
50//! catch-up position and **promotes the target only if it covers the range commit
51//! watermark**; otherwise it leaves the catalog untouched and the old owner keeps
52//! authority. A half-copied target is never promoted, so an interrupted move can
53//! lose no committed write.
54//!
55//! Everything here is a pure data model over the catalog plus the range-indexed
56//! WAL contract — no disk, no clock, no network — so the split arithmetic, the
57//! catch-up gate, the fencing, and the interrupted-move safety are all exercised
58//! deterministically.
59//!
60//! [`admit_public_write`]: super::ownership::ShardOwnershipCatalog::admit_public_write
61
62use crate::replication::cdc::{
63    plan_range_catchup, ChangeRecord, RangeCatchupPlan, RangeStreamPosition,
64};
65
66use super::identity::NodeIdentity;
67use super::ownership::{
68    CatalogError, CatalogVersion, CollectionId, OwnershipEpoch, RangeBoundsError, RangeId,
69    RangeOwnership, ShardOwnershipCatalog,
70};
71use super::ownership_transition::{
72    run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
73    TransitionOutcome, TransitionRequest,
74};
75use super::placement::RangeLoad;
76
/// Size and traffic limits that choose between moving a range whole and
/// splitting it before the move.
///
/// Each limit trips a split on its own. A range may be tiny on disk but a
/// traffic hotspot, or idle but too big to copy and cut over in a single step;
/// in both cases splitting lets the move relocate just a subrange rather than
/// the entire load.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SplitPolicy {
    /// Byte ceiling for a whole move. A range whose `bytes_used` is strictly
    /// greater than this is "large" and is split so only a subrange moves.
    pub max_whole_move_bytes: u64,
    /// Read+write operations in the observation window at which a range counts
    /// as "hot" (the comparison is inclusive). A value of `0` switches the
    /// traffic trigger off in [`classify_move`] rather than marking every range
    /// hot.
    pub hot_traffic_threshold: u64,
}

impl Default for SplitPolicy {
    /// Coarse defaults — 256 MiB and 10 000 operations — so that only a clearly
    /// large or clearly hot range pays for the extra split step and everything
    /// else moves whole.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        SplitPolicy {
            hot_traffic_threshold: 10_000,
            max_whole_move_bytes: 256 * MIB,
        }
    }
}
105
/// The strategy chosen for a planned move: ship the range as one unit, or
/// split it and ship only part of it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MoveKind {
    /// Neither large nor hot: the whole range is copied and cut over at once.
    Whole,
    /// Large or hot: the range is divided and only the chosen subrange moves.
    Split,
}
115
116/// Decide whether a range moves whole or is split first, from its live load and
117/// the [`SplitPolicy`]. A range over the byte ceiling **or** at/over the hot
118/// traffic threshold is split; otherwise it moves whole.
119pub fn classify_move(load: RangeLoad, policy: &SplitPolicy) -> MoveKind {
120    let large = load.bytes_used > policy.max_whole_move_bytes;
121    let hot = load.traffic() >= policy.hot_traffic_threshold && policy.hot_traffic_threshold > 0;
122    if large || hot {
123        MoveKind::Split
124    } else {
125        MoveKind::Whole
126    }
127}
128
/// Selects which half of a split is handed to the move target.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SplitSide {
    /// Hand off `[lower, split_key)`; the owner keeps `[split_key, upper)`.
    Lower,
    /// Hand off `[split_key, upper)`; the owner keeps `[lower, split_key)`.
    Upper,
}
137
138/// The two entries a [`split_range`] produces: the child the owner keeps and the
139/// child the move will hand off.
140///
141/// Applying a split is order-sensitive — the retained child must be **narrowed
142/// first**, then the moved child created — or the create would transiently
143/// overlap the still-full original and the catalog would reject it.
144/// [`apply`](Self::apply) does this in the right order.
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct RangeSplit {
147    retained: RangeOwnership,
148    moved: RangeOwnership,
149}
150
151impl RangeSplit {
152    /// The child the owner keeps writing — the original range id, narrowed to the
153    /// non-moved keys, version advanced but epoch unchanged (no authority moved).
154    pub fn retained(&self) -> &RangeOwnership {
155        &self.retained
156    }
157
158    /// The carved-off child the move hands off — a fresh range id, still owned by
159    /// the original owner (which keeps serving its keys until cutover) with the
160    /// move target enlisted as a replica.
161    pub fn moved(&self) -> &RangeOwnership {
162        &self.moved
163    }
164
165    /// Install the split into the catalog: narrow the retained child first, then
166    /// create the moved child. After this the two children tile the original
167    /// keyspace and the move can proceed on [`moved`](Self::moved)'s range id.
168    pub fn apply(&self, catalog: &mut ShardOwnershipCatalog) -> Result<(), CatalogError> {
169        // Narrow the retained child first so the moved child no longer overlaps a
170        // still-full original on create.
171        catalog.apply_update(self.retained.clone())?;
172        catalog.apply_update(self.moved.clone())?;
173        Ok(())
174    }
175}
176
177/// Divide `range` at `split_key` into a retained child and a moved child, with
178/// `target` enlisted as a replica of the moved child so a later
179/// [`MoveRange`] can hand authority to it.
180///
181/// `moved_id` is the fresh range id the carved-off subrange takes; it must differ
182/// from `range`'s own id. `moved_side` selects which child moves: the retained
183/// child keeps `range`'s id (narrowed in place), and the moved child is a brand
184/// new entry at epoch/version 1 — its data still lives under the owner until the
185/// move cuts over. Fails with [`SplitError`] if the split key does not fall
186/// strictly inside the range or the moved id collides with the original.
187pub fn split_range(
188    range: &RangeOwnership,
189    split_key: &[u8],
190    moved_side: SplitSide,
191    moved_id: RangeId,
192    target: NodeIdentity,
193) -> Result<RangeSplit, SplitError> {
194    if moved_id == range.range_id() {
195        return Err(SplitError::MovedIdCollision { id: moved_id });
196    }
197    let (lower_bounds, upper_bounds) = range
198        .bounds()
199        .split_at(split_key)
200        .map_err(SplitError::Bounds)?;
201    let (retained_bounds, moved_bounds) = match moved_side {
202        // The moved child is the lower part; the owner retains the upper.
203        SplitSide::Lower => (upper_bounds, lower_bounds),
204        // The moved child is the upper part; the owner retains the lower.
205        SplitSide::Upper => (lower_bounds, upper_bounds),
206    };
207
208    // The retained child keeps the original id and owner, narrowed in place.
209    let retained = range.with_bounds(retained_bounds);
210
211    // The moved child is a fresh range, still owned by the current owner (it keeps
212    // serving these keys until cutover) with the target enlisted as a replica so
213    // the handoff has a valid promotion candidate.
214    let mut replicas: Vec<NodeIdentity> = range.replicas().to_vec();
215    if !replicas.contains(&target) {
216        replicas.push(target);
217    }
218    let moved = RangeOwnership::establish(
219        range.collection().clone(),
220        moved_id,
221        range.shard_key_mode(),
222        moved_bounds,
223        range.owner().clone(),
224        replicas,
225        range.placement().clone(),
226    );
227
228    Ok(RangeSplit { retained, moved })
229}
230
231/// Why a range split could not be planned.
232#[derive(Debug, Clone, PartialEq, Eq)]
233pub enum SplitError {
234    /// The split key does not fall strictly inside the range's bounds, so one
235    /// child would be empty.
236    Bounds(RangeBoundsError),
237    /// The moved subrange was given the same range id as the range being split.
238    MovedIdCollision { id: RangeId },
239}
240
241impl std::fmt::Display for SplitError {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        match self {
244            Self::Bounds(err) => write!(f, "cannot split range: {err}"),
245            Self::MovedIdCollision { id } => write!(
246                f,
247                "split moved subrange id {id} collides with the range being split"
248            ),
249        }
250    }
251}
252
253impl std::error::Error for SplitError {}
254
/// Stage of a move-range's copy → catch-up → cutover lifecycle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MovePhase {
    /// The target is copying a consistent physical snapshot of the range. The
    /// catalog still names the old owner, which keeps accepting writes.
    CopyingSnapshot,
    /// The snapshot is installed at a consistent watermark, and the target is
    /// replaying range-indexed WAL records to reach the live commit watermark.
    CatchingUp,
    /// Terminal: the catalog epoch has moved, the target is authoritative, and
    /// the old owner is fenced.
    Completed,
    /// Terminal: the move was abandoned and the old owner kept authority.
    Aborted,
}

impl MovePhase {
    /// Kebab-case name of the phase, as shown in [`MoveError`] messages.
    fn label(self) -> &'static str {
        match self {
            Self::CopyingSnapshot => "copying-snapshot",
            Self::CatchingUp => "catching-up",
            Self::Completed => "completed",
            Self::Aborted => "aborted",
        }
    }
}
282
283/// One in-flight move-range: the bookkeeping that carries authority for one range
284/// from its current owner to a target without losing a write or letting the
285/// target serve early.
286///
287/// Built with [`begin`](Self::begin), which enlists the target as a replica and
288/// captures the catalog CAS (owner / epoch / version) the cutover will use. The
289/// snapshot point and the target's catch-up progress are filled in as the move
290/// runs. Until [`cut_over`](Self::cut_over) succeeds the catalog is unchanged, so
291/// the old owner keeps serving and the target — a mere replica — cannot.
292#[derive(Debug, Clone, PartialEq, Eq)]
293pub struct MoveRange {
294    collection: CollectionId,
295    range_id: RangeId,
296    /// The range's current owner — the move's source, fenced at cutover.
297    source: NodeIdentity,
298    /// The move target — promoted at cutover, a replica until then.
299    target: NodeIdentity,
300    /// The catalog epoch captured at [`begin`](Self::begin) — the cutover CAS.
301    expected_epoch: OwnershipEpoch,
302    /// The catalog version captured at [`begin`](Self::begin) — the cutover CAS.
303    expected_version: CatalogVersion,
304    phase: MovePhase,
305    /// The consistent point the snapshot was taken at, once installed.
306    snapshot_watermark: Option<CommitWatermark>,
307    /// The target's range-indexed catch-up position over the shared WAL, once the
308    /// snapshot is installed and catch-up begins.
309    position: Option<RangeStreamPosition>,
310}
311
312impl MoveRange {
313    /// Start moving `(collection, range_id)` to `target`. Enlists `target` as a
314    /// replica of the range if it is not one already (so the cutover has a valid
315    /// promotion candidate), then captures the catalog CAS for the eventual
316    /// fenced handoff. The move begins in [`CopyingSnapshot`](MovePhase::CopyingSnapshot);
317    /// the catalog's *owner* is unchanged, so the old owner keeps serving writes.
318    ///
319    /// Fails if the range is unknown or `target` is already its owner (a move to
320    /// the incumbent is a no-op).
321    pub fn begin(
322        catalog: &mut ShardOwnershipCatalog,
323        collection: CollectionId,
324        range_id: RangeId,
325        target: NodeIdentity,
326    ) -> Result<Self, MoveError> {
327        let current =
328            catalog
329                .range(&collection, range_id)
330                .ok_or_else(|| MoveError::UnknownRange {
331                    collection: collection.clone(),
332                    range_id,
333                })?;
334        let source = current.owner().clone();
335        if target == source {
336            return Err(MoveError::TargetIsOwner {
337                collection,
338                range_id,
339                owner: source,
340            });
341        }
342
343        // Enlist the target as a replica if it is not one yet — a replica is the
344        // only valid handoff candidate. This advances the version but not the
345        // epoch (no authority moved), so the old owner is not fenced.
346        if !current.replicas().contains(&target) {
347            let mut replicas: Vec<NodeIdentity> = current.replicas().to_vec();
348            replicas.push(target.clone());
349            let enlisted = current.update_replicas(replicas);
350            catalog.apply_update(enlisted).map_err(MoveError::Catalog)?;
351        }
352
353        // Capture the CAS *after* any replica enlistment so the cutover names the
354        // current catalog version.
355        let current = catalog
356            .range(&collection, range_id)
357            .expect("range present immediately after enlist");
358        Ok(Self {
359            collection,
360            range_id,
361            source,
362            target,
363            expected_epoch: current.epoch(),
364            expected_version: current.version(),
365            phase: MovePhase::CopyingSnapshot,
366            snapshot_watermark: None,
367            position: None,
368        })
369    }
370
371    pub fn phase(&self) -> MovePhase {
372        self.phase
373    }
374
375    pub fn source(&self) -> &NodeIdentity {
376        &self.source
377    }
378
379    pub fn target(&self) -> &NodeIdentity {
380        &self.target
381    }
382
383    /// The consistent point the physical snapshot was taken at, once installed.
384    pub fn snapshot_watermark(&self) -> Option<CommitWatermark> {
385        self.snapshot_watermark
386    }
387
388    /// The target's catch-up position over the range-indexed WAL, once catch-up
389    /// has begun.
390    pub fn position(&self) -> Option<RangeStreamPosition> {
391        self.position
392    }
393
394    /// Record that the target has installed a consistent physical snapshot taken
395    /// at `at`. Moves the move into [`CatchingUp`](MovePhase::CatchingUp) and
396    /// seeds the catch-up position from the snapshot point: the target has applied
397    /// everything up to `at` and will accept range records ahead of it, fencing
398    /// any stamped below the range's current ownership epoch.
399    ///
400    /// Only valid while copying the snapshot.
401    pub fn complete_snapshot(&mut self, at: CommitWatermark) -> Result<(), MoveError> {
402        self.expect_phase(MovePhase::CopyingSnapshot)?;
403        self.snapshot_watermark = Some(at);
404        self.position = Some(RangeStreamPosition::new(
405            self.range_id.value(),
406            at.lsn,
407            at.term,
408            self.expected_epoch.value(),
409        ));
410        self.phase = MovePhase::CatchingUp;
411        Ok(())
412    }
413
414    /// Replay a slice of the shared logical stream into the target's range-indexed
415    /// catch-up, advancing its applied position past every record stamped for this
416    /// range (issue #992). Returns the [`RangeCatchupPlan`] so the caller can see
417    /// which records applied and which were fenced. Only valid while catching up.
418    pub fn record_catch_up(
419        &mut self,
420        records: &[ChangeRecord],
421    ) -> Result<RangeCatchupPlan, MoveError> {
422        self.expect_phase(MovePhase::CatchingUp)?;
423        let position = self
424            .position
425            .as_mut()
426            .expect("catch-up position present while catching up");
427        let plan = plan_range_catchup(position, records);
428        *position = plan.resume;
429        Ok(plan)
430    }
431
432    /// The catch-up evidence the cutover will present for the target: the highest
433    /// `(term, lsn)` it has applied for the range. `None` before a snapshot is
434    /// installed.
435    pub fn catch_up_evidence(&self) -> Option<CatchUpEvidence> {
436        self.position.map(|position| {
437            CatchUpEvidence::new(
438                self.target.clone(),
439                position.accepted_term,
440                position.applied_lsn,
441            )
442        })
443    }
444
445    /// Whether the target's applied log covers `live` — the live range commit
446    /// watermark, which has advanced past the snapshot point as the old owner kept
447    /// writing. The cutover may only proceed once this holds.
448    pub fn has_caught_up(&self, live: CommitWatermark) -> bool {
449        self.catch_up_evidence()
450            .map(|evidence| evidence.covers(live))
451            .unwrap_or(false)
452    }
453
454    /// Cut over: move the catalog epoch to the target through the fenced
455    /// [`Handoff`](TransitionKind::Handoff) transition, demoting the old owner to a
456    /// replica. The move must be [`CatchingUp`](MovePhase::CatchingUp) and the
457    /// target must cover `live` — otherwise this returns
458    /// [`TargetBehindWatermark`](MoveError::TargetBehindWatermark) **without
459    /// touching the catalog**, so a target that has not caught up is never
460    /// promoted and the old owner keeps serving.
461    ///
462    /// On success the catalog names the target at a new epoch (fencing the old
463    /// owner's stale-epoch writes) and the move is [`Completed`](MovePhase::Completed).
464    pub fn cut_over(
465        &mut self,
466        catalog: &mut ShardOwnershipCatalog,
467        live: CommitWatermark,
468    ) -> Result<TransitionOutcome, MoveError> {
469        self.expect_phase(MovePhase::CatchingUp)?;
470        let evidence = self
471            .catch_up_evidence()
472            .expect("catch-up evidence present while catching up");
473        if !evidence.covers(live) {
474            return Err(MoveError::TargetBehindWatermark {
475                collection: self.collection.clone(),
476                range_id: self.range_id,
477                target: self.target.clone(),
478                watermark: live,
479                applied_term: evidence.applied_term,
480                applied_lsn: evidence.applied_lsn,
481            });
482        }
483
484        let outcome = attempt_handoff(
485            catalog,
486            &self.collection,
487            self.range_id,
488            &self.source,
489            self.expected_epoch,
490            self.expected_version,
491            &self.target,
492            evidence,
493            live,
494        )?;
495        self.phase = MovePhase::Completed;
496        Ok(outcome)
497    }
498
499    /// Abandon the move. The catalog is untouched (the old owner remains owner);
500    /// the target keeps whatever copy it has but is never promoted.
501    pub fn abort(&mut self) {
502        self.phase = MovePhase::Aborted;
503    }
504
505    fn expect_phase(&self, expected: MovePhase) -> Result<(), MoveError> {
506        if self.phase == expected {
507            Ok(())
508        } else {
509            Err(MoveError::WrongPhase {
510                expected: expected.label(),
511                actual: self.phase,
512            })
513        }
514    }
515}
516
517/// Resume an interrupted move and decide its fate from the target's persisted
518/// catch-up position alone — the recovery path after a supervisor restart or a
519/// crash mid-move.
520///
521/// Promotes `target` through the fenced handoff **only if** its applied position
522/// covers `live` (the range commit watermark); otherwise it leaves the catalog
523/// untouched so the old owner keeps authority. This is the interrupted-move
524/// safety rule: a half-copied target is never promoted, so no committed write is
525/// lost when a move is cut short.
526pub fn recover_interrupted_move(
527    catalog: &mut ShardOwnershipCatalog,
528    collection: &CollectionId,
529    range_id: RangeId,
530    target: &NodeIdentity,
531    target_position: RangeStreamPosition,
532    live: CommitWatermark,
533) -> Result<MoveRecovery, MoveError> {
534    let current = catalog
535        .range(collection, range_id)
536        .ok_or_else(|| MoveError::UnknownRange {
537            collection: collection.clone(),
538            range_id,
539        })?;
540    let source = current.owner().clone();
541    let expected_epoch = current.epoch();
542    let expected_version = current.version();
543
544    let evidence = CatchUpEvidence::new(
545        target.clone(),
546        target_position.accepted_term,
547        target_position.applied_lsn,
548    );
549
550    // The interrupted-move safety gate: promote only a target that covers the
551    // range commit watermark. A target behind it is abandoned and the source
552    // retains authority — the catalog is not touched.
553    if !evidence.covers(live) {
554        return Ok(MoveRecovery::AbortedSourceRetained {
555            applied_term: evidence.applied_term,
556            applied_lsn: evidence.applied_lsn,
557            watermark: live,
558        });
559    }
560
561    let outcome = attempt_handoff(
562        catalog,
563        collection,
564        range_id,
565        &source,
566        expected_epoch,
567        expected_version,
568        target,
569        evidence,
570        live,
571    )?;
572    Ok(MoveRecovery::Promoted(outcome))
573}
574
575/// The outcome of recovering an interrupted move.
576#[derive(Debug, Clone, PartialEq, Eq)]
577pub enum MoveRecovery {
578    /// The target covered the watermark and was promoted through a fenced
579    /// handoff; the old owner is now fenced.
580    Promoted(TransitionOutcome),
581    /// The target did not cover the watermark; the move was abandoned and the
582    /// source retains authority. Carries the target's applied position and the
583    /// watermark it fell short of.
584    AbortedSourceRetained {
585        applied_term: u64,
586        applied_lsn: u64,
587        watermark: CommitWatermark,
588    },
589}
590
591impl MoveRecovery {
592    /// Whether recovery promoted the target. False when the move was abandoned.
593    pub fn promoted(&self) -> bool {
594        matches!(self, MoveRecovery::Promoted(_))
595    }
596}
597
598/// Build and run the fenced [`Handoff`](TransitionKind::Handoff) that completes a
599/// move: the target takes ownership and the old owner is demoted to a replica and
600/// fenced by the epoch bump. Shared by the normal cutover and interrupted-move
601/// recovery so both run the identical safety gate.
602#[allow(clippy::too_many_arguments)]
603fn attempt_handoff(
604    catalog: &mut ShardOwnershipCatalog,
605    collection: &CollectionId,
606    range_id: RangeId,
607    source: &NodeIdentity,
608    expected_epoch: OwnershipEpoch,
609    expected_version: CatalogVersion,
610    target: &NodeIdentity,
611    evidence: CatchUpEvidence,
612    watermark: CommitWatermark,
613) -> Result<TransitionOutcome, MoveError> {
614    let request = TransitionRequest::new(
615        TransitionKind::Handoff,
616        collection.clone(),
617        range_id,
618        source.clone(),
619        expected_epoch,
620        expected_version,
621        target.clone(),
622        watermark,
623    )
624    .with_evidence(evidence)
625    // Demote the old owner to a replica of the range after cutover.
626    .with_replicas([source.clone()]);
627    run_transition(catalog, &request).map_err(MoveError::Transition)
628}
629
630/// Why a move-range step failed. Every variant that can be returned before the
631/// fenced handoff leaves the catalog untouched.
632#[derive(Debug, Clone, PartialEq, Eq)]
633pub enum MoveError {
634    /// No range with this `(collection, range_id)` exists in the catalog.
635    UnknownRange {
636        collection: CollectionId,
637        range_id: RangeId,
638    },
639    /// The move target is already the range's owner — a no-op move.
640    TargetIsOwner {
641        collection: CollectionId,
642        range_id: RangeId,
643        owner: NodeIdentity,
644    },
645    /// A move step was attempted from the wrong phase (e.g. cutting over before a
646    /// snapshot was installed).
647    WrongPhase {
648        expected: &'static str,
649        actual: MovePhase,
650    },
651    /// Cutover was attempted but the target's applied log does not yet cover the
652    /// live commit watermark — refused, the catalog untouched.
653    TargetBehindWatermark {
654        collection: CollectionId,
655        range_id: RangeId,
656        target: NodeIdentity,
657        watermark: CommitWatermark,
658        applied_term: u64,
659        applied_lsn: u64,
660    },
661    /// A catalog write (replica enlistment) was rejected.
662    Catalog(CatalogError),
663    /// The fenced handoff transition was rejected (a CAS or safety failure) or the
664    /// activation write failed.
665    Transition(TransitionError),
666}
667
668impl std::fmt::Display for MoveError {
669    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670        match self {
671            Self::UnknownRange {
672                collection,
673                range_id,
674            } => write!(f, "no range {collection}/{range_id} to move"),
675            Self::TargetIsOwner {
676                collection,
677                range_id,
678                owner,
679            } => write!(
680                f,
681                "move target {owner} is already the owner of {collection}/{range_id}"
682            ),
683            Self::WrongPhase { expected, actual } => write!(
684                f,
685                "move-range step expected phase {expected} but the move is {}",
686                actual.label()
687            ),
688            Self::TargetBehindWatermark {
689                collection,
690                range_id,
691                target,
692                watermark,
693                applied_term,
694                applied_lsn,
695            } => write!(
696                f,
697                "cannot cut over {collection}/{range_id} to {target}: applied term {applied_term} lsn {applied_lsn} is behind the commit watermark term {} lsn {}",
698                watermark.term, watermark.lsn
699            ),
700            Self::Catalog(err) => write!(f, "{err}"),
701            Self::Transition(err) => write!(f, "{err}"),
702        }
703    }
704}
705
706impl std::error::Error for MoveError {}
707
#[cfg(test)]
mod tests {
    //! Acceptance tests for split-and-move planning and the move-range cutover
    //! state machine: whole-vs-split classification, split arithmetic,
    //! snapshot → catch-up → fenced cutover, and interrupted-move recovery.

    use super::*;
    use crate::cluster::ownership::{
        PlacementMetadata, RangeBound, RangeBounds, RangeRole, RangeWriteReject, ShardKeyMode,
    };
    use crate::replication::cdc::ChangeOperation;

    /// Builds a collection id, panicking on an invalid test name.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Builds a node identity from a certificate subject, panicking on an
    /// invalid test subject.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// A single full-keyspace ordered range owned by `owner` with `replicas`, so
    /// concrete split keys land inside the range.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Ordered,
                RangeBounds::full(),
                ident(owner),
                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        (catalog, orders)
    }

    /// A range-indexed WAL record for `range_id` at `(term, lsn)` carrying
    /// ownership `epoch` — the catch-up feed a move-range target replays.
    fn record(range_id: u64, term: u64, lsn: u64, epoch: u64) -> ChangeRecord {
        ChangeRecord {
            term,
            lsn,
            timestamp: 1,
            operation: ChangeOperation::Insert,
            collection: "orders".to_string(),
            entity_id: lsn,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1]),
            metadata: None,
            refresh_records: None,
            range_id: Some(range_id),
            ownership_epoch: Some(epoch),
        }
    }

    // --- criterion 1: whole vs split classification ----------------------

    /// Bytes over the whole-move cap, or combined traffic at/over the hot
    /// threshold, forces a split; otherwise the range moves whole.
    #[test]
    fn small_cool_range_moves_whole_large_or_hot_range_splits() {
        let policy = SplitPolicy {
            max_whole_move_bytes: 1_000,
            hot_traffic_threshold: 500,
        };
        // Small and cool -> whole.
        assert_eq!(
            classify_move(RangeLoad::idle(900), &policy),
            MoveKind::Whole
        );
        // Large on disk -> split.
        assert_eq!(
            classify_move(RangeLoad::idle(1_001), &policy),
            MoveKind::Split
        );
        // Small but hot (traffic at threshold) -> split.
        assert_eq!(
            classify_move(
                RangeLoad {
                    bytes_used: 10,
                    read_ops: 300,
                    write_ops: 200,
                },
                &policy
            ),
            MoveKind::Split
        );
        // Small and just under the hot threshold -> whole.
        assert_eq!(
            classify_move(
                RangeLoad {
                    bytes_used: 10,
                    read_ops: 250,
                    write_ops: 249,
                },
                &policy
            ),
            MoveKind::Whole
        );
    }

    // --- range split arithmetic ------------------------------------------

    /// Splitting at `"m"` yields `[Min, "m")` + `["m", Max)`: adjacent children
    /// sharing the split key as a boundary, so nothing is lost or duplicated.
    #[test]
    fn split_tiles_the_keyspace_with_no_gap_or_overlap() {
        let (catalog, orders) = catalog_with("CN=node-a", &[]);
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        let split = split_range(
            range,
            b"m",
            SplitSide::Upper,
            RangeId::new(2),
            ident("CN=node-b"),
        )
        .expect("split ok");

        // Retained keeps id 1, narrowed to [Min, "m"); moved is id 2 over ["m", Max).
        assert_eq!(split.retained().range_id(), RangeId::new(1));
        assert_eq!(split.retained().bounds().lower(), &RangeBound::Min);
        assert_eq!(
            split.retained().bounds().upper(),
            &RangeBound::key(b"m".to_vec())
        );
        assert_eq!(split.moved().range_id(), RangeId::new(2));
        assert_eq!(
            split.moved().bounds().lower(),
            &RangeBound::key(b"m".to_vec())
        );
        assert_eq!(split.moved().bounds().upper(), &RangeBound::Max);

        // Both children stay with the original owner; the target is a replica of
        // the moved child only.
        assert_eq!(split.retained().owner(), &ident("CN=node-a"));
        assert_eq!(split.moved().owner(), &ident("CN=node-a"));
        assert_eq!(
            split.moved().role_of(&ident("CN=node-b")),
            RangeRole::Replica
        );

        // The retained child's epoch is unchanged (no authority moved); only the
        // version advanced.
        assert_eq!(split.retained().epoch(), range.epoch());
        assert!(split.retained().version() > range.version());
    }

    /// Invalid splits are refused: reusing the parent's id, and split keys at
    /// or outside a bounded range's `[lower, upper)` interval.
    #[test]
    fn split_rejects_an_out_of_range_key_and_an_id_collision() {
        let (catalog, orders) = catalog_with("CN=node-a", &[]);
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        // Reusing the original id is a collision.
        assert!(matches!(
            split_range(
                range,
                b"m",
                SplitSide::Upper,
                RangeId::new(1),
                ident("CN=node-b")
            ),
            Err(SplitError::MovedIdCollision { .. })
        ));

        // A bounded range cannot be split at or outside its bounds.
        let bounded = RangeOwnership::establish(
            orders.clone(),
            RangeId::new(5),
            ShardKeyMode::Ordered,
            RangeBounds::new(
                RangeBound::key(b"d".to_vec()),
                RangeBound::key(b"h".to_vec()),
            )
            .unwrap(),
            ident("CN=node-a"),
            Vec::<NodeIdentity>::new(),
            PlacementMetadata::with_replication_factor(1),
        );
        // Outside: "z" lies beyond the exclusive upper bound.
        assert!(matches!(
            split_range(
                &bounded,
                b"z",
                SplitSide::Upper,
                RangeId::new(6),
                ident("CN=node-b")
            ),
            Err(SplitError::Bounds(_))
        ));
        // At the inclusive lower bound: the retained child would be the empty
        // interval ["d", "d").
        assert!(split_range(
            &bounded,
            b"d",
            SplitSide::Upper,
            RangeId::new(6),
            ident("CN=node-b")
        )
        .is_err());
        // At the exclusive upper bound: the key is not inside the range and the
        // moved child would be the empty interval ["h", "h").
        assert!(split_range(
            &bounded,
            b"h",
            SplitSide::Upper,
            RangeId::new(6),
            ident("CN=node-b")
        )
        .is_err());
    }

    /// Applying a split leaves the catalog with two ranges that route disjoint
    /// key halves.
    #[test]
    fn applying_a_split_installs_two_non_overlapping_ranges() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
        let range = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        let split = split_range(
            &range,
            b"m",
            SplitSide::Upper,
            RangeId::new(2),
            ident("CN=node-b"),
        )
        .unwrap();
        split.apply(&mut catalog).expect("split applies cleanly");

        assert_eq!(catalog.range_count(), 2);
        // Routing now resolves either side to exactly one range.
        assert_eq!(
            catalog.route(&orders, b"a").unwrap().range_id(),
            RangeId::new(1)
        );
        assert_eq!(
            catalog.route(&orders, b"z").unwrap().range_id(),
            RangeId::new(2)
        );
    }

    // --- criterion 2 + 3 + 4: snapshot, catch-up, fenced cutover ---------

    /// Happy path: the old owner serves writes during copy and catch-up, and
    /// only the cutover transfers write authority (fencing the old owner).
    #[test]
    fn whole_range_move_copies_snapshot_catches_up_then_cuts_over() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
        let mut mv = MoveRange::begin(
            &mut catalog,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-b"),
        )
        .expect("begin ok");
        assert_eq!(mv.phase(), MovePhase::CopyingSnapshot);

        // Criterion 3: while copying, the catalog still names node-a, which keeps
        // serving public writes.
        let serving_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
        assert!(catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", serving_epoch)
            .is_ok());
        // Criterion 4: the target is only a replica, so its writes are rejected.
        let err = catalog
            .admit_public_write(&ident("CN=node-b"), &orders, b"k", serving_epoch)
            .unwrap_err();
        assert!(matches!(err, RangeWriteReject::NotOwner { .. }));

        // Criterion 2: snapshot taken at a consistent point, then range-indexed
        // WAL catch-up closes the gap to the live watermark.
        mv.complete_snapshot(CommitWatermark::new(1, 100)).unwrap();
        assert_eq!(mv.phase(), MovePhase::CatchingUp);
        // The old owner kept writing: live watermark is now term 1 lsn 130.
        let plan = mv
            .record_catch_up(&[
                record(1, 1, 110, 1),
                record(1, 1, 120, 1),
                record(1, 1, 130, 1),
            ])
            .unwrap();
        assert_eq!(plan.apply_count(), 3);
        assert!(mv.has_caught_up(CommitWatermark::new(1, 130)));

        let outcome = mv
            .cut_over(&mut catalog, CommitWatermark::new(1, 130))
            .unwrap();
        assert_eq!(mv.phase(), MovePhase::Completed);
        assert_eq!(outcome.kind, TransitionKind::Handoff);
        assert!(outcome.fenced_old_owner());

        // Criterion 3 (after cutover): node-a is fenced — demoted to a replica at
        // the stale epoch, its public write is rejected.
        let err = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", serving_epoch)
            .unwrap_err();
        assert!(matches!(
            err,
            RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
        ));
        // Criterion 4 (after cutover): the target is now the owner and is admitted
        // at the new epoch.
        let new_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
        assert!(catalog
            .admit_public_write(&ident("CN=node-b"), &orders, b"k", new_epoch)
            .is_ok());
    }

    // --- criterion 4: cutover refused before catch-up --------------------

    /// A cutover attempted while the target lags the live watermark fails and
    /// changes neither the move phase nor the catalog's owner or epoch.
    #[test]
    fn cutover_before_catch_up_is_refused_and_leaves_catalog_untouched() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
        let mut mv = MoveRange::begin(
            &mut catalog,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-b"),
        )
        .unwrap();
        mv.complete_snapshot(CommitWatermark::new(1, 100)).unwrap();
        // Only caught up to lsn 110, but the live watermark is lsn 200.
        mv.record_catch_up(&[record(1, 1, 110, 1)]).unwrap();
        assert!(!mv.has_caught_up(CommitWatermark::new(1, 200)));

        // Snapshot the authority state right before the refused cutover.
        let epoch_before = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();

        let err = mv
            .cut_over(&mut catalog, CommitWatermark::new(1, 200))
            .unwrap_err();
        assert!(matches!(err, MoveError::TargetBehindWatermark { .. }));
        // The move did not advance and node-a is still the owner.
        assert_eq!(mv.phase(), MovePhase::CatchingUp);
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-a")
        );
        // A refused cutover must not bump the ownership epoch either, or
        // node-a's in-flight writes at `epoch_before` would be spuriously fenced.
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().epoch(),
            epoch_before
        );
    }

    // --- split-and-move end to end ---------------------------------------

    /// Split-and-move hands only the carved-off subrange to the target; the
    /// retained subrange stays with the original owner.
    #[test]
    fn split_and_move_relocates_only_the_subrange() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
        // Split the hot/large range and move only the upper subrange to node-b.
        let range = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        let split = split_range(
            &range,
            b"m",
            SplitSide::Upper,
            RangeId::new(2),
            ident("CN=node-b"),
        )
        .unwrap();
        split.apply(&mut catalog).unwrap();

        // Move the carved-off subrange (id 2) to node-b.
        let mut mv = MoveRange::begin(
            &mut catalog,
            orders.clone(),
            RangeId::new(2),
            ident("CN=node-b"),
        )
        .unwrap();
        mv.complete_snapshot(CommitWatermark::new(1, 10)).unwrap();
        mv.record_catch_up(&[record(2, 1, 20, 1)]).unwrap();
        mv.cut_over(&mut catalog, CommitWatermark::new(1, 20)).unwrap();

        // The moved subrange is now owned by node-b; the retained subrange stays
        // with node-a, untouched by the move.
        assert_eq!(
            catalog.range(&orders, RangeId::new(2)).unwrap().owner(),
            &ident("CN=node-b")
        );
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-a")
        );
        // node-a still owns the lower keys; node-b owns the upper keys.
        assert_eq!(
            catalog.route(&orders, b"a").unwrap().owner(),
            &ident("CN=node-a")
        );
        assert_eq!(
            catalog.route(&orders, b"z").unwrap().owner(),
            &ident("CN=node-b")
        );
    }

    // --- criterion 2: catch-up only consumes this range's records --------

    /// Catch-up over a shared WAL slice skips other ranges' records, rejects
    /// stale-epoch records, and advances the position only on genuine records.
    #[test]
    fn catch_up_ignores_other_ranges_and_fences_stale_epoch_records() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
        let mut mv = MoveRange::begin(
            &mut catalog,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-b"),
        )
        .unwrap();
        mv.complete_snapshot(CommitWatermark::new(1, 100)).unwrap();

        // A shared WAL slice: a record for another range, a stale-epoch record
        // from a deposed owner, and two genuine records for this range.
        let plan = mv
            .record_catch_up(&[
                record(99, 1, 105, 1), // other range — skipped
                record(1, 1, 110, 0),  // stale ownership epoch (0 < 1) — fenced
                record(1, 1, 120, 1),  // applied
                record(1, 1, 130, 1),  // applied
            ])
            .unwrap();
        assert_eq!(plan.apply_count(), 2);
        assert_eq!(plan.rejected.len(), 1);
        // Only this range's genuine records advanced the position.
        assert_eq!(mv.position().unwrap().applied_lsn, 130);
        assert!(mv.has_caught_up(CommitWatermark::new(1, 130)));
    }

    // --- criterion 5: interrupted-move recovery safety -------------------

    /// Recovery promotes a target whose persisted position reaches the live
    /// watermark.
    #[test]
    fn interrupted_move_promotes_a_caught_up_target() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // The supervisor died mid-move; node-b's persisted position covers the
        // live watermark (term 1 lsn 50).
        let position = RangeStreamPosition::new(RangeId::new(1).value(), 50, 1, 1);
        let recovery = recover_interrupted_move(
            &mut catalog,
            &orders,
            RangeId::new(1),
            &ident("CN=node-b"),
            position,
            CommitWatermark::new(1, 50),
        )
        .unwrap();
        assert!(recovery.promoted());
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-b")
        );
    }

    /// Recovery abandons a lagging target and leaves the source's authority
    /// (owner and epoch) intact.
    #[test]
    fn interrupted_move_abandons_a_target_behind_the_watermark() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // node-b only applied through lsn 40 but the watermark is lsn 50 — it must
        // not be promoted.
        let position = RangeStreamPosition::new(RangeId::new(1).value(), 40, 1, 1);
        let recovery = recover_interrupted_move(
            &mut catalog,
            &orders,
            RangeId::new(1),
            &ident("CN=node-b"),
            position,
            CommitWatermark::new(1, 50),
        )
        .unwrap();
        assert!(!recovery.promoted());
        assert!(matches!(
            recovery,
            MoveRecovery::AbortedSourceRetained {
                applied_lsn: 40,
                ..
            }
        ));
        // The source kept authority; the catalog is unchanged.
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-a")
        );
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().epoch(),
            OwnershipEpoch::initial()
        );
    }

    /// Moving a range to the node that already owns it is a no-op request and
    /// is refused up front.
    #[test]
    fn move_to_the_incumbent_owner_is_rejected() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
        let err = MoveRange::begin(&mut catalog, orders, RangeId::new(1), ident("CN=node-a"))
            .unwrap_err();
        assert!(matches!(err, MoveError::TargetIsOwner { .. }));
    }

    /// Beginning a move records the current owner as the source and enlists
    /// the target as a replica of the range.
    #[test]
    fn begin_enlists_the_target_as_a_replica() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
        let mv = MoveRange::begin(
            &mut catalog,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-b"),
        )
        .unwrap();
        assert_eq!(mv.source(), &ident("CN=node-a"));
        // node-b is now a replica of the range — a valid handoff candidate.
        assert_eq!(
            catalog
                .range(&orders, RangeId::new(1))
                .unwrap()
                .role_of(&ident("CN=node-b")),
            RangeRole::Replica
        );
    }
}
1176                .role_of(&ident("CN=node-b")),
1177            RangeRole::Replica
1178        );
1179    }
1180}