1use super::identity::NodeIdentity;
71use super::ownership::{
72 CollectionId, OwnershipEpoch, RangeId, RangeOwnership, RangeRole, ShardOwnershipCatalog,
73};
74
75#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
83pub struct SupervisorTerm(u64);
84
85impl SupervisorTerm {
86 pub fn genesis() -> Self {
88 Self(1)
89 }
90
91 pub fn new(value: u64) -> Self {
92 Self(value)
93 }
94
95 pub fn value(self) -> u64 {
96 self.0
97 }
98
99 pub fn next(self) -> Self {
101 Self(self.0 + 1)
102 }
103}
104
105impl std::fmt::Display for SupervisorTerm {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 write!(f, "{}", self.0)
108 }
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct OwnershipLease {
123 supervisor_term: SupervisorTerm,
124 collection: CollectionId,
125 range_id: RangeId,
126 owner: NodeIdentity,
127 epoch: OwnershipEpoch,
128 granted_at_ms: u64,
129 expires_at_ms: u64,
130}
131
132impl OwnershipLease {
133 #[allow(clippy::too_many_arguments)]
137 pub fn grant(
138 supervisor_term: SupervisorTerm,
139 collection: CollectionId,
140 range_id: RangeId,
141 owner: NodeIdentity,
142 epoch: OwnershipEpoch,
143 granted_at_ms: u64,
144 ttl_ms: u64,
145 ) -> Self {
146 Self {
147 supervisor_term,
148 collection,
149 range_id,
150 owner,
151 epoch,
152 granted_at_ms,
153 expires_at_ms: granted_at_ms.saturating_add(ttl_ms),
154 }
155 }
156
157 pub fn supervisor_term(&self) -> SupervisorTerm {
158 self.supervisor_term
159 }
160
161 pub fn collection(&self) -> &CollectionId {
162 &self.collection
163 }
164
165 pub fn range_id(&self) -> RangeId {
166 self.range_id
167 }
168
169 pub fn owner(&self) -> &NodeIdentity {
170 &self.owner
171 }
172
173 pub fn epoch(&self) -> OwnershipEpoch {
174 self.epoch
175 }
176
177 pub fn granted_at_ms(&self) -> u64 {
178 self.granted_at_ms
179 }
180
181 pub fn expires_at_ms(&self) -> u64 {
182 self.expires_at_ms
183 }
184
185 pub fn is_expired(&self, now_ms: u64) -> bool {
189 now_ms >= self.expires_at_ms
190 }
191
192 pub fn remaining_ms(&self, now_ms: u64) -> u64 {
195 self.expires_at_ms.saturating_sub(now_ms)
196 }
197
198 fn covers(&self, collection: &CollectionId, range_id: RangeId, owner: &NodeIdentity) -> bool {
202 self.collection == *collection && self.range_id == range_id && self.owner == *owner
203 }
204}
205
206#[derive(Debug, Clone, PartialEq, Eq)]
211pub enum FenceReason {
212 Unleased,
215 Revoked,
218 TermSuperseded {
221 lease_term: SupervisorTerm,
222 current_term: SupervisorTerm,
223 },
224 EpochSuperseded {
227 lease_epoch: OwnershipEpoch,
228 current_epoch: OwnershipEpoch,
229 },
230 Expired { now_ms: u64, expires_at_ms: u64 },
234}
235
236impl std::fmt::Display for FenceReason {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 match self {
239 Self::Unleased => write!(f, "owner holds no ownership lease"),
240 Self::Revoked => write!(f, "ownership lease was revoked"),
241 Self::TermSuperseded {
242 lease_term,
243 current_term,
244 } => write!(
245 f,
246 "ownership lease granted under supervisor term {lease_term} is behind current term {current_term}"
247 ),
248 Self::EpochSuperseded {
249 lease_epoch,
250 current_epoch,
251 } => write!(
252 f,
253 "ownership lease epoch {lease_epoch} no longer matches current ownership epoch {current_epoch}"
254 ),
255 Self::Expired {
256 now_ms,
257 expires_at_ms,
258 } => write!(
259 f,
260 "ownership lease expired at {expires_at_ms} ms (now {now_ms} ms)"
261 ),
262 }
263 }
264}
265
266impl std::error::Error for FenceReason {}
267
268#[derive(Debug, Clone, PartialEq, Eq)]
270pub enum OwnerWriteMode {
271 Durable,
273 Fenced(FenceReason),
277}
278
279impl OwnerWriteMode {
280 pub fn may_write_durable(&self) -> bool {
282 matches!(self, OwnerWriteMode::Durable)
283 }
284
285 pub fn is_fenced(&self) -> bool {
287 matches!(self, OwnerWriteMode::Fenced(_))
288 }
289}
290
291#[derive(Debug, Clone, Copy, PartialEq, Eq)]
297pub enum RangeRequest {
298 DurableWrite,
300 StaleRead,
302 ReplicationCatchUp,
306}
307
308#[derive(Debug, Clone, PartialEq, Eq)]
310pub struct LeaseFenceRejection {
311 pub reason: FenceReason,
313}
314
315impl std::fmt::Display for LeaseFenceRejection {
316 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317 write!(
318 f,
319 "durable write rejected: owner is self-fenced ({})",
320 self.reason
321 )
322 }
323}
324
325impl std::error::Error for LeaseFenceRejection {}
326
327#[derive(Debug, Clone, Default, PartialEq, Eq)]
337pub struct LeasedOwner {
338 lease: Option<OwnershipLease>,
339 revoked: bool,
340}
341
342impl LeasedOwner {
343 pub fn unleased() -> Self {
345 Self {
346 lease: None,
347 revoked: false,
348 }
349 }
350
351 pub fn with_lease(lease: OwnershipLease) -> Self {
353 Self {
354 lease: Some(lease),
355 revoked: false,
356 }
357 }
358
359 pub fn grant(&mut self, lease: OwnershipLease) {
362 self.lease = Some(lease);
363 self.revoked = false;
364 }
365
366 pub fn revoke(&mut self) {
370 self.revoked = true;
371 }
372
373 pub fn lease(&self) -> Option<&OwnershipLease> {
377 self.lease.as_ref()
378 }
379
380 pub fn evaluate(
391 &self,
392 current_term: SupervisorTerm,
393 current_epoch: OwnershipEpoch,
394 now_ms: u64,
395 ) -> OwnerWriteMode {
396 if self.revoked {
397 return OwnerWriteMode::Fenced(FenceReason::Revoked);
398 }
399 let Some(lease) = &self.lease else {
400 return OwnerWriteMode::Fenced(FenceReason::Unleased);
401 };
402 if lease.supervisor_term != current_term {
403 return OwnerWriteMode::Fenced(FenceReason::TermSuperseded {
404 lease_term: lease.supervisor_term,
405 current_term,
406 });
407 }
408 if lease.epoch != current_epoch {
409 return OwnerWriteMode::Fenced(FenceReason::EpochSuperseded {
410 lease_epoch: lease.epoch,
411 current_epoch,
412 });
413 }
414 if lease.is_expired(now_ms) {
415 return OwnerWriteMode::Fenced(FenceReason::Expired {
416 now_ms,
417 expires_at_ms: lease.expires_at_ms,
418 });
419 }
420 OwnerWriteMode::Durable
421 }
422
423 pub fn admit_request(
433 &self,
434 request: RangeRequest,
435 current_term: SupervisorTerm,
436 current_epoch: OwnershipEpoch,
437 now_ms: u64,
438 ) -> Result<(), LeaseFenceRejection> {
439 match self.evaluate(current_term, current_epoch, now_ms) {
440 OwnerWriteMode::Durable => Ok(()),
441 OwnerWriteMode::Fenced(reason) => match request {
442 RangeRequest::StaleRead | RangeRequest::ReplicationCatchUp => Ok(()),
443 RangeRequest::DurableWrite => Err(LeaseFenceRejection { reason }),
444 },
445 }
446 }
447}
448
449#[derive(Debug, Clone, PartialEq, Eq)]
452pub enum DurableWriteReject {
453 NoRange { collection: CollectionId },
455 NotOwner {
458 collection: CollectionId,
459 range_id: RangeId,
460 role: RangeRole,
461 owner: NodeIdentity,
462 },
463 Fenced {
466 collection: CollectionId,
467 range_id: RangeId,
468 reason: FenceReason,
469 },
470}
471
472impl std::fmt::Display for DurableWriteReject {
473 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474 match self {
475 Self::NoRange { collection } => write!(
476 f,
477 "no range of collection {collection} covers the routed key — re-resolve routing"
478 ),
479 Self::NotOwner {
480 collection,
481 range_id,
482 owner,
483 ..
484 } => write!(
485 f,
486 "this node does not own {collection}/{range_id} — route the durable write to {owner}"
487 ),
488 Self::Fenced {
489 collection,
490 range_id,
491 reason,
492 } => write!(
493 f,
494 "owner of {collection}/{range_id} is self-fenced and rejects the durable write: {reason}"
495 ),
496 }
497 }
498}
499
500impl std::error::Error for DurableWriteReject {}
501
502pub fn admit_durable_write<'c>(
517 catalog: &'c ShardOwnershipCatalog,
518 holder: &LeasedOwner,
519 node: &NodeIdentity,
520 collection: &CollectionId,
521 key: &[u8],
522 current_term: SupervisorTerm,
523 now_ms: u64,
524) -> Result<&'c RangeOwnership, DurableWriteReject> {
525 let range =
526 catalog
527 .route_shard_key(collection, key)
528 .ok_or_else(|| DurableWriteReject::NoRange {
529 collection: collection.clone(),
530 })?;
531
532 let role = range.role_of(node);
533 if !role.may_write_public() {
534 return Err(DurableWriteReject::NotOwner {
535 collection: collection.clone(),
536 range_id: range.range_id(),
537 role,
538 owner: range.owner().clone(),
539 });
540 }
541
542 let covered = holder
546 .lease()
547 .is_some_and(|lease| lease.covers(collection, range.range_id(), node));
548
549 let mode = if covered {
550 holder.evaluate(current_term, range.epoch(), now_ms)
551 } else {
552 OwnerWriteMode::Fenced(FenceReason::Unleased)
553 };
554
555 match mode {
556 OwnerWriteMode::Durable => Ok(range),
557 OwnerWriteMode::Fenced(reason) => Err(DurableWriteReject::Fenced {
558 collection: collection.clone(),
559 range_id: range.range_id(),
560 reason,
561 }),
562 }
563}
564
565#[cfg(test)]
566mod tests {
567 use super::*;
568 use crate::cluster::ownership::{PlacementMetadata, RangeBounds, ShardKeyMode};
569
570 fn collection(name: &str) -> CollectionId {
571 CollectionId::new(name).unwrap()
572 }
573
574 fn ident(cn: &str) -> NodeIdentity {
575 NodeIdentity::from_certificate_subject(cn).unwrap()
576 }
577
578 fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
580 let orders = collection("orders");
581 let mut catalog = ShardOwnershipCatalog::new();
582 catalog
583 .apply_update(RangeOwnership::establish(
584 orders.clone(),
585 RangeId::new(1),
586 ShardKeyMode::Hash,
587 RangeBounds::full(),
588 ident(owner),
589 replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
590 PlacementMetadata::with_replication_factor(3),
591 ))
592 .unwrap();
593 (catalog, orders)
594 }
595
596 fn next_epoch() -> OwnershipEpoch {
599 RangeOwnership::establish(
600 collection("orders"),
601 RangeId::new(1),
602 ShardKeyMode::Hash,
603 RangeBounds::full(),
604 ident("CN=node-a"),
605 [ident("CN=node-b")],
606 PlacementMetadata::with_replication_factor(3),
607 )
608 .transfer_to(ident("CN=node-b"), [])
609 .epoch()
610 }
611
612 fn lease_for(orders: &CollectionId, owner: &str, ttl_ms: u64) -> OwnershipLease {
615 OwnershipLease::grant(
616 SupervisorTerm::genesis(),
617 orders.clone(),
618 RangeId::new(1),
619 ident(owner),
620 OwnershipEpoch::initial(),
621 0,
622 ttl_ms,
623 )
624 }
625
626 #[test]
631 fn lease_window_is_half_open() {
632 let orders = collection("orders");
633 let lease = lease_for(&orders, "CN=node-a", 1_000);
634 assert_eq!(lease.granted_at_ms(), 0);
635 assert_eq!(lease.expires_at_ms(), 1_000);
636 assert!(!lease.is_expired(0));
637 assert!(!lease.is_expired(999));
638 assert!(lease.is_expired(1_000));
641 assert!(lease.is_expired(1_001));
642 assert_eq!(lease.remaining_ms(250), 750);
643 assert_eq!(lease.remaining_ms(1_000), 0);
644 assert_eq!(lease.remaining_ms(5_000), 0);
645 }
646
647 #[test]
648 fn lease_binds_term_range_owner_and_epoch() {
649 let orders = collection("orders");
650 let lease = lease_for(&orders, "CN=node-a", 1_000);
651 assert_eq!(lease.supervisor_term(), SupervisorTerm::genesis());
652 assert_eq!(lease.collection(), &orders);
653 assert_eq!(lease.range_id(), RangeId::new(1));
654 assert_eq!(lease.owner(), &ident("CN=node-a"));
655 assert_eq!(lease.epoch(), OwnershipEpoch::initial());
656 }
657
658 #[test]
663 fn valid_lease_authorises_durable_writes() {
664 let orders = collection("orders");
665 let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
666 let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 500);
667 assert_eq!(mode, OwnerWriteMode::Durable);
668 assert!(mode.may_write_durable());
669 assert!(!mode.is_fenced());
670 }
671
672 #[test]
673 fn unleased_owner_is_fenced() {
674 let owner = LeasedOwner::unleased();
675 let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 0);
676 assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Unleased));
677 }
678
679 #[test]
680 fn expired_lease_self_fences() {
681 let orders = collection("orders");
682 let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
683 let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 1_500);
686 match mode {
687 OwnerWriteMode::Fenced(FenceReason::Expired {
688 now_ms,
689 expires_at_ms,
690 }) => {
691 assert_eq!(now_ms, 1_500);
692 assert_eq!(expires_at_ms, 1_000);
693 }
694 other => panic!("expected Expired fence, got {other:?}"),
695 }
696 }
697
698 #[test]
699 fn epoch_mismatch_self_fences() {
700 let orders = collection("orders");
701 let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
703 let current_epoch = next_epoch();
704 let mode = owner.evaluate(SupervisorTerm::genesis(), current_epoch, 500);
705 match mode {
706 OwnerWriteMode::Fenced(FenceReason::EpochSuperseded {
707 lease_epoch,
708 current_epoch: reported,
709 }) => {
710 assert_eq!(lease_epoch, OwnershipEpoch::initial());
711 assert_eq!(reported, current_epoch);
712 }
713 other => panic!("expected EpochSuperseded fence, got {other:?}"),
714 }
715 }
716
717 #[test]
718 fn supervisor_term_advance_self_fences() {
719 let orders = collection("orders");
720 let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
721 let current_term = SupervisorTerm::genesis().next();
724 let mode = owner.evaluate(current_term, OwnershipEpoch::initial(), 500);
725 match mode {
726 OwnerWriteMode::Fenced(FenceReason::TermSuperseded {
727 lease_term,
728 current_term: reported,
729 }) => {
730 assert_eq!(lease_term, SupervisorTerm::genesis());
731 assert_eq!(reported, current_term);
732 }
733 other => panic!("expected TermSuperseded fence, got {other:?}"),
734 }
735 }
736
737 #[test]
738 fn revoked_lease_self_fences_before_expiry() {
739 let orders = collection("orders");
740 let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
741 owner.revoke();
742 let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 100);
744 assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
745 }
746
747 #[test]
748 fn revoke_takes_precedence_over_other_causes() {
749 let orders = collection("orders");
752 let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
753 owner.revoke();
754 let mode = owner.evaluate(SupervisorTerm::genesis().next(), next_epoch(), 10_000);
755 assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
756 }
757
758 #[test]
759 fn renewing_a_lease_clears_a_prior_revoke_and_extends_window() {
760 let orders = collection("orders");
761 let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
762 owner.revoke();
763 assert!(owner
764 .evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 100)
765 .is_fenced());
766 owner.grant(OwnershipLease::grant(
769 SupervisorTerm::genesis(),
770 orders.clone(),
771 RangeId::new(1),
772 ident("CN=node-a"),
773 OwnershipEpoch::initial(),
774 900,
775 1_000,
776 ));
777 let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 1_500);
778 assert_eq!(mode, OwnerWriteMode::Durable);
779 }
780
781 #[test]
786 fn valid_lease_admits_every_request_kind() {
787 let orders = collection("orders");
788 let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
789 for req in [
790 RangeRequest::DurableWrite,
791 RangeRequest::StaleRead,
792 RangeRequest::ReplicationCatchUp,
793 ] {
794 assert!(owner
795 .admit_request(
796 req,
797 SupervisorTerm::genesis(),
798 OwnershipEpoch::initial(),
799 500
800 )
801 .is_ok());
802 }
803 }
804
805 #[test]
806 fn self_fenced_read_mode_serves_reads_and_catch_up_but_rejects_durable_writes() {
807 let orders = collection("orders");
808 let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
809 let now = 2_000;
811 let term = SupervisorTerm::genesis();
812 let epoch = OwnershipEpoch::initial();
813
814 assert!(owner
816 .admit_request(RangeRequest::StaleRead, term, epoch, now)
817 .is_ok());
818 assert!(owner
819 .admit_request(RangeRequest::ReplicationCatchUp, term, epoch, now)
820 .is_ok());
821
822 let err = owner
824 .admit_request(RangeRequest::DurableWrite, term, epoch, now)
825 .unwrap_err();
826 assert!(matches!(err.reason, FenceReason::Expired { .. }));
827 assert!(err.to_string().contains("self-fenced"));
828 }
829
830 #[test]
831 fn unleased_owner_rejects_durable_write_but_still_catches_up() {
832 let owner = LeasedOwner::unleased();
833 let term = SupervisorTerm::genesis();
834 let epoch = OwnershipEpoch::initial();
835 assert_eq!(
836 owner
837 .admit_request(RangeRequest::DurableWrite, term, epoch, 0)
838 .unwrap_err()
839 .reason,
840 FenceReason::Unleased
841 );
842 assert!(owner
845 .admit_request(RangeRequest::ReplicationCatchUp, term, epoch, 0)
846 .is_ok());
847 }
848
849 #[test]
854 fn durable_write_admitted_for_leased_owner() {
855 let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
856 let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
857 let range = admit_durable_write(
858 &catalog,
859 &owner,
860 &ident("CN=node-a"),
861 &orders,
862 b"k",
863 SupervisorTerm::genesis(),
864 500,
865 )
866 .expect("leased owner at current term/epoch may write");
867 assert_eq!(range.owner(), &ident("CN=node-a"));
868 assert_eq!(range.range_id(), RangeId::new(1));
869 }
870
871 #[test]
872 fn durable_write_rejected_for_catalog_owner_without_a_lease() {
873 let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
876 let owner = LeasedOwner::unleased();
877 let err = admit_durable_write(
878 &catalog,
879 &owner,
880 &ident("CN=node-a"),
881 &orders,
882 b"k",
883 SupervisorTerm::genesis(),
884 0,
885 )
886 .unwrap_err();
887 match err {
888 DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
889 other => panic!("expected Fenced(Unleased), got {other:?}"),
890 }
891 }
892
893 #[test]
894 fn durable_write_rejected_for_non_owner_before_lease_is_even_consulted() {
895 let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
896 let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-b", 1_000));
899 let err = admit_durable_write(
900 &catalog,
901 &owner,
902 &ident("CN=node-b"),
903 &orders,
904 b"k",
905 SupervisorTerm::genesis(),
906 500,
907 )
908 .unwrap_err();
909 match err {
910 DurableWriteReject::NotOwner { role, owner, .. } => {
911 assert_eq!(role, RangeRole::Replica);
912 assert_eq!(owner, ident("CN=node-a"));
913 }
914 other => panic!("expected NotOwner, got {other:?}"),
915 }
916 }
917
918 #[test]
919 fn durable_write_rejected_when_no_range_covers_the_key() {
920 let catalog = ShardOwnershipCatalog::new();
921 let orders = collection("orders");
922 let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
923 let err = admit_durable_write(
924 &catalog,
925 &owner,
926 &ident("CN=node-a"),
927 &orders,
928 b"k",
929 SupervisorTerm::genesis(),
930 500,
931 )
932 .unwrap_err();
933 assert!(matches!(err, DurableWriteReject::NoRange { .. }));
934 }
935
936 #[test]
937 fn durable_write_fenced_when_lease_epoch_trails_the_catalog() {
938 let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
942 let stale_lease = lease_for(&orders, "CN=node-a", 100_000);
943
944 let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
945 let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
946 catalog.apply_update(v2.clone()).unwrap();
947 let v3 = v2.transfer_to(ident("CN=node-a"), [ident("CN=node-b")]);
948 catalog.apply_update(v3).unwrap();
949
950 let owner = LeasedOwner::with_lease(stale_lease);
951 let current_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
952 assert_eq!(current_epoch.value(), 3);
953
954 let err = admit_durable_write(
955 &catalog,
956 &owner,
957 &ident("CN=node-a"),
958 &orders,
959 b"k",
960 SupervisorTerm::genesis(),
961 500,
962 )
963 .unwrap_err();
964 match err {
965 DurableWriteReject::Fenced {
966 reason: FenceReason::EpochSuperseded { lease_epoch, .. },
967 ..
968 } => assert_eq!(lease_epoch, OwnershipEpoch::initial()),
969 other => panic!("expected Fenced(EpochSuperseded), got {other:?}"),
970 }
971 }
972
973 #[test]
974 fn durable_write_fenced_when_lease_is_for_a_different_range() {
975 let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
976 let wrong_range_lease = OwnershipLease::grant(
978 SupervisorTerm::genesis(),
979 orders.clone(),
980 RangeId::new(2),
981 ident("CN=node-a"),
982 OwnershipEpoch::initial(),
983 0,
984 1_000,
985 );
986 let owner = LeasedOwner::with_lease(wrong_range_lease);
987 let err = admit_durable_write(
988 &catalog,
989 &owner,
990 &ident("CN=node-a"),
991 &orders,
992 b"k",
993 SupervisorTerm::genesis(),
994 500,
995 )
996 .unwrap_err();
997 match err {
999 DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
1000 other => panic!("expected Fenced(Unleased), got {other:?}"),
1001 }
1002 }
1003
1004 #[test]
1005 fn durable_write_rejected_after_self_fence_then_restored_on_renewal() {
1006 let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
1010 let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
1011 let term = SupervisorTerm::genesis();
1012
1013 assert!(admit_durable_write(
1015 &catalog,
1016 &owner,
1017 &ident("CN=node-a"),
1018 &orders,
1019 b"k",
1020 term,
1021 500
1022 )
1023 .is_ok());
1024 let err = admit_durable_write(
1026 &catalog,
1027 &owner,
1028 &ident("CN=node-a"),
1029 &orders,
1030 b"k",
1031 term,
1032 2_000,
1033 )
1034 .unwrap_err();
1035 assert!(matches!(
1036 err,
1037 DurableWriteReject::Fenced {
1038 reason: FenceReason::Expired { .. },
1039 ..
1040 }
1041 ));
1042 owner.grant(OwnershipLease::grant(
1044 term,
1045 orders.clone(),
1046 RangeId::new(1),
1047 ident("CN=node-a"),
1048 OwnershipEpoch::initial(),
1049 2_000,
1050 1_000,
1051 ));
1052 assert!(admit_durable_write(
1053 &catalog,
1054 &owner,
1055 &ident("CN=node-a"),
1056 &orders,
1057 b"k",
1058 term,
1059 2_500
1060 )
1061 .is_ok());
1062 }
1063}