use crate::replication::cdc::{
plan_range_catchup, ChangeRecord, RangeCatchupPlan, RangeStreamPosition,
};
use super::identity::NodeIdentity;
use super::ownership::{
CatalogError, CatalogVersion, CollectionId, OwnershipEpoch, RangeBoundsError, RangeId,
RangeOwnership, ShardOwnershipCatalog,
};
use super::ownership_transition::{
run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
TransitionOutcome, TransitionRequest,
};
use super::placement::RangeLoad;
/// Thresholds that decide whether a range is relocated whole or split first.
///
/// See [`classify_move`] for how the two limits are combined.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SplitPolicy {
    /// Largest `bytes_used` a range may have and still be moved without
    /// splitting.
    pub max_whole_move_bytes: u64,
    /// Traffic level at or above which a range counts as hot; `0` disables
    /// the hot-range check.
    pub hot_traffic_threshold: u64,
}

impl Default for SplitPolicy {
    /// A 256 MiB whole-move limit and a hot-traffic threshold of 10 000.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        SplitPolicy {
            hot_traffic_threshold: 10_000,
            max_whole_move_bytes: 256 * MIB,
        }
    }
}
/// How a range should be relocated, as decided by [`classify_move`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MoveKind {
    /// Relocate the range as a single unit.
    Whole,
    /// Split the range before relocating part of it.
    Split,
}
/// Decides whether a range carrying `load` can be moved whole.
///
/// The range is split when its `bytes_used` is strictly greater than
/// [`SplitPolicy::max_whole_move_bytes`], or when `load.traffic()` reaches
/// [`SplitPolicy::hot_traffic_threshold`]. A threshold of `0` turns the
/// traffic check off. Otherwise the range moves whole.
pub fn classify_move(load: RangeLoad, policy: &SplitPolicy) -> MoveKind {
    let traffic = load.traffic();
    let too_large_to_move_whole = load.bytes_used > policy.max_whole_move_bytes;
    let hot = policy.hot_traffic_threshold != 0 && traffic >= policy.hot_traffic_threshold;
    match (too_large_to_move_whole, hot) {
        (false, false) => MoveKind::Whole,
        _ => MoveKind::Split,
    }
}
/// Which half produced by [`split_range`] is relocated. The other half keeps
/// the original range's id.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SplitSide {
    /// Relocate the half below the split key.
    Lower,
    /// Relocate the half above the split key.
    Upper,
}
/// The two halves produced by [`split_range`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeSplit {
    /// The half that keeps the original range's id.
    retained: RangeOwnership,
    /// The newly identified half that is destined to be moved.
    moved: RangeOwnership,
}

impl RangeSplit {
    /// The half that keeps the original range's id.
    pub fn retained(&self) -> &RangeOwnership {
        &self.retained
    }

    /// The newly identified half that is destined to be moved.
    pub fn moved(&self) -> &RangeOwnership {
        &self.moved
    }

    /// Writes both halves into `catalog`, starting with the retained half.
    ///
    /// # Errors
    ///
    /// Returns the first [`CatalogError`] reported by
    /// `ShardOwnershipCatalog::apply_update`.
    ///
    /// NOTE(review): the two updates are applied one after the other. If the
    /// moved half is rejected, the already-applied (narrowed) retained half is
    /// not rolled back, which could leave the moved half's keys without a
    /// range. Confirm callers can tolerate this or validate `moved` up front.
    pub fn apply(&self, catalog: &mut ShardOwnershipCatalog) -> Result<(), CatalogError> {
        for half in [&self.retained, &self.moved] {
            catalog.apply_update(half.clone())?;
        }
        Ok(())
    }
}
/// Splits `range` at `split_key` into a retained half and a half to be moved.
///
/// `moved_side` picks which of the two halves returned by the bounds'
/// `split_at` is carved off. [`SplitSide::Upper`] moves the half whose lower
/// bound is `split_key`. [`SplitSide::Lower`] moves the half whose upper bound
/// is `split_key`.
///
/// The retained half is `range` re-bounded via `with_bounds`, so it keeps
/// `range`'s id. The moved half is a newly established range with id
/// `moved_id` and the same collection, shard-key mode, owner and placement.
/// Its replicas are `range`'s replicas plus `target`. `target` is added only
/// when it is not the range's owner, so the owner is never listed as its own
/// replica.
///
/// NOTE(review): the moved half is built with `RangeOwnership::establish`
/// rather than derived from `range`. Its epoch therefore presumably starts
/// fresh instead of inheriting `range`'s. Confirm this cannot re-admit writers
/// that were fenced at an earlier epoch.
///
/// # Errors
///
/// * [`SplitError::MovedIdCollision`] if `moved_id` equals `range`'s id.
/// * [`SplitError::Bounds`] if `split_key` cannot divide `range`'s bounds.
pub fn split_range(
    range: &RangeOwnership,
    split_key: &[u8],
    moved_side: SplitSide,
    moved_id: RangeId,
    target: NodeIdentity,
) -> Result<RangeSplit, SplitError> {
    if moved_id == range.range_id() {
        return Err(SplitError::MovedIdCollision { id: moved_id });
    }
    let (lower_bounds, upper_bounds) = range
        .bounds()
        .split_at(split_key)
        .map_err(SplitError::Bounds)?;
    let (retained_bounds, moved_bounds) = match moved_side {
        SplitSide::Lower => (upper_bounds, lower_bounds),
        SplitSide::Upper => (lower_bounds, upper_bounds),
    };
    let retained = range.with_bounds(retained_bounds);
    let mut replicas: Vec<NodeIdentity> = range.replicas().to_vec();
    // Enlist the target on the moved half unless it already owns the range
    // (an owner must not also appear as a replica) or is already a replica.
    if &target != range.owner() && !replicas.contains(&target) {
        replicas.push(target);
    }
    let moved = RangeOwnership::establish(
        range.collection().clone(),
        moved_id,
        range.shard_key_mode(),
        moved_bounds,
        range.owner().clone(),
        replicas,
        range.placement().clone(),
    );
    Ok(RangeSplit { retained, moved })
}
/// Reasons [`split_range`] can refuse to split a range.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SplitError {
    /// The range's bounds could not be divided at the split key, for example
    /// because the key lies outside them.
    Bounds(RangeBoundsError),
    /// The id chosen for the moved half equals the id of the range being split.
    MovedIdCollision { id: RangeId },
}

impl std::fmt::Display for SplitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SplitError::Bounds(cause) => write!(f, "cannot split range: {cause}"),
            SplitError::MovedIdCollision { id } => write!(
                f,
                "split moved subrange id {id} collides with the range being split"
            ),
        }
    }
}

impl std::error::Error for SplitError {}
/// Lifecycle stage of a [`MoveRange`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MovePhase {
    /// The target is copying a snapshot of the range.
    CopyingSnapshot,
    /// The snapshot is done; the target is replaying later changes.
    CatchingUp,
    /// Ownership has been handed off to the target.
    Completed,
    /// The move was abandoned.
    Aborted,
}

impl MovePhase {
    /// Lowercase, hyphenated name of the phase, used in error messages.
    fn label(self) -> &'static str {
        match self {
            Self::CopyingSnapshot => "copying-snapshot",
            Self::CatchingUp => "catching-up",
            Self::Completed => "completed",
            Self::Aborted => "aborted",
        }
    }
}
/// Coordinator state for relocating one range from its current owner to a
/// target node.
///
/// A move progresses `CopyingSnapshot` → `CatchingUp` → `Completed`, or ends in
/// `Aborted`. [`MoveRange::begin`] captures the range's epoch and catalog
/// version after enlisting the target as a replica. These values are passed as
/// the expected epoch/version of the final handoff transition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MoveRange {
    /// Collection the moving range belongs to.
    collection: CollectionId,
    /// Id of the range being moved.
    range_id: RangeId,
    /// Owner of the range when the move began.
    source: NodeIdentity,
    /// Node that takes ownership at cut-over.
    target: NodeIdentity,
    /// Range epoch observed by `begin`; also stamps the catch-up cursor.
    expected_epoch: OwnershipEpoch,
    /// Catalog version observed by `begin`.
    expected_version: CatalogVersion,
    /// Current lifecycle stage.
    phase: MovePhase,
    /// Watermark at which the snapshot copy finished, once known.
    snapshot_watermark: Option<CommitWatermark>,
    /// Target's catch-up cursor; `Some` from `complete_snapshot` onward.
    position: Option<RangeStreamPosition>,
}

impl MoveRange {
    /// Starts moving `range_id` of `collection` from its current owner to
    /// `target`.
    ///
    /// If `target` is not yet a replica of the range, it is appended to the
    /// replica list and that update is written to `catalog`. The epoch and
    /// version recorded for the eventual cut-over are read after that write.
    ///
    /// # Errors
    ///
    /// * [`MoveError::UnknownRange`] if `catalog` has no such range.
    /// * [`MoveError::TargetIsOwner`] if `target` already owns the range.
    /// * [`MoveError::Catalog`] if the catalog rejects the replica enlistment.
    pub fn begin(
        catalog: &mut ShardOwnershipCatalog,
        collection: CollectionId,
        range_id: RangeId,
        target: NodeIdentity,
    ) -> Result<Self, MoveError> {
        let current = catalog
            .range(&collection, range_id)
            .ok_or_else(|| MoveError::UnknownRange {
                collection: collection.clone(),
                range_id,
            })?;
        let source = current.owner().clone();
        if target == source {
            return Err(MoveError::TargetIsOwner {
                collection,
                range_id,
                owner: source,
            });
        }
        if !current.replicas().contains(&target) {
            let mut replicas: Vec<NodeIdentity> = current.replicas().to_vec();
            replicas.push(target.clone());
            let enlisted = current.update_replicas(replicas);
            catalog.apply_update(enlisted).map_err(MoveError::Catalog)?;
        }
        // Re-read so the recorded epoch/version reflect any enlistment above.
        let current = catalog
            .range(&collection, range_id)
            .expect("range present immediately after enlist");
        Ok(Self {
            collection,
            range_id,
            source,
            target,
            expected_epoch: current.epoch(),
            expected_version: current.version(),
            phase: MovePhase::CopyingSnapshot,
            snapshot_watermark: None,
            position: None,
        })
    }

    /// Current lifecycle stage of the move.
    pub fn phase(&self) -> MovePhase {
        self.phase
    }

    /// Owner of the range when the move began.
    pub fn source(&self) -> &NodeIdentity {
        &self.source
    }

    /// Node the range is being moved to.
    pub fn target(&self) -> &NodeIdentity {
        &self.target
    }

    /// Watermark passed to [`MoveRange::complete_snapshot`], if it has run.
    pub fn snapshot_watermark(&self) -> Option<CommitWatermark> {
        self.snapshot_watermark
    }

    /// The target's catch-up cursor, once the snapshot has completed.
    pub fn position(&self) -> Option<RangeStreamPosition> {
        self.position
    }

    /// Records that the snapshot copy finished at `at` and enters
    /// `CatchingUp`.
    ///
    /// The catch-up cursor is seeded at `at`'s LSN and term, tagged with this
    /// range's id and the epoch captured by `begin`.
    ///
    /// # Errors
    ///
    /// [`MoveError::WrongPhase`] unless the move is in `CopyingSnapshot`.
    pub fn complete_snapshot(&mut self, at: CommitWatermark) -> Result<(), MoveError> {
        self.expect_phase(MovePhase::CopyingSnapshot)?;
        self.snapshot_watermark = Some(at);
        self.position = Some(RangeStreamPosition::new(
            self.range_id.value(),
            at.lsn,
            at.term,
            self.expected_epoch.value(),
        ));
        self.phase = MovePhase::CatchingUp;
        Ok(())
    }

    /// Plans how to replay `records` on the target and advances the cursor.
    ///
    /// `plan_range_catchup` decides, relative to the current cursor, which
    /// records apply. The cursor is then moved to the plan's `resume` position.
    /// The plan is returned so the caller can act on it.
    ///
    /// # Errors
    ///
    /// [`MoveError::WrongPhase`] unless the move is in `CatchingUp`.
    pub fn record_catch_up(
        &mut self,
        records: &[ChangeRecord],
    ) -> Result<RangeCatchupPlan, MoveError> {
        self.expect_phase(MovePhase::CatchingUp)?;
        // `complete_snapshot` is the only way into `CatchingUp` and always sets
        // `position`, so this cannot fail.
        let position = self
            .position
            .as_mut()
            .expect("catch-up position present while catching up");
        let plan = plan_range_catchup(position, records);
        *position = plan.resume;
        Ok(plan)
    }

    /// Catch-up evidence for the target, built from its cursor's accepted
    /// term and applied LSN. Returns `None` before the snapshot completes.
    pub fn catch_up_evidence(&self) -> Option<CatchUpEvidence> {
        self.position.map(|position| {
            CatchUpEvidence::new(
                self.target.clone(),
                position.accepted_term,
                position.applied_lsn,
            )
        })
    }

    /// Whether the target's catch-up evidence covers `live`. Returns `false`
    /// before the snapshot completes.
    pub fn has_caught_up(&self, live: CommitWatermark) -> bool {
        self.catch_up_evidence()
            .map(|evidence| evidence.covers(live))
            .unwrap_or(false)
    }

    /// Hands the range to the target once it has caught up with `live`.
    ///
    /// On success the move becomes `Completed` and the transition outcome is
    /// returned. On any error the phase is left unchanged.
    ///
    /// # Errors
    ///
    /// * [`MoveError::WrongPhase`] unless the move is in `CatchingUp`.
    /// * [`MoveError::TargetBehindWatermark`] if the target's evidence does not
    ///   cover `live`. The catalog is not touched in this case.
    /// * [`MoveError::Transition`] if the handoff transition fails.
    pub fn cut_over(
        &mut self,
        catalog: &mut ShardOwnershipCatalog,
        live: CommitWatermark,
    ) -> Result<TransitionOutcome, MoveError> {
        self.expect_phase(MovePhase::CatchingUp)?;
        // `CatchingUp` implies `position` is set, so evidence is always present.
        let evidence = self
            .catch_up_evidence()
            .expect("catch-up evidence present while catching up");
        if !evidence.covers(live) {
            return Err(MoveError::TargetBehindWatermark {
                collection: self.collection.clone(),
                range_id: self.range_id,
                target: self.target.clone(),
                watermark: live,
                applied_term: evidence.applied_term,
                applied_lsn: evidence.applied_lsn,
            });
        }
        let outcome = attempt_handoff(
            catalog,
            &self.collection,
            self.range_id,
            &self.source,
            self.expected_epoch,
            self.expected_version,
            &self.target,
            evidence,
            live,
        )?;
        self.phase = MovePhase::Completed;
        Ok(outcome)
    }

    /// Abandons the move.
    ///
    /// This has no effect once the move is `Completed`. The cut-over has
    /// already been committed to the catalog at that point, so reporting the
    /// move as aborted would misdescribe the catalog state. Aborting never
    /// touches the catalog, so a target enlisted by [`MoveRange::begin`] stays
    /// listed as a replica.
    pub fn abort(&mut self) {
        if self.phase != MovePhase::Completed {
            self.phase = MovePhase::Aborted;
        }
    }

    /// Returns [`MoveError::WrongPhase`] unless the move is currently in
    /// `expected`.
    fn expect_phase(&self, expected: MovePhase) -> Result<(), MoveError> {
        if self.phase == expected {
            Ok(())
        } else {
            Err(MoveError::WrongPhase {
                expected: expected.label(),
                actual: self.phase,
            })
        }
    }
}
/// Finishes or abandons a move of `range_id` to `target`, using only the
/// target's reported `target_position` and the `live` commit watermark.
///
/// The range's current owner is treated as the source. Its current epoch and
/// catalog version are used as the expected values for the handoff.
///
/// If the target's position (accepted term and applied LSN) covers `live`, the
/// range is handed off to `target` and [`MoveRecovery::Promoted`] is returned.
/// Otherwise the catalog is left untouched, and
/// [`MoveRecovery::AbortedSourceRetained`] reports how far behind the target
/// was.
///
/// # Errors
///
/// * [`MoveError::UnknownRange`] if `catalog` has no such range.
/// * [`MoveError::TargetIsOwner`] if `target` already owns the range, for
///   instance when the interrupted move had in fact committed. A handoff from
///   the owner to itself is refused here, as it is by [`MoveRange::begin`].
/// * [`MoveError::Transition`] if the handoff transition fails.
pub fn recover_interrupted_move(
    catalog: &mut ShardOwnershipCatalog,
    collection: &CollectionId,
    range_id: RangeId,
    target: &NodeIdentity,
    target_position: RangeStreamPosition,
    live: CommitWatermark,
) -> Result<MoveRecovery, MoveError> {
    let current = catalog
        .range(collection, range_id)
        .ok_or_else(|| MoveError::UnknownRange {
            collection: collection.clone(),
            range_id,
        })?;
    let source = current.owner().clone();
    // Never hand a range off to the node that already owns it.
    if source == *target {
        return Err(MoveError::TargetIsOwner {
            collection: collection.clone(),
            range_id,
            owner: source,
        });
    }
    let expected_epoch = current.epoch();
    let expected_version = current.version();
    let evidence = CatchUpEvidence::new(
        target.clone(),
        target_position.accepted_term,
        target_position.applied_lsn,
    );
    if !evidence.covers(live) {
        return Ok(MoveRecovery::AbortedSourceRetained {
            applied_term: evidence.applied_term,
            applied_lsn: evidence.applied_lsn,
            watermark: live,
        });
    }
    let outcome = attempt_handoff(
        catalog,
        collection,
        range_id,
        &source,
        expected_epoch,
        expected_version,
        target,
        evidence,
        live,
    )?;
    Ok(MoveRecovery::Promoted(outcome))
}
/// Result of [`recover_interrupted_move`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MoveRecovery {
    /// The target covered the live watermark and was handed the range.
    Promoted(TransitionOutcome),
    /// The target did not cover the live watermark. The catalog was left
    /// unchanged, so the source keeps the range.
    AbortedSourceRetained {
        /// `accepted_term` from the target's reported stream position.
        applied_term: u64,
        /// `applied_lsn` from the target's reported stream position.
        applied_lsn: u64,
        /// The live watermark the target failed to cover.
        watermark: CommitWatermark,
    },
}

impl MoveRecovery {
    /// Returns `true` when recovery handed the range to the target.
    pub fn promoted(&self) -> bool {
        match self {
            MoveRecovery::Promoted(_) => true,
            MoveRecovery::AbortedSourceRetained { .. } => false,
        }
    }
}
/// Runs a [`TransitionKind::Handoff`] of `range_id` from `source` to `target`.
///
/// The request carries the epoch and version the caller expects the range to
/// be at, the target's catch-up `evidence`, and the commit `watermark`. It is
/// executed with [`run_transition`], and any failure is wrapped in
/// [`MoveError::Transition`].
///
/// NOTE(review): the request's replica list is just `[source]`. If
/// `with_replicas` replaces (rather than extends) the range's replica set,
/// every other replica of the range would be dropped by the handoff. Confirm.
// Seven of the parameters feed straight into `TransitionRequest::new`.
// Bundling them into a struct would only move the long list elsewhere.
#[allow(clippy::too_many_arguments)]
fn attempt_handoff(
    catalog: &mut ShardOwnershipCatalog,
    collection: &CollectionId,
    range_id: RangeId,
    source: &NodeIdentity,
    expected_epoch: OwnershipEpoch,
    expected_version: CatalogVersion,
    target: &NodeIdentity,
    evidence: CatchUpEvidence,
    watermark: CommitWatermark,
) -> Result<TransitionOutcome, MoveError> {
    let handoff = TransitionRequest::new(
        TransitionKind::Handoff,
        collection.clone(),
        range_id,
        source.clone(),
        expected_epoch,
        expected_version,
        target.clone(),
        watermark,
    );
    let handoff = handoff
        .with_evidence(evidence)
        .with_replicas([source.clone()]);
    run_transition(catalog, &handoff).map_err(MoveError::Transition)
}
/// Failures reported by [`MoveRange`] and [`recover_interrupted_move`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MoveError {
    /// The catalog has no range with this collection and id.
    UnknownRange {
        collection: CollectionId,
        range_id: RangeId,
    },
    /// The requested target already owns the range.
    TargetIsOwner {
        collection: CollectionId,
        range_id: RangeId,
        owner: NodeIdentity,
    },
    /// A move step was invoked while the move was in a different phase.
    WrongPhase {
        expected: &'static str,
        actual: MovePhase,
    },
    /// The target's catch-up evidence does not cover the commit watermark.
    TargetBehindWatermark {
        collection: CollectionId,
        range_id: RangeId,
        target: NodeIdentity,
        watermark: CommitWatermark,
        applied_term: u64,
        applied_lsn: u64,
    },
    /// The catalog rejected an update.
    Catalog(CatalogError),
    /// The ownership transition failed.
    Transition(TransitionError),
}

impl std::fmt::Display for MoveError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            MoveError::UnknownRange {
                collection,
                range_id,
            } => write!(f, "no range {collection}/{range_id} to move"),
            MoveError::TargetIsOwner {
                collection,
                range_id,
                owner,
            } => write!(
                f,
                "move target {owner} is already the owner of {collection}/{range_id}"
            ),
            MoveError::WrongPhase { expected, actual } => {
                let actual_label = actual.label();
                write!(
                    f,
                    "move-range step expected phase {expected} but the move is {}",
                    actual_label
                )
            }
            MoveError::TargetBehindWatermark {
                collection,
                range_id,
                target,
                watermark,
                applied_term,
                applied_lsn,
            } => write!(
                f,
                "cannot cut over {collection}/{range_id} to {target}: applied term {applied_term} lsn {applied_lsn} is behind the commit watermark term {} lsn {}",
                watermark.term, watermark.lsn
            ),
            MoveError::Catalog(inner) => write!(f, "{inner}"),
            MoveError::Transition(inner) => write!(f, "{inner}"),
        }
    }
}

impl std::error::Error for MoveError {}
// Unit tests for move classification, range splitting, whole/split moves,
// and recovery of interrupted moves.
#[cfg(test)]
mod tests {
use super::*;
use crate::cluster::ownership::{
PlacementMetadata, RangeBound, RangeBounds, RangeRole, RangeWriteReject, ShardKeyMode,
};
use crate::replication::cdc::ChangeOperation;
// Builds a collection id; panics if `name` is rejected.
fn collection(name: &str) -> CollectionId {
CollectionId::new(name).unwrap()
}
// Builds a node identity from a certificate subject such as "CN=node-a".
fn ident(cn: &str) -> NodeIdentity {
NodeIdentity::from_certificate_subject(cn).unwrap()
}
// Returns a catalog holding one `orders` range (id 1, full keyspace, ordered
// shard keys, replication factor 3) owned by `owner` with `replicas`.
fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
let orders = collection("orders");
let mut catalog = ShardOwnershipCatalog::new();
catalog
.apply_update(RangeOwnership::establish(
orders.clone(),
RangeId::new(1),
ShardKeyMode::Ordered,
RangeBounds::full(),
ident(owner),
replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
PlacementMetadata::with_replication_factor(3),
))
.unwrap();
(catalog, orders)
}
// Builds an insert change record for `orders` stamped with the given range id,
// term, LSN and ownership epoch; the LSN doubles as the entity id.
fn record(range_id: u64, term: u64, lsn: u64, epoch: u64) -> ChangeRecord {
ChangeRecord {
term,
lsn,
timestamp: 1,
operation: ChangeOperation::Insert,
collection: "orders".to_string(),
entity_id: lsn,
entity_kind: "row".to_string(),
entity_bytes: Some(vec![1]),
metadata: None,
refresh_records: None,
range_id: Some(range_id),
ownership_epoch: Some(epoch),
}
}
// Exceeding the byte limit, or traffic of 500 (300 reads + 200 writes) against
// a threshold of 500, forces a split; 900 bytes or traffic of 499 moves whole.
#[test]
fn small_cool_range_moves_whole_large_or_hot_range_splits() {
let policy = SplitPolicy {
max_whole_move_bytes: 1_000,
hot_traffic_threshold: 500,
};
assert_eq!(
classify_move(RangeLoad::idle(900), &policy),
MoveKind::Whole
);
assert_eq!(
classify_move(RangeLoad::idle(1_001), &policy),
MoveKind::Split
);
assert_eq!(
classify_move(
RangeLoad {
bytes_used: 10,
read_ops: 300,
write_ops: 200,
},
&policy
),
MoveKind::Split
);
assert_eq!(
classify_move(
RangeLoad {
bytes_used: 10,
read_ops: 250,
write_ops: 249,
},
&policy
),
MoveKind::Whole
);
}
// Splitting the full range at "m" with the upper side moving: the retained
// range keeps id 1 with bounds Min..key("m"), the moved range gets id 2 with
// bounds key("m")..Max, both keep the original owner, the target becomes a
// replica of the moved range, and the retained range keeps its epoch while its
// version increases.
#[test]
fn split_tiles_the_keyspace_with_no_gap_or_overlap() {
let (catalog, orders) = catalog_with("CN=node-a", &[]);
let range = catalog.range(&orders, RangeId::new(1)).unwrap();
let split = split_range(
range,
b"m",
SplitSide::Upper,
RangeId::new(2),
ident("CN=node-b"),
)
.expect("split ok");
assert_eq!(split.retained().range_id(), RangeId::new(1));
assert_eq!(split.retained().bounds().lower(), &RangeBound::Min);
assert_eq!(
split.retained().bounds().upper(),
&RangeBound::key(b"m".to_vec())
);
assert_eq!(split.moved().range_id(), RangeId::new(2));
assert_eq!(
split.moved().bounds().lower(),
&RangeBound::key(b"m".to_vec())
);
assert_eq!(split.moved().bounds().upper(), &RangeBound::Max);
assert_eq!(split.retained().owner(), &ident("CN=node-a"));
assert_eq!(split.moved().owner(), &ident("CN=node-a"));
assert_eq!(
split.moved().role_of(&ident("CN=node-b")),
RangeRole::Replica
);
assert_eq!(split.retained().epoch(), range.epoch());
assert!(split.retained().version() > range.version());
}
// Reusing the range's own id for the moved half is refused, and so is a split
// key ("z") outside the range's bounds ("d".."h").
#[test]
fn split_rejects_an_out_of_range_key_and_an_id_collision() {
let (catalog, orders) = catalog_with("CN=node-a", &[]);
let range = catalog.range(&orders, RangeId::new(1)).unwrap();
assert!(matches!(
split_range(
range,
b"m",
SplitSide::Upper,
RangeId::new(1),
ident("CN=node-b")
),
Err(SplitError::MovedIdCollision { .. })
));
let bounded = RangeOwnership::establish(
orders.clone(),
RangeId::new(5),
ShardKeyMode::Ordered,
RangeBounds::new(
RangeBound::key(b"d".to_vec()),
RangeBound::key(b"h".to_vec()),
)
.unwrap(),
ident("CN=node-a"),
Vec::<NodeIdentity>::new(),
PlacementMetadata::with_replication_factor(1),
);
assert!(matches!(
split_range(
&bounded,
b"z",
SplitSide::Upper,
RangeId::new(6),
ident("CN=node-b")
),
Err(SplitError::Bounds(_))
));
}
// After a split is applied the catalog holds two ranges: "a" routes to range 1
// and "z" routes to range 2.
#[test]
fn applying_a_split_installs_two_non_overlapping_ranges() {
let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
let range = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
let split = split_range(
&range,
b"m",
SplitSide::Upper,
RangeId::new(2),
ident("CN=node-b"),
)
.unwrap();
split.apply(&mut catalog).expect("split applies cleanly");
assert_eq!(catalog.range_count(), 2);
assert_eq!(
catalog.route(&orders, b"a").unwrap().range_id(),
RangeId::new(1)
);
assert_eq!(
catalog.route(&orders, b"z").unwrap().range_id(),
RangeId::new(2)
);
}
// End-to-end whole-range move. While the snapshot is copied only the source
// may write. After catch-up and cut-over, the old owner's write at the old
// epoch is refused and the target's write at the new epoch is admitted.
#[test]
fn whole_range_move_copies_snapshot_catches_up_then_cuts_over() {
let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
let mut mv = MoveRange::begin(
&mut catalog,
orders.clone(),
RangeId::new(1),
ident("CN=node-b"),
)
.expect("begin ok");
assert_eq!(mv.phase(), MovePhase::CopyingSnapshot);
let serving_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
assert!(catalog
.admit_public_write(&ident("CN=node-a"), &orders, b"k", serving_epoch)
.is_ok());
let err = catalog
.admit_public_write(&ident("CN=node-b"), &orders, b"k", serving_epoch)
.unwrap_err();
assert!(matches!(err, RangeWriteReject::NotOwner { .. }));
mv.complete_snapshot(CommitWatermark::new(1, 100)).unwrap();
assert_eq!(mv.phase(), MovePhase::CatchingUp);
let plan = mv
.record_catch_up(&[
record(1, 1, 110, 1),
record(1, 1, 120, 1),
record(1, 1, 130, 1),
])
.unwrap();
assert_eq!(plan.apply_count(), 3);
assert!(mv.has_caught_up(CommitWatermark::new(1, 130)));
let outcome = mv
.cut_over(&mut catalog, CommitWatermark::new(1, 130))
.unwrap();
assert_eq!(mv.phase(), MovePhase::Completed);
assert_eq!(outcome.kind, TransitionKind::Handoff);
assert!(outcome.fenced_old_owner());
let err = catalog
.admit_public_write(&ident("CN=node-a"), &orders, b"k", serving_epoch)
.unwrap_err();
assert!(matches!(
err,
RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
));
let new_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
assert!(catalog
.admit_public_write(&ident("CN=node-b"), &orders, b"k", new_epoch)
.is_ok());
}
// Cutting over while the target lags the live watermark fails with
// TargetBehindWatermark, keeps the move in CatchingUp, and leaves the source
// as owner.
#[test]
fn cutover_before_catch_up_is_refused_and_leaves_catalog_untouched() {
let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
let mut mv = MoveRange::begin(
&mut catalog,
orders.clone(),
RangeId::new(1),
ident("CN=node-b"),
)
.unwrap();
mv.complete_snapshot(CommitWatermark::new(1, 100)).unwrap();
mv.record_catch_up(&[record(1, 1, 110, 1)]).unwrap();
assert!(!mv.has_caught_up(CommitWatermark::new(1, 200)));
let err = mv
.cut_over(&mut catalog, CommitWatermark::new(1, 200))
.unwrap_err();
assert!(matches!(err, MoveError::TargetBehindWatermark { .. }));
assert_eq!(mv.phase(), MovePhase::CatchingUp);
assert_eq!(
catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
&ident("CN=node-a")
);
}
// Moving only the split-off subrange (id 2) transfers its keys to the target,
// while range 1 and its keys stay with the original owner.
#[test]
fn split_and_move_relocates_only_the_subrange() {
let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
let range = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
let split = split_range(
&range,
b"m",
SplitSide::Upper,
RangeId::new(2),
ident("CN=node-b"),
)
.unwrap();
split.apply(&mut catalog).unwrap();
let mut mv = MoveRange::begin(
&mut catalog,
orders.clone(),
RangeId::new(2),
ident("CN=node-b"),
)
.unwrap();
mv.complete_snapshot(CommitWatermark::new(1, 10)).unwrap();
mv.record_catch_up(&[record(2, 1, 20, 1)]).unwrap();
mv.cut_over(&mut catalog, CommitWatermark::new(1, 20))
.unwrap();
assert_eq!(
catalog.range(&orders, RangeId::new(2)).unwrap().owner(),
&ident("CN=node-b")
);
assert_eq!(
catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
&ident("CN=node-a")
);
assert_eq!(
catalog.route(&orders, b"a").unwrap().owner(),
&ident("CN=node-a")
);
assert_eq!(
catalog.route(&orders, b"z").unwrap().owner(),
&ident("CN=node-b")
);
}
// A record for another range is neither applied nor rejected. A record at a
// stale epoch is rejected. The cursor still advances to the last applicable
// record.
#[test]
fn catch_up_ignores_other_ranges_and_fences_stale_epoch_records() {
let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
let mut mv = MoveRange::begin(
&mut catalog,
orders.clone(),
RangeId::new(1),
ident("CN=node-b"),
)
.unwrap();
mv.complete_snapshot(CommitWatermark::new(1, 100)).unwrap();
// Records, in order: one for range 99, one for range 1 at stale epoch 0, then
// two for range 1 at epoch 1.
let plan = mv
.record_catch_up(&[
record(99, 1, 105, 1), record(1, 1, 110, 0), record(1, 1, 120, 1), record(1, 1, 130, 1), ])
.unwrap();
assert_eq!(plan.apply_count(), 2);
assert_eq!(plan.rejected.len(), 1);
assert_eq!(mv.position().unwrap().applied_lsn, 130);
assert!(mv.has_caught_up(CommitWatermark::new(1, 130)));
}
// Recovery promotes a target whose reported position matches the live
// watermark.
#[test]
fn interrupted_move_promotes_a_caught_up_target() {
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let position = RangeStreamPosition::new(RangeId::new(1).value(), 50, 1, 1);
let recovery = recover_interrupted_move(
&mut catalog,
&orders,
RangeId::new(1),
&ident("CN=node-b"),
position,
CommitWatermark::new(1, 50),
)
.unwrap();
assert!(recovery.promoted());
assert_eq!(
catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
&ident("CN=node-b")
);
}
// Recovery with a target behind the watermark reports AbortedSourceRetained
// and leaves both the owner and the epoch unchanged.
#[test]
fn interrupted_move_abandons_a_target_behind_the_watermark() {
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let position = RangeStreamPosition::new(RangeId::new(1).value(), 40, 1, 1);
let recovery = recover_interrupted_move(
&mut catalog,
&orders,
RangeId::new(1),
&ident("CN=node-b"),
position,
CommitWatermark::new(1, 50),
)
.unwrap();
assert!(!recovery.promoted());
assert!(matches!(
recovery,
MoveRecovery::AbortedSourceRetained {
applied_lsn: 40,
..
}
));
assert_eq!(
catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
&ident("CN=node-a")
);
assert_eq!(
catalog.range(&orders, RangeId::new(1)).unwrap().epoch(),
OwnershipEpoch::initial()
);
}
// Starting a move whose target is the current owner is rejected.
#[test]
fn move_to_the_incumbent_owner_is_rejected() {
let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
let err = MoveRange::begin(&mut catalog, orders, RangeId::new(1), ident("CN=node-a"))
.unwrap_err();
assert!(matches!(err, MoveError::TargetIsOwner { .. }));
}
// `begin` records the current owner as source and enlists a non-replica target
// as a replica of the range.
#[test]
fn begin_enlists_the_target_as_a_replica() {
let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
let mv = MoveRange::begin(
&mut catalog,
orders.clone(),
RangeId::new(1),
ident("CN=node-b"),
)
.unwrap();
assert_eq!(mv.source(), &ident("CN=node-a"));
assert_eq!(
catalog
.range(&orders, RangeId::new(1))
.unwrap()
.role_of(&ident("CN=node-b")),
RangeRole::Replica
);
}
}