1use super::identity::NodeIdentity;
60use super::ownership::{
61 CatalogError, CatalogVersion, CollectionId, OwnershipEpoch, RangeId, RangeOwnership,
62 ShardOwnershipCatalog,
63};
64
/// A commit position expressed as a `(term, lsn)` pair.
///
/// Positions are compared term first, then `lsn` within the same term
/// (see [`CatchUpEvidence::covers`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitWatermark {
    /// Term component; compared before `lsn`.
    pub term: u64,
    /// Sequence position (LSN) within `term`.
    pub lsn: u64,
}

impl CommitWatermark {
    /// Builds a watermark from a term and an LSN within that term.
    pub fn new(term: u64, lsn: u64) -> Self {
        Self { lsn, term }
    }
}
83
84#[derive(Debug, Clone, PartialEq, Eq)]
90pub struct CatchUpEvidence {
91 pub candidate: NodeIdentity,
94 pub applied_term: u64,
96 pub applied_lsn: u64,
98}
99
100impl CatchUpEvidence {
101 pub fn new(candidate: NodeIdentity, applied_term: u64, applied_lsn: u64) -> Self {
102 Self {
103 candidate,
104 applied_term,
105 applied_lsn,
106 }
107 }
108
109 pub fn covers(&self, watermark: CommitWatermark) -> bool {
113 self.applied_term > watermark.term
114 || (self.applied_term == watermark.term && self.applied_lsn >= watermark.lsn)
115 }
116}
117
/// Which kind of ownership move a [`TransitionRequest`] performs.
///
/// The kind is carried through to [`TransitionOutcome`] and its audit text.
/// `prepare` applies the same validation to both kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransitionKind {
    /// Ownership moves to a replica; labelled `"promote"` in audit output.
    Promote,
    /// Ownership is handed from the current owner to a replica; labelled
    /// `"handoff"` in audit output.
    Handoff,
}

impl TransitionKind {
    /// Lower-case name used in the `TransitionOutcome` audit line.
    fn label(self) -> &'static str {
        match self {
            Self::Promote => "promote",
            Self::Handoff => "handoff",
        }
    }
}
138
139#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct TransitionRequest {
147 kind: TransitionKind,
148 collection: CollectionId,
149 range_id: RangeId,
150 expected_owner: NodeIdentity,
151 expected_epoch: OwnershipEpoch,
152 expected_version: CatalogVersion,
153 target: NodeIdentity,
154 watermark: CommitWatermark,
155 evidence: Option<CatchUpEvidence>,
156 new_replicas: Vec<NodeIdentity>,
157}
158
159impl TransitionRequest {
160 #[allow(clippy::too_many_arguments)]
166 pub fn new(
167 kind: TransitionKind,
168 collection: CollectionId,
169 range_id: RangeId,
170 expected_owner: NodeIdentity,
171 expected_epoch: OwnershipEpoch,
172 expected_version: CatalogVersion,
173 target: NodeIdentity,
174 watermark: CommitWatermark,
175 ) -> Self {
176 Self {
177 kind,
178 collection,
179 range_id,
180 expected_owner,
181 expected_epoch,
182 expected_version,
183 target,
184 watermark,
185 evidence: None,
186 new_replicas: Vec::new(),
187 }
188 }
189
190 pub fn with_evidence(mut self, evidence: CatchUpEvidence) -> Self {
192 self.evidence = Some(evidence);
193 self
194 }
195
196 pub fn with_replicas(mut self, replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
199 self.new_replicas = replicas.into_iter().collect();
200 self
201 }
202
203 pub fn kind(&self) -> TransitionKind {
204 self.kind
205 }
206
207 pub fn collection(&self) -> &CollectionId {
208 &self.collection
209 }
210
211 pub fn range_id(&self) -> RangeId {
212 self.range_id
213 }
214
215 pub fn target(&self) -> &NodeIdentity {
216 &self.target
217 }
218}
219
/// Why `prepare` refused the requested target node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvalidCandidateReason {
    /// The target is not in the range's current replica set.
    NotAReplica,
    /// The target already owns the range. This is checked before replica
    /// membership.
    AlreadyOwner,
}

impl InvalidCandidateReason {
    /// Explanation text appended to the `InvalidCandidate` rejection message.
    fn label(self) -> &'static str {
        match self {
            Self::AlreadyOwner => "candidate is already the current owner",
            Self::NotAReplica => "candidate is not a replica of the range",
        }
    }
}
240
241#[derive(Debug, Clone, PartialEq, Eq)]
244pub enum TransitionRejection {
245 UnknownRange {
247 collection: CollectionId,
248 range_id: RangeId,
249 },
250 OwnerMismatch {
253 collection: CollectionId,
254 range_id: RangeId,
255 expected: NodeIdentity,
256 current: NodeIdentity,
257 },
258 StaleEpoch {
261 collection: CollectionId,
262 range_id: RangeId,
263 expected: OwnershipEpoch,
264 current: OwnershipEpoch,
265 },
266 StaleCatalogVersion {
269 collection: CollectionId,
270 range_id: RangeId,
271 expected: CatalogVersion,
272 current: CatalogVersion,
273 },
274 InvalidCandidate {
276 collection: CollectionId,
277 range_id: RangeId,
278 candidate: NodeIdentity,
279 reason: InvalidCandidateReason,
280 },
281 MissingSafetyEvidence {
284 collection: CollectionId,
285 range_id: RangeId,
286 candidate: NodeIdentity,
287 },
288 EvidenceForWrongCandidate {
291 collection: CollectionId,
292 range_id: RangeId,
293 target: NodeIdentity,
294 evidence_for: NodeIdentity,
295 },
296 SafetyCheckFailed {
299 collection: CollectionId,
300 range_id: RangeId,
301 candidate: NodeIdentity,
302 watermark: CommitWatermark,
303 applied_term: u64,
304 applied_lsn: u64,
305 },
306}
307
308impl std::fmt::Display for TransitionRejection {
309 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310 match self {
311 Self::UnknownRange {
312 collection,
313 range_id,
314 } => write!(f, "no range {collection}/{range_id} in the catalog"),
315 Self::OwnerMismatch {
316 collection,
317 range_id,
318 expected,
319 current,
320 } => write!(
321 f,
322 "ownership transition for {collection}/{range_id} expected current owner {expected}, but catalog owner is {current}"
323 ),
324 Self::StaleEpoch {
325 collection,
326 range_id,
327 expected,
328 current,
329 } => write!(
330 f,
331 "ownership transition for {collection}/{range_id} expected epoch {expected}, but catalog epoch is {current}"
332 ),
333 Self::StaleCatalogVersion {
334 collection,
335 range_id,
336 expected,
337 current,
338 } => write!(
339 f,
340 "ownership transition for {collection}/{range_id} expected catalog version {expected}, but catalog version is {current}"
341 ),
342 Self::InvalidCandidate {
343 collection,
344 range_id,
345 candidate,
346 reason,
347 } => write!(
348 f,
349 "invalid ownership transition candidate {candidate} for {collection}/{range_id}: {}",
350 reason.label()
351 ),
352 Self::MissingSafetyEvidence {
353 collection,
354 range_id,
355 candidate,
356 } => write!(
357 f,
358 "ownership transition for {collection}/{range_id} carries no safety evidence for candidate {candidate}"
359 ),
360 Self::EvidenceForWrongCandidate {
361 collection,
362 range_id,
363 target,
364 evidence_for,
365 } => write!(
366 f,
367 "ownership transition for {collection}/{range_id} targets {target} but its safety evidence describes {evidence_for}"
368 ),
369 Self::SafetyCheckFailed {
370 collection,
371 range_id,
372 candidate,
373 watermark,
374 applied_term,
375 applied_lsn,
376 } => write!(
377 f,
378 "candidate {candidate} for {collection}/{range_id} is behind the commit watermark (term {}, lsn {}): applied term {applied_term}, lsn {applied_lsn}",
379 watermark.term, watermark.lsn
380 ),
381 }
382 }
383}
384
385impl std::error::Error for TransitionRejection {}
386
387#[derive(Debug, Clone, PartialEq, Eq)]
392pub struct PreparedTransition {
393 kind: TransitionKind,
394 collection: CollectionId,
395 range_id: RangeId,
396 previous_owner: NodeIdentity,
397 new_owner: NodeIdentity,
398 previous_epoch: OwnershipEpoch,
399 previous_version: CatalogVersion,
400 watermark: CommitWatermark,
401 next: RangeOwnership,
402}
403
404impl PreparedTransition {
405 pub fn next_entry(&self) -> &RangeOwnership {
408 &self.next
409 }
410
411 pub fn new_epoch(&self) -> OwnershipEpoch {
415 self.next.epoch()
416 }
417
418 pub fn activate(
424 self,
425 catalog: &mut ShardOwnershipCatalog,
426 ) -> Result<TransitionOutcome, CatalogError> {
427 let new_epoch = self.next.epoch();
428 let new_version = self.next.version();
429 catalog.apply_update(self.next)?;
430 Ok(TransitionOutcome {
431 kind: self.kind,
432 collection: self.collection,
433 range_id: self.range_id,
434 previous_owner: self.previous_owner,
435 new_owner: self.new_owner,
436 previous_epoch: self.previous_epoch,
437 new_epoch,
438 previous_version: self.previous_version,
439 new_version,
440 watermark: self.watermark,
441 })
442 }
443}
444
445#[derive(Debug, Clone, PartialEq, Eq)]
450pub struct TransitionOutcome {
451 pub kind: TransitionKind,
452 pub collection: CollectionId,
453 pub range_id: RangeId,
454 pub previous_owner: NodeIdentity,
455 pub new_owner: NodeIdentity,
456 pub previous_epoch: OwnershipEpoch,
457 pub new_epoch: OwnershipEpoch,
458 pub previous_version: CatalogVersion,
459 pub new_version: CatalogVersion,
460 pub watermark: CommitWatermark,
461}
462
463impl TransitionOutcome {
464 pub fn fenced_old_owner(&self) -> bool {
468 self.new_epoch > self.previous_epoch
469 }
470}
471
472impl std::fmt::Display for TransitionOutcome {
473 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474 write!(
475 f,
476 "{} {}/{}: {} (epoch {}, version {}) -> {} (epoch {}, version {}) over watermark term {} lsn {}",
477 self.kind.label(),
478 self.collection,
479 self.range_id,
480 self.previous_owner,
481 self.previous_epoch,
482 self.previous_version,
483 self.new_owner,
484 self.new_epoch,
485 self.new_version,
486 self.watermark.term,
487 self.watermark.lsn,
488 )
489 }
490}
491
492pub fn prepare(
504 catalog: &ShardOwnershipCatalog,
505 request: &TransitionRequest,
506) -> Result<PreparedTransition, TransitionRejection> {
507 let current = catalog.range(&request.collection, request.range_id).ok_or(
508 TransitionRejection::UnknownRange {
509 collection: request.collection.clone(),
510 range_id: request.range_id,
511 },
512 )?;
513
514 if *current.owner() != request.expected_owner {
517 return Err(TransitionRejection::OwnerMismatch {
518 collection: request.collection.clone(),
519 range_id: request.range_id,
520 expected: request.expected_owner.clone(),
521 current: current.owner().clone(),
522 });
523 }
524 if current.epoch() != request.expected_epoch {
525 return Err(TransitionRejection::StaleEpoch {
526 collection: request.collection.clone(),
527 range_id: request.range_id,
528 expected: request.expected_epoch,
529 current: current.epoch(),
530 });
531 }
532 if current.version() != request.expected_version {
533 return Err(TransitionRejection::StaleCatalogVersion {
534 collection: request.collection.clone(),
535 range_id: request.range_id,
536 expected: request.expected_version,
537 current: current.version(),
538 });
539 }
540
541 if request.target == *current.owner() {
544 return Err(TransitionRejection::InvalidCandidate {
545 collection: request.collection.clone(),
546 range_id: request.range_id,
547 candidate: request.target.clone(),
548 reason: InvalidCandidateReason::AlreadyOwner,
549 });
550 }
551 if !current.replicas().contains(&request.target) {
552 return Err(TransitionRejection::InvalidCandidate {
553 collection: request.collection.clone(),
554 range_id: request.range_id,
555 candidate: request.target.clone(),
556 reason: InvalidCandidateReason::NotAReplica,
557 });
558 }
559
560 let evidence =
563 request
564 .evidence
565 .as_ref()
566 .ok_or_else(|| TransitionRejection::MissingSafetyEvidence {
567 collection: request.collection.clone(),
568 range_id: request.range_id,
569 candidate: request.target.clone(),
570 })?;
571 if evidence.candidate != request.target {
572 return Err(TransitionRejection::EvidenceForWrongCandidate {
573 collection: request.collection.clone(),
574 range_id: request.range_id,
575 target: request.target.clone(),
576 evidence_for: evidence.candidate.clone(),
577 });
578 }
579 if !evidence.covers(request.watermark) {
580 return Err(TransitionRejection::SafetyCheckFailed {
581 collection: request.collection.clone(),
582 range_id: request.range_id,
583 candidate: request.target.clone(),
584 watermark: request.watermark,
585 applied_term: evidence.applied_term,
586 applied_lsn: evidence.applied_lsn,
587 });
588 }
589
590 let next = current.transfer_to(request.target.clone(), request.new_replicas.clone());
592 Ok(PreparedTransition {
593 kind: request.kind,
594 collection: request.collection.clone(),
595 range_id: request.range_id,
596 previous_owner: current.owner().clone(),
597 new_owner: request.target.clone(),
598 previous_epoch: current.epoch(),
599 previous_version: current.version(),
600 watermark: request.watermark,
601 next,
602 })
603}
604
605pub fn run_transition(
610 catalog: &mut ShardOwnershipCatalog,
611 request: &TransitionRequest,
612) -> Result<TransitionOutcome, TransitionError> {
613 let prepared = prepare(catalog, request)?;
614 prepared.activate(catalog).map_err(TransitionError::Catalog)
615}
616
617#[derive(Debug, Clone, PartialEq, Eq)]
620pub enum TransitionError {
621 Rejected(TransitionRejection),
623 Catalog(CatalogError),
625}
626
627impl From<TransitionRejection> for TransitionError {
628 fn from(value: TransitionRejection) -> Self {
629 TransitionError::Rejected(value)
630 }
631}
632
633impl std::fmt::Display for TransitionError {
634 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
635 match self {
636 Self::Rejected(err) => write!(f, "{err}"),
637 Self::Catalog(err) => write!(f, "{err}"),
638 }
639 }
640}
641
642impl std::error::Error for TransitionError {}
643
// Unit tests for the ownership-transition workflow. Each test starts from a
// fresh catalog holding a single `orders` range (id 1) built by `catalog_with`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{
        OwnershipEpoch, PlacementMetadata, RangeBounds, RangeRole, RangeWriteReject, ShardKeyMode,
    };

    /// Builds a `CollectionId`, panicking if `name` is rejected.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Builds a `NodeIdentity` from a certificate subject such as `"CN=node-a"`.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Returns a catalog with one `orders` range (id 1, `ShardKeyMode::Hash`,
    /// `RangeBounds::full()`, replication factor 3) owned by `owner` with the
    /// given `replicas`, together with the `orders` collection id.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident(owner),
                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        (catalog, orders)
    }

    /// Builds a request against range 1 that expects the initial epoch and
    /// version, uses commit watermark (term 1, lsn 10), and carries matching
    /// evidence for `target` at exactly (1, 10). With `catalog_with`'s fresh
    /// catalog, this passes every check unless the arguments are wrong.
    fn request(
        kind: TransitionKind,
        orders: &CollectionId,
        expected_owner: &str,
        target: &str,
    ) -> TransitionRequest {
        TransitionRequest::new(
            kind,
            orders.clone(),
            RangeId::new(1),
            ident(expected_owner),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident(target),
            CommitWatermark::new(1, 10),
        )
        .with_evidence(CatchUpEvidence::new(ident(target), 1, 10))
    }

    #[test]
    fn successful_promote_moves_authority_and_bumps_epoch() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");

        let outcome = run_transition(&mut catalog, &req).expect("promote should succeed");

        assert_eq!(outcome.kind, TransitionKind::Promote);
        assert_eq!(outcome.previous_owner, ident("CN=node-a"));
        assert_eq!(outcome.new_owner, ident("CN=node-b"));
        assert_eq!(outcome.previous_epoch, OwnershipEpoch::initial());
        assert_eq!(outcome.new_epoch.value(), 2);
        assert_eq!(outcome.new_version.value(), 2);
        assert!(outcome.fenced_old_owner());

        // The catalog itself now reflects the new owner and epoch.
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-b"));
        assert_eq!(range.epoch().value(), 2);
        // The audit line names the kind and both owners.
        let audit = outcome.to_string();
        assert!(audit.contains("promote"));
        assert!(audit.contains("CN=node-a"));
        assert!(audit.contains("CN=node-b"));
    }

    #[test]
    fn successful_handoff_demotes_old_owner_to_replica() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // The old owner stays a replica only because it is listed explicitly
        // in the new replica set.
        let req = request(TransitionKind::Handoff, &orders, "CN=node-a", "CN=node-b")
            .with_replicas([ident("CN=node-a")]);

        let outcome = run_transition(&mut catalog, &req).expect("handoff should succeed");
        assert_eq!(outcome.kind, TransitionKind::Handoff);

        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-b"));
        assert_eq!(range.role_of(&ident("CN=node-a")), RangeRole::Replica);
        assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Owner);
    }

    #[test]
    fn old_owner_is_fenced_from_durable_writes_after_transition() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // Before the transition, node-a is admitted at the initial epoch.
        assert!(catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial()
            )
            .is_ok());

        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b")
            .with_replicas([ident("CN=node-a")]);
        run_transition(&mut catalog, &req).unwrap();

        // After the transition, node-a is refused with `NotOwner` when it
        // presents its old epoch.
        let err = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        assert!(matches!(err, RangeWriteReject::NotOwner { .. }));

        // The new owner is admitted at the range's current epoch.
        assert!(catalog
            .admit_public_write(
                &ident("CN=node-b"),
                &orders,
                b"k",
                catalog.range(&orders, RangeId::new(1)).unwrap().epoch()
            )
            .is_ok());
    }

    #[test]
    fn prepare_does_not_mutate_catalog() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
        let _prepared = prepare(&catalog, &req).expect("prepare ok");
        // A successful prepare must leave owner and epoch as they were.
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.epoch(), OwnershipEpoch::initial());
    }

    /// Returns the entry that transferring range 1 to a throwaway owner would
    /// produce, without applying it. Used only to obtain a non-current epoch.
    fn bumped(catalog: &ShardOwnershipCatalog, orders: &CollectionId) -> RangeOwnership {
        catalog
            .range(orders, RangeId::new(1))
            .unwrap()
            .transfer_to(ident("CN=tmp"), [])
    }

    #[test]
    fn stale_catalog_version_is_rejected() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // Applying `update_replicas` advances the catalog version (asserted
        // as 2 below) without tripping the owner/epoch guards, so only the
        // version guard rejects the request.
        let v2_entry = catalog
            .range(&orders, RangeId::new(1))
            .unwrap()
            .update_replicas([ident("CN=node-b")]);
        catalog.apply_update(v2_entry).unwrap();

        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::StaleCatalogVersion {
                expected, current, ..
            } => {
                assert_eq!(expected, CatalogVersion::initial());
                assert_eq!(current.value(), 2);
            }
            other => panic!("expected StaleCatalogVersion, got {other:?}"),
        }
    }

    #[test]
    fn stale_expected_owner_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // The request claims node-x owns the range; the catalog says node-a.
        let req = request(TransitionKind::Promote, &orders, "CN=node-x", "CN=node-b");
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::OwnerMismatch {
                expected, current, ..
            } => {
                assert_eq!(expected, ident("CN=node-x"));
                assert_eq!(current, ident("CN=node-a"));
            }
            other => panic!("expected OwnerMismatch, got {other:?}"),
        }
    }

    #[test]
    fn stale_expected_epoch_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // An epoch that differs from the catalog's current (initial) epoch.
        let wrong_epoch = bumped(&catalog, &orders).epoch();
        assert_eq!(wrong_epoch.value(), 2);
        let req = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            wrong_epoch,
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(1, 10),
        )
        .with_evidence(CatchUpEvidence::new(ident("CN=node-b"), 1, 10));
        let err = prepare(&catalog, &req).unwrap_err();
        assert!(matches!(err, TransitionRejection::StaleEpoch { .. }));
    }

    #[test]
    fn invalid_candidate_not_a_replica_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // node-z is neither owner nor replica.
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-z");
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::InvalidCandidate { reason, .. } => {
                assert_eq!(reason, InvalidCandidateReason::NotAReplica);
            }
            other => panic!("expected InvalidCandidate(NotAReplica), got {other:?}"),
        }
    }

    #[test]
    fn invalid_candidate_already_owner_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // Targets the current owner. `request` already attaches node-a
        // evidence; the explicit `with_evidence` below replaces it with
        // identical evidence.
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-a")
            .with_evidence(CatchUpEvidence::new(ident("CN=node-a"), 1, 10));
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::InvalidCandidate { reason, .. } => {
                assert_eq!(reason, InvalidCandidateReason::AlreadyOwner);
            }
            other => panic!("expected InvalidCandidate(AlreadyOwner), got {other:?}"),
        }
    }

    #[test]
    fn missing_safety_evidence_fails_closed() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // Same shape as `request(...)` but without `with_evidence`.
        let req = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(1, 10),
        );
        let err = prepare(&catalog, &req).unwrap_err();
        assert!(matches!(
            err,
            TransitionRejection::MissingSafetyEvidence { .. }
        ));
    }

    #[test]
    fn evidence_for_a_different_candidate_is_rejected() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // The evidence would cover the watermark, but it describes node-c,
        // not the target node-b.
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b")
            .with_evidence(CatchUpEvidence::new(ident("CN=node-c"), 9, 99));
        let err = prepare(&catalog, &req).unwrap_err();
        assert!(matches!(
            err,
            TransitionRejection::EvidenceForWrongCandidate { .. }
        ));
    }

    #[test]
    fn candidate_behind_commit_watermark_fails_safety_check() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // Same term as the watermark, one LSN short.
        let req = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(2, 50),
        )
        .with_evidence(CatchUpEvidence::new(ident("CN=node-b"), 2, 49));
        let err = prepare(&catalog, &req).unwrap_err();
        match err {
            TransitionRejection::SafetyCheckFailed {
                applied_lsn,
                watermark,
                ..
            } => {
                assert_eq!(applied_lsn, 49);
                assert_eq!(watermark, CommitWatermark::new(2, 50));
            }
            other => panic!("expected SafetyCheckFailed, got {other:?}"),
        }
    }

    #[test]
    fn candidate_on_older_term_fails_even_with_higher_lsn() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // Term is compared before LSN, so a huge LSN on an older term loses.
        let req = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(3, 10),
        )
        .with_evidence(CatchUpEvidence::new(ident("CN=node-b"), 2, 9999));
        let err = prepare(&catalog, &req).unwrap_err();
        assert!(matches!(err, TransitionRejection::SafetyCheckFailed { .. }));
    }

    #[test]
    fn evidence_on_newer_term_covers_watermark() {
        // A newer term covers the watermark regardless of LSN.
        let evidence = CatchUpEvidence::new(ident("CN=node-b"), 5, 0);
        assert!(evidence.covers(CommitWatermark::new(4, 9999)));
    }

    #[test]
    fn rejected_transition_leaves_catalog_unchanged() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-z");
        assert!(run_transition(&mut catalog, &req).is_err());
        // Owner, epoch and version are all still the originals.
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.epoch(), OwnershipEpoch::initial());
        assert_eq!(range.version(), CatalogVersion::initial());
    }

    #[test]
    fn second_transition_with_stale_cas_loses() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
        // Two requests built against the same original entry; the first wins.
        let first = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
        run_transition(&mut catalog, &first).unwrap();

        // The second still expects node-a as owner, so the owner guard trips.
        let second = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-c");
        let err = run_transition(&mut catalog, &second).unwrap_err();
        assert!(matches!(
            err,
            TransitionError::Rejected(TransitionRejection::OwnerMismatch { .. })
        ));
    }
}