1use super::identity::NodeIdentity;
71use super::ownership::{
72 CollectionId, OwnershipEpoch, RangeId, RangeOwnership, RangeRole, ShardOwnershipCatalog,
73};
74
/// Supervisor term number, wrapped in a newtype over `u64`.
///
/// Terms compare and order by their numeric value (derived `Ord`). The first
/// term is [`SupervisorTerm::genesis`] (term 1) and each later term is
/// obtained with [`SupervisorTerm::next`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SupervisorTerm(u64);

impl SupervisorTerm {
    /// Returns the first term, numbered 1.
    pub fn genesis() -> Self {
        Self(1)
    }

    /// Wraps a raw term number. No validation is performed.
    pub fn new(value: u64) -> Self {
        Self(value)
    }

    /// Returns the raw term number.
    pub fn value(self) -> u64 {
        self.0
    }

    /// Returns the term immediately after this one.
    ///
    /// # Panics
    ///
    /// Panics if this term is already `u64::MAX`. A plain `+ 1` would wrap to
    /// term 0 in builds without overflow checks, yielding a "next" term that
    /// orders before every other term; failing loudly is preferred.
    pub fn next(self) -> Self {
        let successor = self
            .0
            .checked_add(1)
            .expect("supervisor term overflowed u64");
        Self(successor)
    }
}

impl std::fmt::Display for SupervisorTerm {
    /// Writes the term as its bare decimal number.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct OwnershipLease {
123 supervisor_term: SupervisorTerm,
124 collection: CollectionId,
125 range_id: RangeId,
126 owner: NodeIdentity,
127 epoch: OwnershipEpoch,
128 granted_at_ms: u64,
129 expires_at_ms: u64,
130}
131
132impl OwnershipLease {
133 #[allow(clippy::too_many_arguments)]
137 pub fn grant(
138 supervisor_term: SupervisorTerm,
139 collection: CollectionId,
140 range_id: RangeId,
141 owner: NodeIdentity,
142 epoch: OwnershipEpoch,
143 granted_at_ms: u64,
144 ttl_ms: u64,
145 ) -> Self {
146 Self {
147 supervisor_term,
148 collection,
149 range_id,
150 owner,
151 epoch,
152 granted_at_ms,
153 expires_at_ms: granted_at_ms.saturating_add(ttl_ms),
154 }
155 }
156
157 pub fn supervisor_term(&self) -> SupervisorTerm {
158 self.supervisor_term
159 }
160
161 pub fn collection(&self) -> &CollectionId {
162 &self.collection
163 }
164
165 pub fn range_id(&self) -> RangeId {
166 self.range_id
167 }
168
169 pub fn owner(&self) -> &NodeIdentity {
170 &self.owner
171 }
172
173 pub fn epoch(&self) -> OwnershipEpoch {
174 self.epoch
175 }
176
177 pub fn granted_at_ms(&self) -> u64 {
178 self.granted_at_ms
179 }
180
181 pub fn expires_at_ms(&self) -> u64 {
182 self.expires_at_ms
183 }
184
185 pub fn is_expired(&self, now_ms: u64) -> bool {
189 now_ms >= self.expires_at_ms
190 }
191
192 pub fn remaining_ms(&self, now_ms: u64) -> u64 {
195 self.expires_at_ms.saturating_sub(now_ms)
196 }
197
198 fn covers(&self, collection: &CollectionId, range_id: RangeId, owner: &NodeIdentity) -> bool {
202 self.collection == *collection && self.range_id == range_id && self.owner == *owner
203 }
204}
205
206#[derive(Debug, Clone, PartialEq, Eq)]
211pub enum FenceReason {
212 Unleased,
215 Revoked,
218 TermSuperseded {
221 lease_term: SupervisorTerm,
222 current_term: SupervisorTerm,
223 },
224 EpochSuperseded {
227 lease_epoch: OwnershipEpoch,
228 current_epoch: OwnershipEpoch,
229 },
230 Expired { now_ms: u64, expires_at_ms: u64 },
234}
235
236impl std::fmt::Display for FenceReason {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 match self {
239 Self::Unleased => write!(f, "owner holds no ownership lease"),
240 Self::Revoked => write!(f, "ownership lease was revoked"),
241 Self::TermSuperseded {
242 lease_term,
243 current_term,
244 } => write!(
245 f,
246 "ownership lease granted under supervisor term {lease_term} is behind current term {current_term}"
247 ),
248 Self::EpochSuperseded {
249 lease_epoch,
250 current_epoch,
251 } => write!(
252 f,
253 "ownership lease epoch {lease_epoch} no longer matches current ownership epoch {current_epoch}"
254 ),
255 Self::Expired {
256 now_ms,
257 expires_at_ms,
258 } => write!(
259 f,
260 "ownership lease expired at {expires_at_ms} ms (now {now_ms} ms)"
261 ),
262 }
263 }
264}
265
266impl std::error::Error for FenceReason {}
267
268#[derive(Debug, Clone, PartialEq, Eq)]
270pub enum OwnerWriteMode {
271 Durable,
273 Fenced(FenceReason),
277}
278
279impl OwnerWriteMode {
280 pub fn may_write_durable(&self) -> bool {
282 matches!(self, OwnerWriteMode::Durable)
283 }
284
285 pub fn is_fenced(&self) -> bool {
287 matches!(self, OwnerWriteMode::Fenced(_))
288 }
289}
290
/// Kind of request a range holder may be asked to serve.
///
/// `LeasedOwner::admit_request` admits every kind while the holder evaluates
/// to durable mode. Once the holder is fenced it still admits `StaleRead` and
/// `ReplicationCatchUp`, and rejects only `DurableWrite`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeRequest {
    /// A durable write; rejected while the holder is fenced.
    DurableWrite,
    /// A read that tolerates stale data (meaning inferred from the name);
    /// admitted even while the holder is fenced.
    StaleRead,
    /// Replication catch-up traffic (meaning inferred from the name);
    /// admitted even while the holder is fenced.
    ReplicationCatchUp,
}
307
308#[derive(Debug, Clone, PartialEq, Eq)]
310pub struct LeaseFenceRejection {
311 pub reason: FenceReason,
313}
314
315impl std::fmt::Display for LeaseFenceRejection {
316 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317 write!(
318 f,
319 "durable write rejected: owner is self-fenced ({})",
320 self.reason
321 )
322 }
323}
324
325impl std::error::Error for LeaseFenceRejection {}
326
327#[derive(Debug, Clone, Default, PartialEq, Eq)]
337pub struct LeasedOwner {
338 lease: Option<OwnershipLease>,
339 revoked: bool,
340}
341
342impl LeasedOwner {
343 pub fn unleased() -> Self {
345 Self {
346 lease: None,
347 revoked: false,
348 }
349 }
350
351 pub fn with_lease(lease: OwnershipLease) -> Self {
353 Self {
354 lease: Some(lease),
355 revoked: false,
356 }
357 }
358
359 pub fn grant(&mut self, lease: OwnershipLease) {
362 self.lease = Some(lease);
363 self.revoked = false;
364 }
365
366 pub fn revoke(&mut self) {
370 self.revoked = true;
371 }
372
373 pub fn lease(&self) -> Option<&OwnershipLease> {
377 self.lease.as_ref()
378 }
379
380 pub fn evaluate(
391 &self,
392 current_term: SupervisorTerm,
393 current_epoch: OwnershipEpoch,
394 now_ms: u64,
395 ) -> OwnerWriteMode {
396 if self.revoked {
397 return OwnerWriteMode::Fenced(FenceReason::Revoked);
398 }
399 let Some(lease) = &self.lease else {
400 return OwnerWriteMode::Fenced(FenceReason::Unleased);
401 };
402 if lease.supervisor_term != current_term {
403 return OwnerWriteMode::Fenced(FenceReason::TermSuperseded {
404 lease_term: lease.supervisor_term,
405 current_term,
406 });
407 }
408 if lease.epoch != current_epoch {
409 return OwnerWriteMode::Fenced(FenceReason::EpochSuperseded {
410 lease_epoch: lease.epoch,
411 current_epoch,
412 });
413 }
414 if lease.is_expired(now_ms) {
415 return OwnerWriteMode::Fenced(FenceReason::Expired {
416 now_ms,
417 expires_at_ms: lease.expires_at_ms,
418 });
419 }
420 OwnerWriteMode::Durable
421 }
422
423 pub fn admit_request(
433 &self,
434 request: RangeRequest,
435 current_term: SupervisorTerm,
436 current_epoch: OwnershipEpoch,
437 now_ms: u64,
438 ) -> Result<(), LeaseFenceRejection> {
439 match self.evaluate(current_term, current_epoch, now_ms) {
440 OwnerWriteMode::Durable => Ok(()),
441 OwnerWriteMode::Fenced(reason) => match request {
442 RangeRequest::StaleRead | RangeRequest::ReplicationCatchUp => Ok(()),
443 RangeRequest::DurableWrite => Err(LeaseFenceRejection { reason }),
444 },
445 }
446 }
447}
448
449#[derive(Debug, Clone, PartialEq, Eq)]
452pub enum DurableWriteReject {
453 NoRange { collection: CollectionId },
455 NotOwner {
458 collection: CollectionId,
459 range_id: RangeId,
460 role: RangeRole,
461 owner: NodeIdentity,
462 },
463 Fenced {
466 collection: CollectionId,
467 range_id: RangeId,
468 reason: FenceReason,
469 },
470}
471
472impl std::fmt::Display for DurableWriteReject {
473 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474 match self {
475 Self::NoRange { collection } => write!(
476 f,
477 "no range of collection {collection} covers the routed key — re-resolve routing"
478 ),
479 Self::NotOwner {
480 collection,
481 range_id,
482 owner,
483 ..
484 } => write!(
485 f,
486 "this node does not own {collection}/{range_id} — route the durable write to {owner}"
487 ),
488 Self::Fenced {
489 collection,
490 range_id,
491 reason,
492 } => write!(
493 f,
494 "owner of {collection}/{range_id} is self-fenced and rejects the durable write: {reason}"
495 ),
496 }
497 }
498}
499
500impl std::error::Error for DurableWriteReject {}
501
502pub fn admit_durable_write<'c>(
517 catalog: &'c ShardOwnershipCatalog,
518 holder: &LeasedOwner,
519 node: &NodeIdentity,
520 collection: &CollectionId,
521 key: &[u8],
522 current_term: SupervisorTerm,
523 now_ms: u64,
524) -> Result<&'c RangeOwnership, DurableWriteReject> {
525 let range = catalog
526 .route(collection, key)
527 .ok_or_else(|| DurableWriteReject::NoRange {
528 collection: collection.clone(),
529 })?;
530
531 let role = range.role_of(node);
532 if !role.may_write_public() {
533 return Err(DurableWriteReject::NotOwner {
534 collection: collection.clone(),
535 range_id: range.range_id(),
536 role,
537 owner: range.owner().clone(),
538 });
539 }
540
541 let covered = holder
545 .lease()
546 .is_some_and(|lease| lease.covers(collection, range.range_id(), node));
547
548 let mode = if covered {
549 holder.evaluate(current_term, range.epoch(), now_ms)
550 } else {
551 OwnerWriteMode::Fenced(FenceReason::Unleased)
552 };
553
554 match mode {
555 OwnerWriteMode::Durable => Ok(range),
556 OwnerWriteMode::Fenced(reason) => Err(DurableWriteReject::Fenced {
557 collection: collection.clone(),
558 range_id: range.range_id(),
559 reason,
560 }),
561 }
562}
563
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{PlacementMetadata, RangeBounds, ShardKeyMode};

    /// Certificate subject of the node that owns the fixture range.
    const NODE_A: &str = "CN=node-a";
    /// Certificate subject of the fixture replica.
    const NODE_B: &str = "CN=node-b";

    /// Builds a collection id, panicking on an invalid name.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Builds a node identity from a certificate subject.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Catalog holding a single full-bounds range (id 1) of `orders`.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let replica_ids: Vec<_> = replicas.iter().copied().map(ident).collect();
        let initial = RangeOwnership::establish(
            orders.clone(),
            RangeId::new(1),
            ShardKeyMode::Hash,
            RangeBounds::full(),
            ident(owner),
            replica_ids,
            PlacementMetadata::with_replication_factor(3),
        );
        let mut catalog = ShardOwnershipCatalog::new();
        catalog.apply_update(initial).unwrap();
        (catalog, orders)
    }

    /// Epoch of a range after one ownership transfer.
    fn next_epoch() -> OwnershipEpoch {
        let established = RangeOwnership::establish(
            collection("orders"),
            RangeId::new(1),
            ShardKeyMode::Hash,
            RangeBounds::full(),
            ident(NODE_A),
            [ident(NODE_B)],
            PlacementMetadata::with_replication_factor(3),
        );
        established.transfer_to(ident(NODE_B), []).epoch()
    }

    /// Lease on range 1 at the genesis term and initial epoch, granted at t=0.
    fn lease_for(orders: &CollectionId, owner: &str, ttl_ms: u64) -> OwnershipLease {
        OwnershipLease::grant(
            SupervisorTerm::genesis(),
            orders.clone(),
            RangeId::new(1),
            ident(owner),
            OwnershipEpoch::initial(),
            0,
            ttl_ms,
        )
    }

    /// Holder for node A with a 1000 ms lease granted at t=0.
    fn holder_a(orders: &CollectionId) -> LeasedOwner {
        LeasedOwner::with_lease(lease_for(orders, NODE_A, 1_000))
    }

    /// Runs `admit_durable_write` for key `k` on behalf of `node`.
    fn admit<'c>(
        catalog: &'c ShardOwnershipCatalog,
        holder: &LeasedOwner,
        node: &str,
        orders: &CollectionId,
        term: SupervisorTerm,
        now_ms: u64,
    ) -> Result<&'c RangeOwnership, DurableWriteReject> {
        admit_durable_write(catalog, holder, &ident(node), orders, b"k", term, now_ms)
    }

    /// The lease is live on `[granted, expires)` and expired from `expires`.
    #[test]
    fn lease_window_is_half_open() {
        let orders = collection("orders");
        let lease = lease_for(&orders, NODE_A, 1_000);
        assert_eq!(lease.granted_at_ms(), 0);
        assert_eq!(lease.expires_at_ms(), 1_000);
        for live_at in [0, 999] {
            assert!(!lease.is_expired(live_at));
        }
        for dead_at in [1_000, 1_001] {
            assert!(lease.is_expired(dead_at));
        }
        for (now, left) in [(250, 750), (1_000, 0), (5_000, 0)] {
            assert_eq!(lease.remaining_ms(now), left);
        }
    }

    /// Every accessor returns what was passed to `grant`.
    #[test]
    fn lease_binds_term_range_owner_and_epoch() {
        let orders = collection("orders");
        let lease = lease_for(&orders, NODE_A, 1_000);
        assert_eq!(lease.supervisor_term(), SupervisorTerm::genesis());
        assert_eq!(lease.collection(), &orders);
        assert_eq!(lease.range_id(), RangeId::new(1));
        assert_eq!(lease.owner(), &ident(NODE_A));
        assert_eq!(lease.epoch(), OwnershipEpoch::initial());
    }

    /// A live lease at the current term and epoch evaluates to `Durable`.
    #[test]
    fn valid_lease_authorises_durable_writes() {
        let orders = collection("orders");
        let holder = holder_a(&orders);
        let mode = holder.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 500);
        assert_eq!(mode, OwnerWriteMode::Durable);
        assert!(mode.may_write_durable());
        assert!(!mode.is_fenced());
    }

    /// A holder without a lease is fenced as `Unleased`.
    #[test]
    fn unleased_owner_is_fenced() {
        let holder = LeasedOwner::unleased();
        let mode = holder.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 0);
        assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Unleased));
    }

    /// Past the expiry instant the holder is fenced as `Expired`.
    #[test]
    fn expired_lease_self_fences() {
        let orders = collection("orders");
        let holder = holder_a(&orders);
        match holder.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 1_500) {
            OwnerWriteMode::Fenced(FenceReason::Expired { now_ms, expires_at_ms }) => {
                assert_eq!(now_ms, 1_500);
                assert_eq!(expires_at_ms, 1_000);
            }
            other => panic!("expected Expired fence, got {other:?}"),
        }
    }

    /// A lease at the initial epoch is fenced once the epoch has moved on.
    #[test]
    fn epoch_mismatch_self_fences() {
        let orders = collection("orders");
        let holder = holder_a(&orders);
        let current_epoch = next_epoch();
        match holder.evaluate(SupervisorTerm::genesis(), current_epoch, 500) {
            OwnerWriteMode::Fenced(FenceReason::EpochSuperseded {
                lease_epoch,
                current_epoch: reported,
            }) => {
                assert_eq!(lease_epoch, OwnershipEpoch::initial());
                assert_eq!(reported, current_epoch);
            }
            other => panic!("expected EpochSuperseded fence, got {other:?}"),
        }
    }

    /// A lease from the genesis term is fenced under the next term.
    #[test]
    fn supervisor_term_advance_self_fences() {
        let orders = collection("orders");
        let holder = holder_a(&orders);
        let current_term = SupervisorTerm::genesis().next();
        match holder.evaluate(current_term, OwnershipEpoch::initial(), 500) {
            OwnerWriteMode::Fenced(FenceReason::TermSuperseded {
                lease_term,
                current_term: reported,
            }) => {
                assert_eq!(lease_term, SupervisorTerm::genesis());
                assert_eq!(reported, current_term);
            }
            other => panic!("expected TermSuperseded fence, got {other:?}"),
        }
    }

    /// Revocation fences the holder even while the lease window is open.
    #[test]
    fn revoked_lease_self_fences_before_expiry() {
        let orders = collection("orders");
        let mut holder = holder_a(&orders);
        holder.revoke();
        let mode = holder.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 100);
        assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
    }

    /// `Revoked` is reported even when term, epoch and expiry would also fence.
    #[test]
    fn revoke_takes_precedence_over_other_causes() {
        let orders = collection("orders");
        let mut holder = holder_a(&orders);
        holder.revoke();
        let mode = holder.evaluate(SupervisorTerm::genesis().next(), next_epoch(), 10_000);
        assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
    }

    /// A fresh grant clears the revoked flag and installs the new window.
    #[test]
    fn renewing_a_lease_clears_a_prior_revoke_and_extends_window() {
        let orders = collection("orders");
        let term = SupervisorTerm::genesis();
        let epoch = OwnershipEpoch::initial();
        let mut holder = holder_a(&orders);
        holder.revoke();
        assert!(holder.evaluate(term, epoch, 100).is_fenced());

        let renewed = OwnershipLease::grant(
            term,
            orders.clone(),
            RangeId::new(1),
            ident(NODE_A),
            epoch,
            900,
            1_000,
        );
        holder.grant(renewed);
        assert_eq!(holder.evaluate(term, epoch, 1_500), OwnerWriteMode::Durable);
    }

    /// With a valid lease every request kind is admitted.
    #[test]
    fn valid_lease_admits_every_request_kind() {
        let orders = collection("orders");
        let holder = holder_a(&orders);
        let term = SupervisorTerm::genesis();
        let epoch = OwnershipEpoch::initial();
        let every_kind = [
            RangeRequest::DurableWrite,
            RangeRequest::StaleRead,
            RangeRequest::ReplicationCatchUp,
        ];
        for kind in every_kind {
            assert!(holder.admit_request(kind, term, epoch, 500).is_ok());
        }
    }

    /// A fenced holder still serves reads and catch-up but refuses writes.
    #[test]
    fn self_fenced_read_mode_serves_reads_and_catch_up_but_rejects_durable_writes() {
        let orders = collection("orders");
        let holder = holder_a(&orders);
        let term = SupervisorTerm::genesis();
        let epoch = OwnershipEpoch::initial();
        let now = 2_000;

        for kind in [RangeRequest::StaleRead, RangeRequest::ReplicationCatchUp] {
            assert!(holder.admit_request(kind, term, epoch, now).is_ok());
        }

        let rejection = holder
            .admit_request(RangeRequest::DurableWrite, term, epoch, now)
            .unwrap_err();
        assert!(matches!(rejection.reason, FenceReason::Expired { .. }));
        assert!(rejection.to_string().contains("self-fenced"));
    }

    /// Without a lease, writes are rejected as `Unleased`; catch-up still runs.
    #[test]
    fn unleased_owner_rejects_durable_write_but_still_catches_up() {
        let holder = LeasedOwner::unleased();
        let term = SupervisorTerm::genesis();
        let epoch = OwnershipEpoch::initial();

        let rejection = holder
            .admit_request(RangeRequest::DurableWrite, term, epoch, 0)
            .unwrap_err();
        assert_eq!(rejection.reason, FenceReason::Unleased);

        let catch_up = holder.admit_request(RangeRequest::ReplicationCatchUp, term, epoch, 0);
        assert!(catch_up.is_ok());
    }

    /// The catalog owner with a matching live lease may write.
    #[test]
    fn durable_write_admitted_for_leased_owner() {
        let (catalog, orders) = catalog_with(NODE_A, &[NODE_B]);
        let holder = holder_a(&orders);
        let range = admit(&catalog, &holder, NODE_A, &orders, SupervisorTerm::genesis(), 500)
            .expect("leased owner at current term/epoch may write");
        assert_eq!(range.owner(), &ident(NODE_A));
        assert_eq!(range.range_id(), RangeId::new(1));
    }

    /// Being the catalog owner is not enough: no lease means `Unleased`.
    #[test]
    fn durable_write_rejected_for_catalog_owner_without_a_lease() {
        let (catalog, orders) = catalog_with(NODE_A, &[NODE_B]);
        let holder = LeasedOwner::unleased();
        let rejection = admit(&catalog, &holder, NODE_A, &orders, SupervisorTerm::genesis(), 0)
            .unwrap_err();
        match rejection {
            DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
            other => panic!("expected Fenced(Unleased), got {other:?}"),
        }
    }

    /// A replica is turned away as `NotOwner` even though it holds a lease.
    #[test]
    fn durable_write_rejected_for_non_owner_before_lease_is_even_consulted() {
        let (catalog, orders) = catalog_with(NODE_A, &[NODE_B]);
        let holder = LeasedOwner::with_lease(lease_for(&orders, NODE_B, 1_000));
        let rejection = admit(&catalog, &holder, NODE_B, &orders, SupervisorTerm::genesis(), 500)
            .unwrap_err();
        match rejection {
            DurableWriteReject::NotOwner { role, owner, .. } => {
                assert_eq!(role, RangeRole::Replica);
                assert_eq!(owner, ident(NODE_A));
            }
            other => panic!("expected NotOwner, got {other:?}"),
        }
    }

    /// An empty catalog cannot route the key, so the result is `NoRange`.
    #[test]
    fn durable_write_rejected_when_no_range_covers_the_key() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let holder = holder_a(&orders);
        let rejection = admit(&catalog, &holder, NODE_A, &orders, SupervisorTerm::genesis(), 500)
            .unwrap_err();
        assert!(matches!(rejection, DurableWriteReject::NoRange { .. }));
    }

    /// Ownership moves away and back (epoch 3); the epoch-1 lease is fenced.
    #[test]
    fn durable_write_fenced_when_lease_epoch_trails_the_catalog() {
        let (mut catalog, orders) = catalog_with(NODE_A, &[NODE_B]);
        let stale_lease = lease_for(&orders, NODE_A, 100_000);

        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        let v2 = v1.transfer_to(ident(NODE_B), [ident(NODE_A)]);
        catalog.apply_update(v2.clone()).unwrap();
        let v3 = v2.transfer_to(ident(NODE_A), [ident(NODE_B)]);
        catalog.apply_update(v3).unwrap();

        let holder = LeasedOwner::with_lease(stale_lease);
        let current_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
        assert_eq!(current_epoch.value(), 3);

        let rejection = admit(&catalog, &holder, NODE_A, &orders, SupervisorTerm::genesis(), 500)
            .unwrap_err();
        match rejection {
            DurableWriteReject::Fenced {
                reason: FenceReason::EpochSuperseded { lease_epoch, .. },
                ..
            } => assert_eq!(lease_epoch, OwnershipEpoch::initial()),
            other => panic!("expected Fenced(EpochSuperseded), got {other:?}"),
        }
    }

    /// A lease on range 2 does not cover range 1, so the write is `Unleased`.
    #[test]
    fn durable_write_fenced_when_lease_is_for_a_different_range() {
        let (catalog, orders) = catalog_with(NODE_A, &[NODE_B]);
        let wrong_range_lease = OwnershipLease::grant(
            SupervisorTerm::genesis(),
            orders.clone(),
            RangeId::new(2),
            ident(NODE_A),
            OwnershipEpoch::initial(),
            0,
            1_000,
        );
        let holder = LeasedOwner::with_lease(wrong_range_lease);
        let rejection = admit(&catalog, &holder, NODE_A, &orders, SupervisorTerm::genesis(), 500)
            .unwrap_err();
        match rejection {
            DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
            other => panic!("expected Fenced(Unleased), got {other:?}"),
        }
    }

    /// Admitted while live, fenced after expiry, admitted again after renewal.
    #[test]
    fn durable_write_rejected_after_self_fence_then_restored_on_renewal() {
        let (catalog, orders) = catalog_with(NODE_A, &[NODE_B]);
        let mut holder = holder_a(&orders);
        let term = SupervisorTerm::genesis();

        assert!(admit(&catalog, &holder, NODE_A, &orders, term, 500).is_ok());

        let rejection = admit(&catalog, &holder, NODE_A, &orders, term, 2_000).unwrap_err();
        assert!(matches!(
            rejection,
            DurableWriteReject::Fenced {
                reason: FenceReason::Expired { .. },
                ..
            }
        ));

        let renewed = OwnershipLease::grant(
            term,
            orders.clone(),
            RangeId::new(1),
            ident(NODE_A),
            OwnershipEpoch::initial(),
            2_000,
            1_000,
        );
        holder.grant(renewed);
        assert!(admit(&catalog, &holder, NODE_A, &orders, term, 2_500).is_ok());
    }
}