use super::identity::NodeIdentity;
use super::ownership::{
CatalogError, CatalogVersion, CollectionId, OwnershipEpoch, RangeId, RangeOwnership,
ShardOwnershipCatalog,
};
/// A `(term, lsn)` log position that a transition candidate must have applied
/// before it may take ownership of a range.
///
/// The derived `PartialOrd`/`Ord` compare `term` first and `lsn` second
/// (field declaration order), i.e. lexicographic log-position order — the same
/// order `CatchUpEvidence::covers` uses. Keep `term` declared before `lsn` so
/// that ordering stays correct.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CommitWatermark {
    /// Term component of the watermark.
    pub term: u64,
    /// Log sequence number component of the watermark.
    pub lsn: u64,
}

impl CommitWatermark {
    /// Builds a watermark from an explicit `(term, lsn)` pair.
    pub fn new(term: u64, lsn: u64) -> Self {
        Self { term, lsn }
    }
}
/// How far a specific node reports having applied the log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CatchUpEvidence {
    /// Node this evidence describes.
    pub candidate: NodeIdentity,
    /// Term of the applied position reported for `candidate`.
    pub applied_term: u64,
    /// Log sequence number of the applied position reported for `candidate`.
    pub applied_lsn: u64,
}

impl CatchUpEvidence {
    /// Bundles a candidate identity with its applied `(term, lsn)` position.
    pub fn new(candidate: NodeIdentity, applied_term: u64, applied_lsn: u64) -> Self {
        Self {
            candidate,
            applied_term,
            applied_lsn,
        }
    }

    /// Returns `true` when the applied position is at or past `watermark`.
    ///
    /// Positions compare lexicographically: a strictly newer term is enough on
    /// its own, otherwise the terms must match and the LSN must reach the
    /// watermark's LSN.
    pub fn covers(&self, watermark: CommitWatermark) -> bool {
        let applied = (self.applied_term, self.applied_lsn);
        let required = (watermark.term, watermark.lsn);
        applied >= required
    }
}
/// Which flavour of ownership change a transition request asks for.
///
/// `prepare` applies the same validation to both kinds; the kind is carried
/// through to the resulting outcome and shown in its audit line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransitionKind {
    Promote,
    Handoff,
}

impl TransitionKind {
    /// Lowercase name used when rendering a transition outcome.
    fn label(self) -> &'static str {
        match self {
            Self::Handoff => "handoff",
            Self::Promote => "promote",
        }
    }
}
/// A compare-and-swap request to move ownership of one range to `target`.
///
/// `prepare` accepts it only if the catalog's current owner, epoch and version
/// equal `expected_owner`, `expected_epoch` and `expected_version`, and if the
/// attached evidence shows `target` has applied the log up to `watermark`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransitionRequest {
    kind: TransitionKind,
    collection: CollectionId,
    range_id: RangeId,
    /// Owner the caller believes currently holds the range.
    expected_owner: NodeIdentity,
    /// Epoch the caller believes the range is at.
    expected_epoch: OwnershipEpoch,
    /// Catalog version the caller observed for the range.
    expected_version: CatalogVersion,
    /// Node that should become the new owner.
    target: NodeIdentity,
    /// Log position `target` must have applied.
    watermark: CommitWatermark,
    /// Catch-up proof for `target`; `None` until `with_evidence` is called.
    evidence: Option<CatchUpEvidence>,
    /// Replica set handed to the new catalog entry; empty unless
    /// `with_replicas` is called.
    new_replicas: Vec<NodeIdentity>,
}

impl TransitionRequest {
    /// Creates a request with no catch-up evidence and an empty replica set.
    ///
    /// `prepare` rejects a request that has no evidence, so callers normally
    /// chain `with_evidence` afterwards.
    // Every argument is a required CAS expectation or the target itself; a
    // builder would make it possible to submit a request with some of them unset.
    #[allow(clippy::too_many_arguments)]
    #[must_use]
    pub fn new(
        kind: TransitionKind,
        collection: CollectionId,
        range_id: RangeId,
        expected_owner: NodeIdentity,
        expected_epoch: OwnershipEpoch,
        expected_version: CatalogVersion,
        target: NodeIdentity,
        watermark: CommitWatermark,
    ) -> Self {
        Self {
            kind,
            collection,
            range_id,
            expected_owner,
            expected_epoch,
            expected_version,
            target,
            watermark,
            evidence: None,
            new_replicas: Vec::new(),
        }
    }

    /// Attaches catch-up evidence, replacing any evidence attached earlier.
    #[must_use = "with_evidence consumes the request and returns the updated one"]
    pub fn with_evidence(mut self, evidence: CatchUpEvidence) -> Self {
        self.evidence = Some(evidence);
        self
    }

    /// Sets the replica set for the new catalog entry, replacing any set earlier.
    #[must_use = "with_replicas consumes the request and returns the updated one"]
    pub fn with_replicas(mut self, replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
        self.new_replicas = replicas.into_iter().collect();
        self
    }

    /// The requested transition kind.
    pub fn kind(&self) -> TransitionKind {
        self.kind
    }

    /// Collection that owns the range.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Range being transferred.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// Node that should become the new owner.
    pub fn target(&self) -> &NodeIdentity {
        &self.target
    }
}
/// Why the requested target node cannot take ownership of a range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvalidCandidateReason {
    /// The target is not listed among the range's replicas.
    NotAReplica,
    /// The target already owns the range.
    AlreadyOwner,
}

impl InvalidCandidateReason {
    /// Human-readable explanation appended to rejection messages.
    fn label(self) -> &'static str {
        match self {
            Self::AlreadyOwner => "candidate is already the current owner",
            Self::NotAReplica => "candidate is not a replica of the range",
        }
    }
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransitionRejection {
UnknownRange {
collection: CollectionId,
range_id: RangeId,
},
OwnerMismatch {
collection: CollectionId,
range_id: RangeId,
expected: NodeIdentity,
current: NodeIdentity,
},
StaleEpoch {
collection: CollectionId,
range_id: RangeId,
expected: OwnershipEpoch,
current: OwnershipEpoch,
},
StaleCatalogVersion {
collection: CollectionId,
range_id: RangeId,
expected: CatalogVersion,
current: CatalogVersion,
},
InvalidCandidate {
collection: CollectionId,
range_id: RangeId,
candidate: NodeIdentity,
reason: InvalidCandidateReason,
},
MissingSafetyEvidence {
collection: CollectionId,
range_id: RangeId,
candidate: NodeIdentity,
},
EvidenceForWrongCandidate {
collection: CollectionId,
range_id: RangeId,
target: NodeIdentity,
evidence_for: NodeIdentity,
},
SafetyCheckFailed {
collection: CollectionId,
range_id: RangeId,
candidate: NodeIdentity,
watermark: CommitWatermark,
applied_term: u64,
applied_lsn: u64,
},
}
impl std::fmt::Display for TransitionRejection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UnknownRange {
collection,
range_id,
} => write!(f, "no range {collection}/{range_id} in the catalog"),
Self::OwnerMismatch {
collection,
range_id,
expected,
current,
} => write!(
f,
"ownership transition for {collection}/{range_id} expected current owner {expected}, but catalog owner is {current}"
),
Self::StaleEpoch {
collection,
range_id,
expected,
current,
} => write!(
f,
"ownership transition for {collection}/{range_id} expected epoch {expected}, but catalog epoch is {current}"
),
Self::StaleCatalogVersion {
collection,
range_id,
expected,
current,
} => write!(
f,
"ownership transition for {collection}/{range_id} expected catalog version {expected}, but catalog version is {current}"
),
Self::InvalidCandidate {
collection,
range_id,
candidate,
reason,
} => write!(
f,
"invalid ownership transition candidate {candidate} for {collection}/{range_id}: {}",
reason.label()
),
Self::MissingSafetyEvidence {
collection,
range_id,
candidate,
} => write!(
f,
"ownership transition for {collection}/{range_id} carries no safety evidence for candidate {candidate}"
),
Self::EvidenceForWrongCandidate {
collection,
range_id,
target,
evidence_for,
} => write!(
f,
"ownership transition for {collection}/{range_id} targets {target} but its safety evidence describes {evidence_for}"
),
Self::SafetyCheckFailed {
collection,
range_id,
candidate,
watermark,
applied_term,
applied_lsn,
} => write!(
f,
"candidate {candidate} for {collection}/{range_id} is behind the commit watermark (term {}, lsn {}): applied term {applied_term}, lsn {applied_lsn}",
watermark.term, watermark.lsn
),
}
}
}
impl std::error::Error for TransitionRejection {}
/// A validated transition whose replacement catalog entry has been built but
/// not yet written.
///
/// Nothing changes in the catalog until `activate` is called, so dropping a
/// `PreparedTransition` silently abandons the transition.
#[must_use = "a prepared transition has no effect until `activate` is called"]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PreparedTransition {
    kind: TransitionKind,
    collection: CollectionId,
    range_id: RangeId,
    previous_owner: NodeIdentity,
    new_owner: NodeIdentity,
    previous_epoch: OwnershipEpoch,
    previous_version: CatalogVersion,
    watermark: CommitWatermark,
    /// Catalog entry that `activate` hands to `apply_update`.
    next: RangeOwnership,
}

impl PreparedTransition {
    /// The catalog entry that activation will install.
    pub fn next_entry(&self) -> &RangeOwnership {
        &self.next
    }

    /// Epoch carried by the entry that activation will install.
    pub fn new_epoch(&self) -> OwnershipEpoch {
        self.next.epoch()
    }

    /// Writes the prepared entry with `catalog.apply_update` and returns an
    /// audit record of the change.
    ///
    /// # Errors
    /// Propagates any `CatalogError` returned by `apply_update`.
    ///
    /// NOTE(review): the owner/epoch/version expectations were checked by
    /// `prepare` but are not re-checked here. Whether `apply_update` refuses an
    /// entry prepared against an older catalog state depends on its own version
    /// checks — confirm before holding a prepared transition across updates.
    pub fn activate(
        self,
        catalog: &mut ShardOwnershipCatalog,
    ) -> Result<TransitionOutcome, CatalogError> {
        let Self {
            kind,
            collection,
            range_id,
            previous_owner,
            new_owner,
            previous_epoch,
            previous_version,
            watermark,
            next,
        } = self;
        // Read these before `next` is moved into the catalog.
        let new_epoch = next.epoch();
        let new_version = next.version();
        catalog.apply_update(next)?;
        Ok(TransitionOutcome {
            kind,
            collection,
            range_id,
            previous_owner,
            new_owner,
            previous_epoch,
            new_epoch,
            previous_version,
            new_version,
            watermark,
        })
    }
}
/// Audit record describing a transition that was written to the catalog.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransitionOutcome {
    /// Kind of transition that was requested.
    pub kind: TransitionKind,
    /// Collection that owns the range.
    pub collection: CollectionId,
    /// Range whose ownership changed.
    pub range_id: RangeId,
    /// Owner before the transition.
    pub previous_owner: NodeIdentity,
    /// Owner after the transition.
    pub new_owner: NodeIdentity,
    /// Epoch before the transition.
    pub previous_epoch: OwnershipEpoch,
    /// Epoch of the installed entry.
    pub new_epoch: OwnershipEpoch,
    /// Catalog version before the transition.
    pub previous_version: CatalogVersion,
    /// Catalog version of the installed entry.
    pub new_version: CatalogVersion,
    /// Commit watermark the new owner was checked against.
    pub watermark: CommitWatermark,
}

impl TransitionOutcome {
    /// Returns `true` when `new_epoch` is strictly greater than `previous_epoch`.
    ///
    /// Presumably this is what lets the catalog reject writes still tagged with
    /// the previous epoch — confirm against the catalog's write admission.
    pub fn fenced_old_owner(&self) -> bool {
        self.new_epoch > self.previous_epoch
    }
}

impl std::fmt::Display for TransitionOutcome {
    /// Renders a single audit line: kind, range, old and new owner with their
    /// epochs and versions, and the watermark.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            kind,
            collection,
            range_id,
            previous_owner,
            new_owner,
            previous_epoch,
            new_epoch,
            previous_version,
            new_version,
            watermark,
        } = self;
        write!(
            f,
            "{} {}/{}: {} (epoch {}, version {}) -> {} (epoch {}, version {}) over watermark term {} lsn {}",
            kind.label(),
            collection,
            range_id,
            previous_owner,
            previous_epoch,
            previous_version,
            new_owner,
            new_epoch,
            new_version,
            watermark.term,
            watermark.lsn,
        )
    }
}
/// Validates `request` against `catalog` and builds the replacement ownership
/// entry, without mutating the catalog.
///
/// Checks run in this order and the first failure is returned:
/// 1. the range exists (`UnknownRange`);
/// 2. the catalog's owner, epoch and version equal the request's expectations
///    (`OwnerMismatch`, `StaleEpoch`, `StaleCatalogVersion`);
/// 3. the target is not already the owner and is one of the range's replicas
///    (`InvalidCandidate`);
/// 4. catch-up evidence is present, describes the target, and covers the
///    request's commit watermark (`MissingSafetyEvidence`,
///    `EvidenceForWrongCandidate`, `SafetyCheckFailed`).
///
/// # Errors
/// Returns the `TransitionRejection` for the first check that fails.
pub fn prepare(
    catalog: &ShardOwnershipCatalog,
    request: &TransitionRequest,
) -> Result<PreparedTransition, TransitionRejection> {
    let range_id = request.range_id;
    // Owned copies are needed only for a rejection or the final result, so they
    // are produced on demand rather than up front.
    let collection = || request.collection.clone();
    let target = || request.target.clone();

    // `ok_or_else` (not `ok_or`) so the success path does not build, and then
    // throw away, an `UnknownRange` rejection.
    let current = catalog
        .range(&request.collection, range_id)
        .ok_or_else(|| TransitionRejection::UnknownRange {
            collection: collection(),
            range_id,
        })?;

    // Compare-and-swap expectations: the caller must have observed the
    // catalog's current owner, epoch and version.
    let owner = current.owner();
    if *owner != request.expected_owner {
        return Err(TransitionRejection::OwnerMismatch {
            collection: collection(),
            range_id,
            expected: request.expected_owner.clone(),
            current: owner.clone(),
        });
    }
    if current.epoch() != request.expected_epoch {
        return Err(TransitionRejection::StaleEpoch {
            collection: collection(),
            range_id,
            expected: request.expected_epoch,
            current: current.epoch(),
        });
    }
    if current.version() != request.expected_version {
        return Err(TransitionRejection::StaleCatalogVersion {
            collection: collection(),
            range_id,
            expected: request.expected_version,
            current: current.version(),
        });
    }

    // The target must be a replica other than the current owner.
    if request.target == *owner {
        return Err(TransitionRejection::InvalidCandidate {
            collection: collection(),
            range_id,
            candidate: target(),
            reason: InvalidCandidateReason::AlreadyOwner,
        });
    }
    if !current.replicas().contains(&request.target) {
        return Err(TransitionRejection::InvalidCandidate {
            collection: collection(),
            range_id,
            candidate: target(),
            reason: InvalidCandidateReason::NotAReplica,
        });
    }

    // Fail closed: without evidence for this exact target that covers the
    // watermark, the transition is refused.
    let evidence =
        request
            .evidence
            .as_ref()
            .ok_or_else(|| TransitionRejection::MissingSafetyEvidence {
                collection: collection(),
                range_id,
                candidate: target(),
            })?;
    if evidence.candidate != request.target {
        return Err(TransitionRejection::EvidenceForWrongCandidate {
            collection: collection(),
            range_id,
            target: target(),
            evidence_for: evidence.candidate.clone(),
        });
    }
    if !evidence.covers(request.watermark) {
        return Err(TransitionRejection::SafetyCheckFailed {
            collection: collection(),
            range_id,
            candidate: target(),
            watermark: request.watermark,
            applied_term: evidence.applied_term,
            applied_lsn: evidence.applied_lsn,
        });
    }

    // NOTE(review): `new_replicas` is forwarded unchanged (empty unless the
    // caller used `with_replicas`). Whether `transfer_to` replaces, keeps, or
    // dedups the replica set — and whether it tolerates the target appearing in
    // it — is decided there; confirm.
    let next = current.transfer_to(target(), request.new_replicas.clone());
    Ok(PreparedTransition {
        kind: request.kind,
        collection: collection(),
        range_id,
        previous_owner: owner.clone(),
        new_owner: target(),
        previous_epoch: current.epoch(),
        previous_version: current.version(),
        watermark: request.watermark,
        next,
    })
}
/// Prepares `request` against `catalog` and, if every check passes, activates it.
///
/// # Errors
/// `TransitionError::Rejected` when `prepare` refuses the request (the catalog
/// is only read in that case), or `TransitionError::Catalog` when installing
/// the prepared entry fails.
pub fn run_transition(
    catalog: &mut ShardOwnershipCatalog,
    request: &TransitionRequest,
) -> Result<TransitionOutcome, TransitionError> {
    match prepare(catalog, request) {
        Ok(prepared) => prepared.activate(catalog).map_err(TransitionError::Catalog),
        Err(rejection) => Err(TransitionError::Rejected(rejection)),
    }
}
/// Failure modes of `run_transition`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransitionError {
    /// Validation refused the request (`prepare` only reads the catalog).
    Rejected(TransitionRejection),
    /// The catalog refused to install the prepared entry.
    Catalog(CatalogError),
}

impl From<TransitionRejection> for TransitionError {
    fn from(rejection: TransitionRejection) -> Self {
        Self::Rejected(rejection)
    }
}

impl std::fmt::Display for TransitionError {
    /// Shows the wrapped error's message with no extra prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let err: &dyn std::fmt::Display = match self {
            Self::Rejected(rejection) => rejection,
            Self::Catalog(catalog_err) => catalog_err,
        };
        write!(f, "{err}")
    }
}

impl std::error::Error for TransitionError {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{
        OwnershipEpoch, PlacementMetadata, RangeBounds, RangeRole, RangeWriteReject, ShardKeyMode,
    };

    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Catalog with a single full-range `orders` range 1 owned by `owner`.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident(owner),
                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        (catalog, orders)
    }

    /// Request against the initial epoch/version with watermark (1, 10) and
    /// evidence for `target` at exactly that position.
    fn request(
        kind: TransitionKind,
        orders: &CollectionId,
        expected_owner: &str,
        target: &str,
    ) -> TransitionRequest {
        TransitionRequest::new(
            kind,
            orders.clone(),
            RangeId::new(1),
            ident(expected_owner),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident(target),
            CommitWatermark::new(1, 10),
        )
        .with_evidence(CatchUpEvidence::new(ident(target), 1, 10))
    }

    #[test]
    fn successful_promote_moves_authority_and_bumps_epoch() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
        let outcome = run_transition(&mut catalog, &req).expect("promote should succeed");
        assert_eq!(outcome.kind, TransitionKind::Promote);
        assert_eq!(outcome.previous_owner, ident("CN=node-a"));
        assert_eq!(outcome.new_owner, ident("CN=node-b"));
        assert_eq!(outcome.previous_epoch, OwnershipEpoch::initial());
        assert_eq!(outcome.new_epoch.value(), 2);
        assert_eq!(outcome.new_version.value(), 2);
        assert!(outcome.fenced_old_owner());
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-b"));
        assert_eq!(range.epoch().value(), 2);
        let audit = outcome.to_string();
        assert!(audit.contains("promote"));
        assert!(audit.contains("CN=node-a"));
        assert!(audit.contains("CN=node-b"));
    }

    #[test]
    fn successful_handoff_demotes_old_owner_to_replica() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Handoff, &orders, "CN=node-a", "CN=node-b")
            .with_replicas([ident("CN=node-a")]);
        let outcome = run_transition(&mut catalog, &req).expect("handoff should succeed");
        assert_eq!(outcome.kind, TransitionKind::Handoff);
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-b"));
        assert_eq!(range.role_of(&ident("CN=node-a")), RangeRole::Replica);
        assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Owner);
    }

    #[test]
    fn old_owner_is_fenced_from_durable_writes_after_transition() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        assert!(catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", OwnershipEpoch::initial())
            .is_ok());
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b")
            .with_replicas([ident("CN=node-a")]);
        run_transition(&mut catalog, &req).unwrap();
        let err = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", OwnershipEpoch::initial())
            .unwrap_err();
        assert!(matches!(err, RangeWriteReject::NotOwner { .. }));
        let new_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
        assert!(catalog
            .admit_public_write(&ident("CN=node-b"), &orders, b"k", new_epoch)
            .is_ok());
    }

    #[test]
    fn prepare_does_not_mutate_catalog() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
        let _prepared = prepare(&catalog, &req).expect("prepare ok");
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.epoch(), OwnershipEpoch::initial());
    }

    /// Entry one transfer past the current one; used only to obtain a newer epoch.
    fn bumped(catalog: &ShardOwnershipCatalog, orders: &CollectionId) -> RangeOwnership {
        catalog
            .range(orders, RangeId::new(1))
            .unwrap()
            .transfer_to(ident("CN=tmp"), [])
    }

    #[test]
    fn stale_catalog_version_is_rejected() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let v2_entry = catalog
            .range(&orders, RangeId::new(1))
            .unwrap()
            .update_replicas([ident("CN=node-b")]);
        catalog.apply_update(v2_entry).unwrap();
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::StaleCatalogVersion {
                expected, current, ..
            } => {
                assert_eq!(expected, CatalogVersion::initial());
                assert_eq!(current.value(), 2);
            }
            other => panic!("expected StaleCatalogVersion, got {other:?}"),
        }
    }

    #[test]
    fn stale_expected_owner_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-x", "CN=node-b");
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::OwnerMismatch {
                expected, current, ..
            } => {
                assert_eq!(expected, ident("CN=node-x"));
                assert_eq!(current, ident("CN=node-a"));
            }
            other => panic!("expected OwnerMismatch, got {other:?}"),
        }
    }

    #[test]
    fn stale_expected_epoch_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let wrong_epoch = bumped(&catalog, &orders).epoch();
        assert_eq!(wrong_epoch.value(), 2);
        let req = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            wrong_epoch,
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(1, 10),
        )
        .with_evidence(CatchUpEvidence::new(ident("CN=node-b"), 1, 10));
        let err = prepare(&catalog, &req).unwrap_err();
        assert!(matches!(err, TransitionRejection::StaleEpoch { .. }));
    }

    #[test]
    fn invalid_candidate_not_a_replica_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-z");
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::InvalidCandidate { reason, .. } => {
                assert_eq!(reason, InvalidCandidateReason::NotAReplica);
            }
            other => panic!("expected InvalidCandidate(NotAReplica), got {other:?}"),
        }
    }

    #[test]
    fn invalid_candidate_already_owner_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-a")
            .with_evidence(CatchUpEvidence::new(ident("CN=node-a"), 1, 10));
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::InvalidCandidate { reason, .. } => {
                assert_eq!(reason, InvalidCandidateReason::AlreadyOwner);
            }
            other => panic!("expected InvalidCandidate(AlreadyOwner), got {other:?}"),
        }
    }

    #[test]
    fn missing_safety_evidence_fails_closed() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(1, 10),
        );
        let err = prepare(&catalog, &req).unwrap_err();
        assert!(matches!(
            err,
            TransitionRejection::MissingSafetyEvidence { .. }
        ));
    }

    #[test]
    fn evidence_for_a_different_candidate_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b")
            .with_evidence(CatchUpEvidence::new(ident("CN=node-c"), 9, 99));
        let err = prepare(&catalog, &req).unwrap_err();
        assert!(matches!(
            err,
            TransitionRejection::EvidenceForWrongCandidate { .. }
        ));
    }

    #[test]
    fn candidate_behind_commit_watermark_fails_safety_check() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(2, 50),
        )
        .with_evidence(CatchUpEvidence::new(ident("CN=node-b"), 2, 49));
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::SafetyCheckFailed {
                applied_lsn,
                watermark,
                ..
            } => {
                assert_eq!(applied_lsn, 49);
                assert_eq!(watermark, CommitWatermark::new(2, 50));
            }
            other => panic!("expected SafetyCheckFailed, got {other:?}"),
        }
    }

    #[test]
    fn candidate_on_older_term_fails_even_with_higher_lsn() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(3, 10),
        )
        .with_evidence(CatchUpEvidence::new(ident("CN=node-b"), 2, 9999));
        let err = prepare(&catalog, &req).unwrap_err();
        assert!(matches!(err, TransitionRejection::SafetyCheckFailed { .. }));
    }

    #[test]
    fn evidence_on_newer_term_covers_watermark() {
        let evidence = CatchUpEvidence::new(ident("CN=node-b"), 5, 0);
        assert!(evidence.covers(CommitWatermark::new(4, 9999)));
    }

    #[test]
    fn rejected_transition_leaves_catalog_unchanged() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-z");
        assert!(run_transition(&mut catalog, &req).is_err());
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.epoch(), OwnershipEpoch::initial());
        assert_eq!(range.version(), CatalogVersion::initial());
    }

    #[test]
    fn second_transition_with_stale_cas_loses() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
        let first = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
        run_transition(&mut catalog, &first).unwrap();
        let second = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-c");
        let err = run_transition(&mut catalog, &second).unwrap_err();
        assert!(matches!(
            err,
            TransitionError::Rejected(TransitionRejection::OwnerMismatch { .. })
        ));
    }
}