reddb_server/cluster/move_range.rs
1//! Split-and-move planning and the move-range cutover state machine
2//! (issue #1004, PRD #987, ADR 0037).
3//!
4//! The [`WeightedPlacementPlanner`](super::placement::WeightedPlacementPlanner)
5//! decides *that* a range should move; this module decides *how* it moves and
6//! drives the move safely to completion. It is the glossary's **split-and-move**
7//! — *"rebalancing transition that first divides a large or hot shard/range, then
8//! moves only the selected subrange to a different writer. Small ranges may move
9//! whole without splitting"* — riding the glossary's **move range cutover** —
10//! *"the old owner continues serving writes while the target first copies a
11//! physical checkpoint/snapshot of the range directory, then catches up through
12//! the logical range-indexed stream; only after catch-up does the catalog epoch
13//! move write authority to the target."*
14//!
15//! ## Whole-range vs split-and-move
16//!
17//! [`classify_move`] is the small/large-or-hot decision: a range whose bytes and
18//! traffic both sit under the [`SplitPolicy`] thresholds moves whole
19//! ([`MoveKind::Whole`]); a range over either threshold is split first so the
20//! move sheds only part of the load ([`MoveKind::Split`]). [`split_range`] then
21//! carves the range at a chosen key into a retained child (the keys the owner
22//! keeps) and a moved child (a fresh range id the move hands off), tiling the
23//! original keyspace with no gap or overlap.
24//!
25//! ## The cutover, fenced and gated
26//!
27//! [`MoveRange`] is the state machine for one move. It encodes the move-range
28//! invariant directly:
29//!
30//! 1. **[`CopyingSnapshot`](MovePhase::CopyingSnapshot)** — the target copies a
31//! consistent physical snapshot of the range. Throughout, the catalog still
32//! names the old owner, so the old owner *keeps serving writes*.
33//! 2. **[`CatchingUp`](MovePhase::CatchingUp)** — the snapshot is installed at a
34//! consistent [`CommitWatermark`]; the target replays **range-indexed WAL
35//! records** (issue #992) from that point to close the gap to the live commit
36//! watermark, which keeps advancing because the old owner is still writing.
37//! 3. **[`cut_over`](MoveRange::cut_over)** — only when the target's applied log
38//! covers the live commit watermark does the fenced
39//! [`Handoff`](super::ownership_transition::TransitionKind::Handoff) transition
40//! move the catalog epoch. The epoch bump fences the old owner (its writes now
41//! carry a stale epoch and [`admit_public_write`] rejects them) and makes the
42//! target authoritative. *The target accepts no public write until this
43//! instant* — before it, the target is a replica and the ownership gate
44//! rejects it.
45//!
46//! ## Interrupted moves fail safe
47//!
48//! A move can be interrupted at any point — a supervisor restart, a crashed
49//! target. [`recover_interrupted_move`] resumes from the target's persisted
50//! catch-up position and **promotes the target only if it covers the range commit
51//! watermark**; otherwise it leaves the catalog untouched and the old owner keeps
52//! authority. A half-copied target is never promoted, so an interrupted move can
53//! lose no committed write.
54//!
55//! Everything here is a pure data model over the catalog plus the range-indexed
56//! WAL contract — no disk, no clock, no network — so the split arithmetic, the
57//! catch-up gate, the fencing, and the interrupted-move safety are all exercised
58//! deterministically.
59//!
60//! [`admit_public_write`]: super::ownership::ShardOwnershipCatalog::admit_public_write
61
62use crate::replication::cdc::{
63 plan_range_catchup, ChangeRecord, RangeCatchupPlan, RangeStreamPosition,
64};
65
66use super::identity::NodeIdentity;
67use super::ownership::{
68 CatalogError, CatalogVersion, CollectionId, OwnershipEpoch, RangeBoundsError, RangeId,
69 RangeOwnership, ShardOwnershipCatalog,
70};
71use super::ownership_transition::{
72 run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
73 TransitionOutcome, TransitionRequest,
74};
75use super::placement::RangeLoad;
76
/// Thresholds that separate a range which can be relocated in one piece from
/// one that must be divided before it moves.
///
/// The two limits are checked independently and tripping either one forces a
/// split: a range can be compact on disk yet a traffic hotspot, or quiet yet
/// too bulky to copy and cut over as a single unit. Splitting in either case
/// lets the move shed a subrange rather than relocating the entire load.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SplitPolicy {
    /// Byte ceiling for a whole-range move. A range holding strictly **more**
    /// than this many bytes counts as "large" and is split so that only part
    /// of it moves.
    pub max_whole_move_bytes: u64,
    /// Read+write traffic level (over the observation window) at or **above**
    /// which a range counts as "hot" and is split so the move relocates only
    /// part of that traffic. A value of `0` disables hot-range detection in
    /// [`classify_move`].
    pub hot_traffic_threshold: u64,
}

impl Default for SplitPolicy {
    /// Deliberately coarse defaults — a 256 MiB byte ceiling and a 10 000
    /// operation hot threshold — so only a genuinely large or genuinely hot
    /// range pays for the extra split step; everything else moves whole.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        let max_whole_move_bytes = 256 * MIB;
        let hot_traffic_threshold = 10_000;
        Self {
            max_whole_move_bytes,
            hot_traffic_threshold,
        }
    }
}
105
/// The strategy chosen for a planned move: relocate the range as it is, or
/// carve it first and relocate only one piece.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MoveKind {
    /// The range is small and cool enough to be copied and cut over in a
    /// single move.
    Whole,
    /// The range is large or hot: divide it and move just the selected
    /// subrange.
    Split,
}
115
116/// Decide whether a range moves whole or is split first, from its live load and
117/// the [`SplitPolicy`]. A range over the byte ceiling **or** at/over the hot
118/// traffic threshold is split; otherwise it moves whole.
119pub fn classify_move(load: RangeLoad, policy: &SplitPolicy) -> MoveKind {
120 let large = load.bytes_used > policy.max_whole_move_bytes;
121 let hot = load.traffic() >= policy.hot_traffic_threshold && policy.hot_traffic_threshold > 0;
122 if large || hot {
123 MoveKind::Split
124 } else {
125 MoveKind::Whole
126 }
127}
128
/// Selects which half of a split is handed to the move target; the owner keeps
/// the other half under the original range id.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SplitSide {
    /// Hand off the lower child `[lower, split_key)`; the upper child stays
    /// with the owner.
    Lower,
    /// Hand off the upper child `[split_key, upper)`; the lower child stays
    /// with the owner.
    Upper,
}
137
138/// The two entries a [`split_range`] produces: the child the owner keeps and the
139/// child the move will hand off.
140///
141/// Applying a split is order-sensitive — the retained child must be **narrowed
142/// first**, then the moved child created — or the create would transiently
143/// overlap the still-full original and the catalog would reject it.
144/// [`apply`](Self::apply) does this in the right order.
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct RangeSplit {
147 retained: RangeOwnership,
148 moved: RangeOwnership,
149}
150
151impl RangeSplit {
152 /// The child the owner keeps writing — the original range id, narrowed to the
153 /// non-moved keys, version advanced but epoch unchanged (no authority moved).
154 pub fn retained(&self) -> &RangeOwnership {
155 &self.retained
156 }
157
158 /// The carved-off child the move hands off — a fresh range id, still owned by
159 /// the original owner (which keeps serving its keys until cutover) with the
160 /// move target enlisted as a replica.
161 pub fn moved(&self) -> &RangeOwnership {
162 &self.moved
163 }
164
165 /// Install the split into the catalog: narrow the retained child first, then
166 /// create the moved child. After this the two children tile the original
167 /// keyspace and the move can proceed on [`moved`](Self::moved)'s range id.
168 pub fn apply(&self, catalog: &mut ShardOwnershipCatalog) -> Result<(), CatalogError> {
169 // Narrow the retained child first so the moved child no longer overlaps a
170 // still-full original on create.
171 catalog.apply_update(self.retained.clone())?;
172 catalog.apply_update(self.moved.clone())?;
173 Ok(())
174 }
175}
176
177/// Divide `range` at `split_key` into a retained child and a moved child, with
178/// `target` enlisted as a replica of the moved child so a later
179/// [`MoveRange`] can hand authority to it.
180///
181/// `moved_id` is the fresh range id the carved-off subrange takes; it must differ
182/// from `range`'s own id. `moved_side` selects which child moves: the retained
183/// child keeps `range`'s id (narrowed in place), and the moved child is a brand
184/// new entry at epoch/version 1 — its data still lives under the owner until the
185/// move cuts over. Fails with [`SplitError`] if the split key does not fall
186/// strictly inside the range or the moved id collides with the original.
187pub fn split_range(
188 range: &RangeOwnership,
189 split_key: &[u8],
190 moved_side: SplitSide,
191 moved_id: RangeId,
192 target: NodeIdentity,
193) -> Result<RangeSplit, SplitError> {
194 if moved_id == range.range_id() {
195 return Err(SplitError::MovedIdCollision { id: moved_id });
196 }
197 let (lower_bounds, upper_bounds) = range
198 .bounds()
199 .split_at(split_key)
200 .map_err(SplitError::Bounds)?;
201 let (retained_bounds, moved_bounds) = match moved_side {
202 // The moved child is the lower part; the owner retains the upper.
203 SplitSide::Lower => (upper_bounds, lower_bounds),
204 // The moved child is the upper part; the owner retains the lower.
205 SplitSide::Upper => (lower_bounds, upper_bounds),
206 };
207
208 // The retained child keeps the original id and owner, narrowed in place.
209 let retained = range.with_bounds(retained_bounds);
210
211 // The moved child is a fresh range, still owned by the current owner (it keeps
212 // serving these keys until cutover) with the target enlisted as a replica so
213 // the handoff has a valid promotion candidate.
214 let mut replicas: Vec<NodeIdentity> = range.replicas().to_vec();
215 if !replicas.contains(&target) {
216 replicas.push(target);
217 }
218 let moved = RangeOwnership::establish(
219 range.collection().clone(),
220 moved_id,
221 range.shard_key_mode(),
222 moved_bounds,
223 range.owner().clone(),
224 replicas,
225 range.placement().clone(),
226 );
227
228 Ok(RangeSplit { retained, moved })
229}
230
231/// Why a range split could not be planned.
232#[derive(Debug, Clone, PartialEq, Eq)]
233pub enum SplitError {
234 /// The split key does not fall strictly inside the range's bounds, so one
235 /// child would be empty.
236 Bounds(RangeBoundsError),
237 /// The moved subrange was given the same range id as the range being split.
238 MovedIdCollision { id: RangeId },
239}
240
241impl std::fmt::Display for SplitError {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 match self {
244 Self::Bounds(err) => write!(f, "cannot split range: {err}"),
245 Self::MovedIdCollision { id } => write!(
246 f,
247 "split moved subrange id {id} collides with the range being split"
248 ),
249 }
250 }
251}
252
253impl std::error::Error for SplitError {}
254
/// Lifecycle stage of a move-range: copy the snapshot, catch up, then either
/// cut over or give up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MovePhase {
    /// The target is copying a consistent physical snapshot of the range. The
    /// catalog still names the old owner, which keeps serving writes.
    CopyingSnapshot,
    /// The snapshot is installed at a consistent watermark, and the target is
    /// replaying range-indexed WAL records to reach the live commit watermark.
    CatchingUp,
    /// The catalog epoch has moved: the target is authoritative and the old
    /// owner is fenced.
    Completed,
    /// The move was abandoned; the old owner keeps authority.
    Aborted,
}

impl MovePhase {
    /// Stable kebab-case name for this phase, used in [`MoveError::WrongPhase`]
    /// reporting.
    fn label(self) -> &'static str {
        match self {
            Self::CopyingSnapshot => "copying-snapshot",
            Self::CatchingUp => "catching-up",
            Self::Completed => "completed",
            Self::Aborted => "aborted",
        }
    }
}
282
283/// One in-flight move-range: the bookkeeping that carries authority for one range
284/// from its current owner to a target without losing a write or letting the
285/// target serve early.
286///
287/// Built with [`begin`](Self::begin), which enlists the target as a replica and
288/// captures the catalog CAS (owner / epoch / version) the cutover will use. The
289/// snapshot point and the target's catch-up progress are filled in as the move
290/// runs. Until [`cut_over`](Self::cut_over) succeeds the catalog is unchanged, so
291/// the old owner keeps serving and the target — a mere replica — cannot.
292#[derive(Debug, Clone, PartialEq, Eq)]
293pub struct MoveRange {
294 collection: CollectionId,
295 range_id: RangeId,
296 /// The range's current owner — the move's source, fenced at cutover.
297 source: NodeIdentity,
298 /// The move target — promoted at cutover, a replica until then.
299 target: NodeIdentity,
300 /// The catalog epoch captured at [`begin`](Self::begin) — the cutover CAS.
301 expected_epoch: OwnershipEpoch,
302 /// The catalog version captured at [`begin`](Self::begin) — the cutover CAS.
303 expected_version: CatalogVersion,
304 phase: MovePhase,
305 /// The consistent point the snapshot was taken at, once installed.
306 snapshot_watermark: Option<CommitWatermark>,
307 /// The target's range-indexed catch-up position over the shared WAL, once the
308 /// snapshot is installed and catch-up begins.
309 position: Option<RangeStreamPosition>,
310}
311
312impl MoveRange {
313 /// Start moving `(collection, range_id)` to `target`. Enlists `target` as a
314 /// replica of the range if it is not one already (so the cutover has a valid
315 /// promotion candidate), then captures the catalog CAS for the eventual
316 /// fenced handoff. The move begins in [`CopyingSnapshot`](MovePhase::CopyingSnapshot);
317 /// the catalog's *owner* is unchanged, so the old owner keeps serving writes.
318 ///
319 /// Fails if the range is unknown or `target` is already its owner (a move to
320 /// the incumbent is a no-op).
321 pub fn begin(
322 catalog: &mut ShardOwnershipCatalog,
323 collection: CollectionId,
324 range_id: RangeId,
325 target: NodeIdentity,
326 ) -> Result<Self, MoveError> {
327 let current =
328 catalog
329 .range(&collection, range_id)
330 .ok_or_else(|| MoveError::UnknownRange {
331 collection: collection.clone(),
332 range_id,
333 })?;
334 let source = current.owner().clone();
335 if target == source {
336 return Err(MoveError::TargetIsOwner {
337 collection,
338 range_id,
339 owner: source,
340 });
341 }
342
343 // Enlist the target as a replica if it is not one yet — a replica is the
344 // only valid handoff candidate. This advances the version but not the
345 // epoch (no authority moved), so the old owner is not fenced.
346 if !current.replicas().contains(&target) {
347 let mut replicas: Vec<NodeIdentity> = current.replicas().to_vec();
348 replicas.push(target.clone());
349 let enlisted = current.update_replicas(replicas);
350 catalog.apply_update(enlisted).map_err(MoveError::Catalog)?;
351 }
352
353 // Capture the CAS *after* any replica enlistment so the cutover names the
354 // current catalog version.
355 let current = catalog
356 .range(&collection, range_id)
357 .expect("range present immediately after enlist");
358 Ok(Self {
359 collection,
360 range_id,
361 source,
362 target,
363 expected_epoch: current.epoch(),
364 expected_version: current.version(),
365 phase: MovePhase::CopyingSnapshot,
366 snapshot_watermark: None,
367 position: None,
368 })
369 }
370
371 pub fn phase(&self) -> MovePhase {
372 self.phase
373 }
374
375 pub fn source(&self) -> &NodeIdentity {
376 &self.source
377 }
378
379 pub fn target(&self) -> &NodeIdentity {
380 &self.target
381 }
382
383 /// The consistent point the physical snapshot was taken at, once installed.
384 pub fn snapshot_watermark(&self) -> Option<CommitWatermark> {
385 self.snapshot_watermark
386 }
387
388 /// The target's catch-up position over the range-indexed WAL, once catch-up
389 /// has begun.
390 pub fn position(&self) -> Option<RangeStreamPosition> {
391 self.position
392 }
393
394 /// Record that the target has installed a consistent physical snapshot taken
395 /// at `at`. Moves the move into [`CatchingUp`](MovePhase::CatchingUp) and
396 /// seeds the catch-up position from the snapshot point: the target has applied
397 /// everything up to `at` and will accept range records ahead of it, fencing
398 /// any stamped below the range's current ownership epoch.
399 ///
400 /// Only valid while copying the snapshot.
401 pub fn complete_snapshot(&mut self, at: CommitWatermark) -> Result<(), MoveError> {
402 self.expect_phase(MovePhase::CopyingSnapshot)?;
403 self.snapshot_watermark = Some(at);
404 self.position = Some(RangeStreamPosition::new(
405 self.range_id.value(),
406 at.lsn,
407 at.term,
408 self.expected_epoch.value(),
409 ));
410 self.phase = MovePhase::CatchingUp;
411 Ok(())
412 }
413
414 /// Replay a slice of the shared logical stream into the target's range-indexed
415 /// catch-up, advancing its applied position past every record stamped for this
416 /// range (issue #992). Returns the [`RangeCatchupPlan`] so the caller can see
417 /// which records applied and which were fenced. Only valid while catching up.
418 pub fn record_catch_up(
419 &mut self,
420 records: &[ChangeRecord],
421 ) -> Result<RangeCatchupPlan, MoveError> {
422 self.expect_phase(MovePhase::CatchingUp)?;
423 let position = self
424 .position
425 .as_mut()
426 .expect("catch-up position present while catching up");
427 let plan = plan_range_catchup(position, records);
428 *position = plan.resume;
429 Ok(plan)
430 }
431
432 /// The catch-up evidence the cutover will present for the target: the highest
433 /// `(term, lsn)` it has applied for the range. `None` before a snapshot is
434 /// installed.
435 pub fn catch_up_evidence(&self) -> Option<CatchUpEvidence> {
436 self.position.map(|position| {
437 CatchUpEvidence::new(
438 self.target.clone(),
439 position.accepted_term,
440 position.applied_lsn,
441 )
442 })
443 }
444
445 /// Whether the target's applied log covers `live` — the live range commit
446 /// watermark, which has advanced past the snapshot point as the old owner kept
447 /// writing. The cutover may only proceed once this holds.
448 pub fn has_caught_up(&self, live: CommitWatermark) -> bool {
449 self.catch_up_evidence()
450 .map(|evidence| evidence.covers(live))
451 .unwrap_or(false)
452 }
453
454 /// Cut over: move the catalog epoch to the target through the fenced
455 /// [`Handoff`](TransitionKind::Handoff) transition, demoting the old owner to a
456 /// replica. The move must be [`CatchingUp`](MovePhase::CatchingUp) and the
457 /// target must cover `live` — otherwise this returns
458 /// [`TargetBehindWatermark`](MoveError::TargetBehindWatermark) **without
459 /// touching the catalog**, so a target that has not caught up is never
460 /// promoted and the old owner keeps serving.
461 ///
462 /// On success the catalog names the target at a new epoch (fencing the old
463 /// owner's stale-epoch writes) and the move is [`Completed`](MovePhase::Completed).
464 pub fn cut_over(
465 &mut self,
466 catalog: &mut ShardOwnershipCatalog,
467 live: CommitWatermark,
468 ) -> Result<TransitionOutcome, MoveError> {
469 self.expect_phase(MovePhase::CatchingUp)?;
470 let evidence = self
471 .catch_up_evidence()
472 .expect("catch-up evidence present while catching up");
473 if !evidence.covers(live) {
474 return Err(MoveError::TargetBehindWatermark {
475 collection: self.collection.clone(),
476 range_id: self.range_id,
477 target: self.target.clone(),
478 watermark: live,
479 applied_term: evidence.applied_term,
480 applied_lsn: evidence.applied_lsn,
481 });
482 }
483
484 let outcome = attempt_handoff(
485 catalog,
486 &self.collection,
487 self.range_id,
488 &self.source,
489 self.expected_epoch,
490 self.expected_version,
491 &self.target,
492 evidence,
493 live,
494 )?;
495 self.phase = MovePhase::Completed;
496 Ok(outcome)
497 }
498
499 /// Abandon the move. The catalog is untouched (the old owner remains owner);
500 /// the target keeps whatever copy it has but is never promoted.
501 pub fn abort(&mut self) {
502 self.phase = MovePhase::Aborted;
503 }
504
505 fn expect_phase(&self, expected: MovePhase) -> Result<(), MoveError> {
506 if self.phase == expected {
507 Ok(())
508 } else {
509 Err(MoveError::WrongPhase {
510 expected: expected.label(),
511 actual: self.phase,
512 })
513 }
514 }
515}
516
517/// Resume an interrupted move and decide its fate from the target's persisted
518/// catch-up position alone — the recovery path after a supervisor restart or a
519/// crash mid-move.
520///
521/// Promotes `target` through the fenced handoff **only if** its applied position
522/// covers `live` (the range commit watermark); otherwise it leaves the catalog
523/// untouched so the old owner keeps authority. This is the interrupted-move
524/// safety rule: a half-copied target is never promoted, so no committed write is
525/// lost when a move is cut short.
526pub fn recover_interrupted_move(
527 catalog: &mut ShardOwnershipCatalog,
528 collection: &CollectionId,
529 range_id: RangeId,
530 target: &NodeIdentity,
531 target_position: RangeStreamPosition,
532 live: CommitWatermark,
533) -> Result<MoveRecovery, MoveError> {
534 let current = catalog
535 .range(collection, range_id)
536 .ok_or_else(|| MoveError::UnknownRange {
537 collection: collection.clone(),
538 range_id,
539 })?;
540 let source = current.owner().clone();
541 let expected_epoch = current.epoch();
542 let expected_version = current.version();
543
544 let evidence = CatchUpEvidence::new(
545 target.clone(),
546 target_position.accepted_term,
547 target_position.applied_lsn,
548 );
549
550 // The interrupted-move safety gate: promote only a target that covers the
551 // range commit watermark. A target behind it is abandoned and the source
552 // retains authority — the catalog is not touched.
553 if !evidence.covers(live) {
554 return Ok(MoveRecovery::AbortedSourceRetained {
555 applied_term: evidence.applied_term,
556 applied_lsn: evidence.applied_lsn,
557 watermark: live,
558 });
559 }
560
561 let outcome = attempt_handoff(
562 catalog,
563 collection,
564 range_id,
565 &source,
566 expected_epoch,
567 expected_version,
568 target,
569 evidence,
570 live,
571 )?;
572 Ok(MoveRecovery::Promoted(outcome))
573}
574
575/// The outcome of recovering an interrupted move.
576#[derive(Debug, Clone, PartialEq, Eq)]
577pub enum MoveRecovery {
578 /// The target covered the watermark and was promoted through a fenced
579 /// handoff; the old owner is now fenced.
580 Promoted(TransitionOutcome),
581 /// The target did not cover the watermark; the move was abandoned and the
582 /// source retains authority. Carries the target's applied position and the
583 /// watermark it fell short of.
584 AbortedSourceRetained {
585 applied_term: u64,
586 applied_lsn: u64,
587 watermark: CommitWatermark,
588 },
589}
590
591impl MoveRecovery {
592 /// Whether recovery promoted the target. False when the move was abandoned.
593 pub fn promoted(&self) -> bool {
594 matches!(self, MoveRecovery::Promoted(_))
595 }
596}
597
598/// Build and run the fenced [`Handoff`](TransitionKind::Handoff) that completes a
599/// move: the target takes ownership and the old owner is demoted to a replica and
600/// fenced by the epoch bump. Shared by the normal cutover and interrupted-move
601/// recovery so both run the identical safety gate.
602#[allow(clippy::too_many_arguments)]
603fn attempt_handoff(
604 catalog: &mut ShardOwnershipCatalog,
605 collection: &CollectionId,
606 range_id: RangeId,
607 source: &NodeIdentity,
608 expected_epoch: OwnershipEpoch,
609 expected_version: CatalogVersion,
610 target: &NodeIdentity,
611 evidence: CatchUpEvidence,
612 watermark: CommitWatermark,
613) -> Result<TransitionOutcome, MoveError> {
614 let request = TransitionRequest::new(
615 TransitionKind::Handoff,
616 collection.clone(),
617 range_id,
618 source.clone(),
619 expected_epoch,
620 expected_version,
621 target.clone(),
622 watermark,
623 )
624 .with_evidence(evidence)
625 // Demote the old owner to a replica of the range after cutover.
626 .with_replicas([source.clone()]);
627 run_transition(catalog, &request).map_err(MoveError::Transition)
628}
629
630/// Why a move-range step failed. Every variant that can be returned before the
631/// fenced handoff leaves the catalog untouched.
632#[derive(Debug, Clone, PartialEq, Eq)]
633pub enum MoveError {
634 /// No range with this `(collection, range_id)` exists in the catalog.
635 UnknownRange {
636 collection: CollectionId,
637 range_id: RangeId,
638 },
639 /// The move target is already the range's owner — a no-op move.
640 TargetIsOwner {
641 collection: CollectionId,
642 range_id: RangeId,
643 owner: NodeIdentity,
644 },
645 /// A move step was attempted from the wrong phase (e.g. cutting over before a
646 /// snapshot was installed).
647 WrongPhase {
648 expected: &'static str,
649 actual: MovePhase,
650 },
651 /// Cutover was attempted but the target's applied log does not yet cover the
652 /// live commit watermark — refused, the catalog untouched.
653 TargetBehindWatermark {
654 collection: CollectionId,
655 range_id: RangeId,
656 target: NodeIdentity,
657 watermark: CommitWatermark,
658 applied_term: u64,
659 applied_lsn: u64,
660 },
661 /// A catalog write (replica enlistment) was rejected.
662 Catalog(CatalogError),
663 /// The fenced handoff transition was rejected (a CAS or safety failure) or the
664 /// activation write failed.
665 Transition(TransitionError),
666}
667
668impl std::fmt::Display for MoveError {
669 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670 match self {
671 Self::UnknownRange {
672 collection,
673 range_id,
674 } => write!(f, "no range {collection}/{range_id} to move"),
675 Self::TargetIsOwner {
676 collection,
677 range_id,
678 owner,
679 } => write!(
680 f,
681 "move target {owner} is already the owner of {collection}/{range_id}"
682 ),
683 Self::WrongPhase { expected, actual } => write!(
684 f,
685 "move-range step expected phase {expected} but the move is {}",
686 actual.label()
687 ),
688 Self::TargetBehindWatermark {
689 collection,
690 range_id,
691 target,
692 watermark,
693 applied_term,
694 applied_lsn,
695 } => write!(
696 f,
697 "cannot cut over {collection}/{range_id} to {target}: applied term {applied_term} lsn {applied_lsn} is behind the commit watermark term {} lsn {}",
698 watermark.term, watermark.lsn
699 ),
700 Self::Catalog(err) => write!(f, "{err}"),
701 Self::Transition(err) => write!(f, "{err}"),
702 }
703 }
704}
705
706impl std::error::Error for MoveError {}
707
708#[cfg(test)]
709mod tests {
710 use super::*;
711 use crate::cluster::ownership::{
712 PlacementMetadata, RangeBound, RangeBounds, RangeRole, RangeWriteReject, ShardKeyMode,
713 };
714 use crate::replication::cdc::ChangeOperation;
715
716 fn collection(name: &str) -> CollectionId {
717 CollectionId::new(name).unwrap()
718 }
719
720 fn ident(cn: &str) -> NodeIdentity {
721 NodeIdentity::from_certificate_subject(cn).unwrap()
722 }
723
724 /// A single full-keyspace ordered range owned by `owner` with `replicas`, so
725 /// concrete split keys land inside the range.
726 fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
727 let orders = collection("orders");
728 let mut catalog = ShardOwnershipCatalog::new();
729 catalog
730 .apply_update(RangeOwnership::establish(
731 orders.clone(),
732 RangeId::new(1),
733 ShardKeyMode::Ordered,
734 RangeBounds::full(),
735 ident(owner),
736 replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
737 PlacementMetadata::with_replication_factor(3),
738 ))
739 .unwrap();
740 (catalog, orders)
741 }
742
743 /// A range-indexed WAL record for `range_id` at `(term, lsn)` carrying
744 /// ownership `epoch` — the catch-up feed a move-range target replays.
745 fn record(range_id: u64, term: u64, lsn: u64, epoch: u64) -> ChangeRecord {
746 ChangeRecord {
747 term,
748 lsn,
749 timestamp: 1,
750 operation: ChangeOperation::Insert,
751 collection: "orders".to_string(),
752 entity_id: lsn,
753 entity_kind: "row".to_string(),
754 entity_bytes: Some(vec![1]),
755 metadata: None,
756 refresh_records: None,
757 range_id: Some(range_id),
758 ownership_epoch: Some(epoch),
759 }
760 }
761
762 // --- criterion 1: whole vs split classification ----------------------
763
764 #[test]
765 fn small_cool_range_moves_whole_large_or_hot_range_splits() {
766 let policy = SplitPolicy {
767 max_whole_move_bytes: 1_000,
768 hot_traffic_threshold: 500,
769 };
770 // Small and cool -> whole.
771 assert_eq!(
772 classify_move(RangeLoad::idle(900), &policy),
773 MoveKind::Whole
774 );
775 // Large on disk -> split.
776 assert_eq!(
777 classify_move(RangeLoad::idle(1_001), &policy),
778 MoveKind::Split
779 );
780 // Small but hot (traffic at threshold) -> split.
781 assert_eq!(
782 classify_move(
783 RangeLoad {
784 bytes_used: 10,
785 read_ops: 300,
786 write_ops: 200,
787 },
788 &policy
789 ),
790 MoveKind::Split
791 );
792 // Small and just under the hot threshold -> whole.
793 assert_eq!(
794 classify_move(
795 RangeLoad {
796 bytes_used: 10,
797 read_ops: 250,
798 write_ops: 249,
799 },
800 &policy
801 ),
802 MoveKind::Whole
803 );
804 }
805
806 // --- range split arithmetic ------------------------------------------
807
808 #[test]
809 fn split_tiles_the_keyspace_with_no_gap_or_overlap() {
810 let (catalog, orders) = catalog_with("CN=node-a", &[]);
811 let range = catalog.range(&orders, RangeId::new(1)).unwrap();
812 let split = split_range(
813 range,
814 b"m",
815 SplitSide::Upper,
816 RangeId::new(2),
817 ident("CN=node-b"),
818 )
819 .expect("split ok");
820
821 // Retained keeps id 1, narrowed to [Min, "m"); moved is id 2 over ["m", Max).
822 assert_eq!(split.retained().range_id(), RangeId::new(1));
823 assert_eq!(split.retained().bounds().lower(), &RangeBound::Min);
824 assert_eq!(
825 split.retained().bounds().upper(),
826 &RangeBound::key(b"m".to_vec())
827 );
828 assert_eq!(split.moved().range_id(), RangeId::new(2));
829 assert_eq!(
830 split.moved().bounds().lower(),
831 &RangeBound::key(b"m".to_vec())
832 );
833 assert_eq!(split.moved().bounds().upper(), &RangeBound::Max);
834
835 // Both children stay with the original owner; the target is a replica of
836 // the moved child only.
837 assert_eq!(split.retained().owner(), &ident("CN=node-a"));
838 assert_eq!(split.moved().owner(), &ident("CN=node-a"));
839 assert_eq!(
840 split.moved().role_of(&ident("CN=node-b")),
841 RangeRole::Replica
842 );
843
844 // The retained child's epoch is unchanged (no authority moved); only the
845 // version advanced.
846 assert_eq!(split.retained().epoch(), range.epoch());
847 assert!(split.retained().version() > range.version());
848 }
849
#[test]
fn split_rejects_an_out_of_range_key_and_an_id_collision() {
    let (catalog, orders) = catalog_with("CN=node-a", &[]);
    let original = catalog.range(&orders, RangeId::new(1)).unwrap();

    // Offering the parent's own id as the "fresh" id for the moved child must
    // be refused as a collision.
    let collision = split_range(
        original,
        b"m",
        SplitSide::Upper,
        RangeId::new(1),
        ident("CN=node-b"),
    );
    assert!(matches!(
        collision,
        Err(SplitError::MovedIdCollision { .. })
    ));

    // A range bounded to ["d", "h") cannot be carved at a key beyond its upper
    // bound.
    let lower = RangeBound::key(b"d".to_vec());
    let upper = RangeBound::key(b"h".to_vec());
    let bounded = RangeOwnership::establish(
        orders.clone(),
        RangeId::new(5),
        ShardKeyMode::Ordered,
        RangeBounds::new(lower, upper).unwrap(),
        ident("CN=node-a"),
        Vec::<NodeIdentity>::new(),
        PlacementMetadata::with_replication_factor(1),
    );
    let outside = split_range(
        &bounded,
        b"z",
        SplitSide::Upper,
        RangeId::new(6),
        ident("CN=node-b"),
    );
    assert!(matches!(outside, Err(SplitError::Bounds(_))));
}
891
#[test]
fn applying_a_split_installs_two_non_overlapping_ranges() {
    let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
    let range = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
    let split = split_range(
        &range,
        b"m",
        SplitSide::Upper,
        RangeId::new(2),
        ident("CN=node-b"),
    )
    .unwrap();
    split.apply(&mut catalog).expect("split applies cleanly");

    assert_eq!(catalog.range_count(), 2);

    // Routing now resolves either side to exactly one range. Keys well inside
    // each half land on their own child...
    assert_eq!(
        catalog.route(&orders, b"a").unwrap().range_id(),
        RangeId::new(1)
    );
    assert_eq!(
        catalog.route(&orders, b"z").unwrap().range_id(),
        RangeId::new(2)
    );

    // ...and the split key is where a gap or overlap would show up. The
    // retained child is [Min, "m") and the moved child is ["m", Max), so "m"
    // itself belongs to the moved child and a key below it ("l") still
    // belongs to the retained child.
    assert_eq!(
        catalog.route(&orders, b"m").unwrap().range_id(),
        RangeId::new(2)
    );
    assert_eq!(
        catalog.route(&orders, b"l").unwrap().range_id(),
        RangeId::new(1)
    );
}
917
918 // --- criterion 2 + 3 + 4: snapshot, catch-up, fenced cutover ---------
919
#[test]
fn whole_range_move_copies_snapshot_catches_up_then_cuts_over() {
    let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
    let source = ident("CN=node-a");
    let target = ident("CN=node-b");

    let mut mv = MoveRange::begin(
        &mut catalog,
        orders.clone(),
        RangeId::new(1),
        ident("CN=node-b"),
    )
    .expect("begin ok");
    assert_eq!(mv.phase(), MovePhase::CopyingSnapshot);

    // Criterion 3: mid-copy the catalog still names node-a as owner, so its
    // public writes are admitted.
    let serving_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
    assert!(catalog
        .admit_public_write(&source, &orders, b"k", serving_epoch)
        .is_ok());

    // Criterion 4: node-b is merely a replica, so its writes are rejected.
    let target_reject = catalog
        .admit_public_write(&target, &orders, b"k", serving_epoch)
        .unwrap_err();
    assert!(matches!(target_reject, RangeWriteReject::NotOwner { .. }));

    // Criterion 2: install the snapshot at a consistent point, then replay the
    // range-indexed WAL up to the live watermark.
    mv.complete_snapshot(CommitWatermark::new(1, 100)).unwrap();
    assert_eq!(mv.phase(), MovePhase::CatchingUp);

    // node-a kept accepting writes meanwhile, so the live watermark has moved
    // on to term 1, lsn 130.
    let wal_tail = [
        record(1, 1, 110, 1),
        record(1, 1, 120, 1),
        record(1, 1, 130, 1),
    ];
    let plan = mv.record_catch_up(&wal_tail).unwrap();
    assert_eq!(plan.apply_count(), 3);
    assert!(mv.has_caught_up(CommitWatermark::new(1, 130)));

    let outcome = mv
        .cut_over(&mut catalog, CommitWatermark::new(1, 130))
        .unwrap();
    assert_eq!(mv.phase(), MovePhase::Completed);
    assert_eq!(outcome.kind, TransitionKind::Handoff);
    assert!(outcome.fenced_old_owner());

    // Criterion 3, post-cutover: node-a has been fenced, so a public write at
    // the epoch it used to serve is refused.
    let fenced = catalog
        .admit_public_write(&source, &orders, b"k", serving_epoch)
        .unwrap_err();
    assert!(matches!(
        fenced,
        RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
    ));

    // Criterion 4, post-cutover: node-b owns the range and is admitted at the
    // epoch the catalog now reports.
    let new_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
    assert!(catalog
        .admit_public_write(&target, &orders, b"k", new_epoch)
        .is_ok());
}
982
983 // --- criterion 4: cutover refused before catch-up --------------------
984
#[test]
fn cutover_before_catch_up_is_refused_and_leaves_catalog_untouched() {
    let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
    let mut mv = MoveRange::begin(
        &mut catalog,
        orders.clone(),
        RangeId::new(1),
        ident("CN=node-b"),
    )
    .unwrap();
    mv.complete_snapshot(CommitWatermark::new(1, 100)).unwrap();
    // Only caught up to lsn 110, but the live watermark is lsn 200.
    mv.record_catch_up(&[record(1, 1, 110, 1)]).unwrap();
    assert!(!mv.has_caught_up(CommitWatermark::new(1, 200)));

    // Record the authority-bearing epoch before the attempt, so we can prove
    // the refused cutover did not fence anyone.
    let epoch_before = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();

    let err = mv
        .cut_over(&mut catalog, CommitWatermark::new(1, 200))
        .unwrap_err();
    assert!(matches!(err, MoveError::TargetBehindWatermark { .. }));

    // The move did not advance, node-a is still the owner, and the ownership
    // epoch was not bumped: the catalog is genuinely untouched.
    assert_eq!(mv.phase(), MovePhase::CatchingUp);
    let range = catalog.range(&orders, RangeId::new(1)).unwrap();
    assert_eq!(range.owner(), &ident("CN=node-a"));
    assert_eq!(range.epoch(), epoch_before);
}
1011
1012 // --- split-and-move end to end ---------------------------------------
1013
#[test]
fn split_and_move_relocates_only_the_subrange() {
    let (mut catalog, orders) = catalog_with("CN=node-a", &[]);

    // Carve the hot/large range at "m"; only the upper child (id 2) will move.
    let parent = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
    let split = split_range(
        &parent,
        b"m",
        SplitSide::Upper,
        RangeId::new(2),
        ident("CN=node-b"),
    )
    .unwrap();
    split.apply(&mut catalog).unwrap();

    // Relocate just the carved-off child to node-b.
    let mut mv = MoveRange::begin(
        &mut catalog,
        orders.clone(),
        RangeId::new(2),
        ident("CN=node-b"),
    )
    .unwrap();
    mv.complete_snapshot(CommitWatermark::new(1, 10)).unwrap();
    mv.record_catch_up(&[record(2, 1, 20, 1)]).unwrap();
    mv.cut_over(&mut catalog, CommitWatermark::new(1, 20)).unwrap();

    let node_a = ident("CN=node-a");
    let node_b = ident("CN=node-b");

    // Ownership: the moved child now belongs to node-b, while the retained
    // child stays with node-a, untouched by the move.
    assert_eq!(
        catalog.range(&orders, RangeId::new(2)).unwrap().owner(),
        &node_b
    );
    assert_eq!(
        catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
        &node_a
    );

    // Routing: lower keys land on node-a, upper keys on node-b.
    assert_eq!(catalog.route(&orders, b"a").unwrap().owner(), &node_a);
    assert_eq!(catalog.route(&orders, b"z").unwrap().owner(), &node_b);
}
1062
1063 // --- criterion 2: catch-up only consumes this range's records --------
1064
#[test]
fn catch_up_ignores_other_ranges_and_fences_stale_epoch_records() {
    let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
    let mut mv = MoveRange::begin(
        &mut catalog,
        orders.clone(),
        RangeId::new(1),
        ident("CN=node-b"),
    )
    .unwrap();
    mv.complete_snapshot(CommitWatermark::new(1, 100)).unwrap();

    // One shared WAL slice interleaving a record for an unrelated range, a
    // record stamped with a deposed owner's epoch, and two genuine records.
    let shared_wal = [
        record(99, 1, 105, 1), // unrelated range — skipped
        record(1, 1, 110, 0),  // ownership epoch 0 < 1 — fenced
        record(1, 1, 120, 1),  // applied
        record(1, 1, 130, 1),  // applied
    ];
    let plan = mv.record_catch_up(&shared_wal).unwrap();
    assert_eq!(plan.apply_count(), 2);
    assert_eq!(plan.rejected.len(), 1);

    // Only the genuine records for this range moved the stream position.
    assert_eq!(mv.position().unwrap().applied_lsn, 130);
    assert!(mv.has_caught_up(CommitWatermark::new(1, 130)));
}
1094 // --- criterion 5: interrupted-move recovery safety -------------------
1095
1096 #[test]
1097 fn interrupted_move_promotes_a_caught_up_target() {
1098 let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
1099 // The supervisor died mid-move; node-b's persisted position covers the
1100 // live watermark (term 1 lsn 50).
1101 let position = RangeStreamPosition::new(RangeId::new(1).value(), 50, 1, 1);
1102 let recovery = recover_interrupted_move(
1103 &mut catalog,
1104 &orders,
1105 RangeId::new(1),
1106 &ident("CN=node-b"),
1107 position,
1108 CommitWatermark::new(1, 50),
1109 )
1110 .unwrap();
1111 assert!(recovery.promoted());
1112 assert_eq!(
1113 catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
1114 &ident("CN=node-b")
1115 );
1116 }
1117
1118 #[test]
1119 fn interrupted_move_abandons_a_target_behind_the_watermark() {
1120 let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
1121 // node-b only applied through lsn 40 but the watermark is lsn 50 — it must
1122 // not be promoted.
1123 let position = RangeStreamPosition::new(RangeId::new(1).value(), 40, 1, 1);
1124 let recovery = recover_interrupted_move(
1125 &mut catalog,
1126 &orders,
1127 RangeId::new(1),
1128 &ident("CN=node-b"),
1129 position,
1130 CommitWatermark::new(1, 50),
1131 )
1132 .unwrap();
1133 assert!(!recovery.promoted());
1134 assert!(matches!(
1135 recovery,
1136 MoveRecovery::AbortedSourceRetained {
1137 applied_lsn: 40,
1138 ..
1139 }
1140 ));
1141 // The source kept authority; the catalog is unchanged.
1142 assert_eq!(
1143 catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
1144 &ident("CN=node-a")
1145 );
1146 assert_eq!(
1147 catalog.range(&orders, RangeId::new(1)).unwrap().epoch(),
1148 OwnershipEpoch::initial()
1149 );
1150 }
1151
1152 #[test]
1153 fn move_to_the_incumbent_owner_is_rejected() {
1154 let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
1155 let err = MoveRange::begin(&mut catalog, orders, RangeId::new(1), ident("CN=node-a"))
1156 .unwrap_err();
1157 assert!(matches!(err, MoveError::TargetIsOwner { .. }));
1158 }
1159
1160 #[test]
1161 fn begin_enlists_the_target_as_a_replica() {
1162 let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
1163 let mv = MoveRange::begin(
1164 &mut catalog,
1165 orders.clone(),
1166 RangeId::new(1),
1167 ident("CN=node-b"),
1168 )
1169 .unwrap();
1170 assert_eq!(mv.source(), &ident("CN=node-a"));
1171 // node-b is now a replica of the range — a valid handoff candidate.
1172 assert_eq!(
1173 catalog
1174 .range(&orders, RangeId::new(1))
1175 .unwrap()
1176 .role_of(&ident("CN=node-b")),
1177 RangeRole::Replica
1178 );
1179 }
1180}