use super::identity::NodeIdentity;
use super::ownership::{
CollectionId, OwnershipEpoch, RangeId, RangeOwnership, RangeRole, ShardOwnershipCatalog,
};
/// Term number of the ownership supervisor that issues leases.
///
/// Terms are totally ordered (`Ord`), start at [`SupervisorTerm::genesis`] and move forward
/// with [`SupervisorTerm::next`]. Every [`OwnershipLease`] records the term it was granted
/// under.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SupervisorTerm(u64);

impl SupervisorTerm {
    /// The first supervisor term, numbered `1`.
    pub fn genesis() -> Self {
        Self(1)
    }

    /// Wraps a raw term number.
    pub fn new(value: u64) -> Self {
        Self(value)
    }

    /// The raw term number.
    pub fn value(self) -> u64 {
        self.0
    }

    /// The term immediately after this one.
    ///
    /// # Panics
    ///
    /// Panics if the term is already `u64::MAX`.
    pub fn next(self) -> Self {
        // `self.0 + 1` would wrap to 0 in release builds, producing a term that compares lower
        // than every earlier one and breaking the ordering callers rely on. Treat exhaustion as
        // a broken invariant in every build profile instead.
        Self(
            self.0
                .checked_add(1)
                .expect("supervisor term overflowed u64"),
        )
    }
}

impl std::fmt::Display for SupervisorTerm {
    /// Formats the bare term number, e.g. `3`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
/// A time-bounded grant of durable-write authority over one range of one collection.
///
/// The lease is bound to the supervisor term and ownership epoch it was issued under. It is
/// valid on the half-open window `[granted_at_ms, expires_at_ms)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OwnershipLease {
    supervisor_term: SupervisorTerm,
    collection: CollectionId,
    range_id: RangeId,
    owner: NodeIdentity,
    epoch: OwnershipEpoch,
    granted_at_ms: u64,
    expires_at_ms: u64,
}

impl OwnershipLease {
    /// Issues a lease to `owner` for `collection`/`range_id` at `epoch`, under
    /// `supervisor_term`, starting at `granted_at_ms` and lasting `ttl_ms` milliseconds.
    ///
    /// The expiry is `granted_at_ms + ttl_ms`, clamped at `u64::MAX` rather than wrapping.
    // Every argument is an identity field of the lease; grouping them into a parameter
    // struct would only move the same list elsewhere.
    #[allow(clippy::too_many_arguments)]
    pub fn grant(
        supervisor_term: SupervisorTerm,
        collection: CollectionId,
        range_id: RangeId,
        owner: NodeIdentity,
        epoch: OwnershipEpoch,
        granted_at_ms: u64,
        ttl_ms: u64,
    ) -> Self {
        let expires_at_ms = granted_at_ms.saturating_add(ttl_ms);
        Self {
            supervisor_term,
            collection,
            range_id,
            owner,
            epoch,
            granted_at_ms,
            expires_at_ms,
        }
    }

    /// Supervisor term the lease was granted under.
    pub fn supervisor_term(&self) -> SupervisorTerm {
        self.supervisor_term
    }

    /// Collection the lease applies to.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Range (within [`Self::collection`]) the lease applies to.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// Node the lease was granted to.
    pub fn owner(&self) -> &NodeIdentity {
        &self.owner
    }

    /// Ownership epoch the lease was granted at.
    pub fn epoch(&self) -> OwnershipEpoch {
        self.epoch
    }

    /// Start of the validity window, in milliseconds.
    pub fn granted_at_ms(&self) -> u64 {
        self.granted_at_ms
    }

    /// Exclusive end of the validity window, in milliseconds.
    pub fn expires_at_ms(&self) -> u64 {
        self.expires_at_ms
    }

    /// `true` once `now_ms` has reached the (exclusive) expiry instant.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        self.expires_at_ms <= now_ms
    }

    /// Milliseconds left before expiry; `0` once the lease has expired.
    pub fn remaining_ms(&self, now_ms: u64) -> u64 {
        if self.is_expired(now_ms) {
            0
        } else {
            self.expires_at_ms - now_ms
        }
    }

    /// Whether this lease names exactly `collection`, `range_id` and `owner`.
    ///
    /// Term, epoch and expiry are not checked here.
    fn covers(&self, collection: &CollectionId, range_id: RangeId, owner: &NodeIdentity) -> bool {
        self.range_id == range_id && &self.collection == collection && &self.owner == owner
    }
}
/// Why an owner is self-fenced and must refuse durable writes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FenceReason {
    /// No lease is held. `admit_durable_write` also reports this when the lease held does not
    /// cover the routed collection/range/node.
    Unleased,
    /// The lease was explicitly revoked and has not been replaced by a new grant.
    Revoked,
    /// The lease's supervisor term differs from the term the caller considers current.
    TermSuperseded {
        lease_term: SupervisorTerm,
        current_term: SupervisorTerm,
    },
    /// The lease's ownership epoch differs from the current ownership epoch.
    EpochSuperseded {
        lease_epoch: OwnershipEpoch,
        current_epoch: OwnershipEpoch,
    },
    /// `now_ms` has reached the lease's exclusive expiry instant.
    Expired { now_ms: u64, expires_at_ms: u64 },
}

impl std::fmt::Display for FenceReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unleased => write!(f, "owner holds no ownership lease"),
            Self::Revoked => write!(f, "ownership lease was revoked"),
            Self::TermSuperseded {
                lease_term,
                current_term,
            } => {
                // `LeasedOwner::evaluate` fences on any term mismatch. A lease from a newer
                // term than the caller's view is therefore reported too, and must not be
                // described as "behind".
                let relation = if lease_term < current_term {
                    "is behind"
                } else if lease_term > current_term {
                    "is ahead of"
                } else {
                    "does not match"
                };
                write!(
                    f,
                    "ownership lease granted under supervisor term {lease_term} {relation} current term {current_term}"
                )
            }
            Self::EpochSuperseded {
                lease_epoch,
                current_epoch,
            } => write!(
                f,
                "ownership lease epoch {lease_epoch} no longer matches current ownership epoch {current_epoch}"
            ),
            Self::Expired {
                now_ms,
                expires_at_ms,
            } => write!(
                f,
                "ownership lease expired at {expires_at_ms} ms (now {now_ms} ms)"
            ),
        }
    }
}

impl std::error::Error for FenceReason {}
/// Result of [`LeasedOwner::evaluate`]: whether the owner may accept durable writes now.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwnerWriteMode {
    /// The lease is valid for the current term, epoch and time.
    Durable,
    /// The owner is self-fenced for the carried reason.
    Fenced(FenceReason),
}

impl OwnerWriteMode {
    /// `true` exactly when the mode is [`OwnerWriteMode::Durable`].
    pub fn may_write_durable(&self) -> bool {
        !self.is_fenced()
    }

    /// `true` exactly when the mode is [`OwnerWriteMode::Fenced`].
    pub fn is_fenced(&self) -> bool {
        matches!(self, Self::Fenced(_))
    }
}
/// Kind of request a range owner is asked to serve; see [`LeasedOwner::admit_request`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeRequest {
    /// A write that must be durably committed; requires an unfenced lease.
    DurableWrite,
    /// A read that tolerates staleness; admitted even while fenced.
    StaleRead,
    /// Replication catch-up traffic; admitted even while fenced.
    ReplicationCatchUp,
}
/// Error from [`LeasedOwner::admit_request`] when a durable write arrives while the owner is
/// self-fenced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaseFenceRejection {
    /// Why the owner is fenced.
    pub reason: FenceReason,
}

impl std::fmt::Display for LeaseFenceRejection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let reason = &self.reason;
        write!(
            f,
            "durable write rejected: owner is self-fenced ({})",
            reason
        )
    }
}

impl std::error::Error for LeaseFenceRejection {}
/// A node's local view of its ownership lease.
///
/// Holds the most recently granted [`OwnershipLease`] (if any) and a revocation flag.
/// Revoking keeps the lease but fences it; a later [`LeasedOwner::grant`] clears the flag.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LeasedOwner {
    lease: Option<OwnershipLease>,
    revoked: bool,
}

impl LeasedOwner {
    /// A holder with no lease and no revocation.
    pub fn unleased() -> Self {
        Self::default()
    }

    /// A holder that starts out with `lease` and no revocation.
    pub fn with_lease(lease: OwnershipLease) -> Self {
        let mut holder = Self::unleased();
        holder.grant(lease);
        holder
    }

    /// Installs `lease`, replacing any previous one and clearing a prior revocation.
    pub fn grant(&mut self, lease: OwnershipLease) {
        self.revoked = false;
        self.lease = Some(lease);
    }

    /// Marks the current lease as revoked; it stays stored but no longer authorises writes.
    pub fn revoke(&mut self) {
        self.revoked = true;
    }

    /// The currently stored lease, revoked or not.
    pub fn lease(&self) -> Option<&OwnershipLease> {
        self.lease.as_ref()
    }

    /// Decides whether durable writes are allowed at `now_ms` given the caller's current
    /// supervisor term and ownership epoch.
    ///
    /// The first failing check, in this order, decides the fence reason: revocation, missing
    /// lease, term mismatch, epoch mismatch, expiry. This method does not check which
    /// collection, range or node the lease names.
    pub fn evaluate(
        &self,
        current_term: SupervisorTerm,
        current_epoch: OwnershipEpoch,
        now_ms: u64,
    ) -> OwnerWriteMode {
        let reason = if self.revoked {
            FenceReason::Revoked
        } else if let Some(lease) = &self.lease {
            if lease.supervisor_term != current_term {
                FenceReason::TermSuperseded {
                    lease_term: lease.supervisor_term,
                    current_term,
                }
            } else if lease.epoch != current_epoch {
                FenceReason::EpochSuperseded {
                    lease_epoch: lease.epoch,
                    current_epoch,
                }
            } else if lease.is_expired(now_ms) {
                FenceReason::Expired {
                    now_ms,
                    expires_at_ms: lease.expires_at_ms,
                }
            } else {
                return OwnerWriteMode::Durable;
            }
        } else {
            FenceReason::Unleased
        };
        OwnerWriteMode::Fenced(reason)
    }

    /// Admits or rejects `request`.
    ///
    /// A durable write is admitted only when [`LeasedOwner::evaluate`] returns `Durable`.
    /// Stale reads and replication catch-up are always admitted, even while fenced.
    ///
    /// # Errors
    ///
    /// Returns [`LeaseFenceRejection`] carrying the fence reason when a
    /// [`RangeRequest::DurableWrite`] arrives while the owner is fenced.
    pub fn admit_request(
        &self,
        request: RangeRequest,
        current_term: SupervisorTerm,
        current_epoch: OwnershipEpoch,
        now_ms: u64,
    ) -> Result<(), LeaseFenceRejection> {
        let reason = match self.evaluate(current_term, current_epoch, now_ms) {
            OwnerWriteMode::Durable => return Ok(()),
            OwnerWriteMode::Fenced(reason) => reason,
        };
        match request {
            RangeRequest::DurableWrite => Err(LeaseFenceRejection { reason }),
            RangeRequest::StaleRead | RangeRequest::ReplicationCatchUp => Ok(()),
        }
    }
}
/// Why [`admit_durable_write`] refused a durable write.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DurableWriteReject {
    /// No range of `collection` covers the key.
    NoRange { collection: CollectionId },
    /// The node's role on the range does not permit public writes; `owner` is the catalog's
    /// owner for the range.
    NotOwner {
        collection: CollectionId,
        range_id: RangeId,
        role: RangeRole,
        owner: NodeIdentity,
    },
    /// The node may write the range per the catalog, but its lease state fences it.
    Fenced {
        collection: CollectionId,
        range_id: RangeId,
        reason: FenceReason,
    },
}

impl std::fmt::Display for DurableWriteReject {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NoRange { collection } => {
                write!(
                    f,
                    "no range of collection {collection} covers the routed key — re-resolve routing"
                )
            }
            Self::NotOwner {
                collection,
                range_id,
                role: _,
                owner,
            } => {
                write!(
                    f,
                    "this node does not own {collection}/{range_id} — route the durable write to {owner}"
                )
            }
            Self::Fenced {
                collection,
                range_id,
                reason,
            } => {
                write!(
                    f,
                    "owner of {collection}/{range_id} is self-fenced and rejects the durable write: {reason}"
                )
            }
        }
    }
}

impl std::error::Error for DurableWriteReject {}
/// Decides whether `node` may apply a durable write of `key` to `collection`.
///
/// The checks run in this order:
/// 1. `key` must route to a range of `collection` in `catalog`.
/// 2. `node`'s role on that range must satisfy `may_write_public()`.
/// 3. `holder`'s lease must name exactly this collection, range and node. It must also be
///    judged `Durable` by [`LeasedOwner::evaluate`] against `current_term`, the range's
///    catalog epoch and `now_ms`.
///
/// On success the routed range is returned.
///
/// # Errors
///
/// * [`DurableWriteReject::NoRange`] — no range covers `key`.
/// * [`DurableWriteReject::NotOwner`] — `node`'s role does not allow public writes. The error
///   carries that role and the catalog owner.
/// * [`DurableWriteReject::Fenced`] — the lease is missing or names another
///   collection/range/node (both reported as [`FenceReason::Unleased`]), or it fails the
///   revocation, term, epoch or expiry checks.
pub fn admit_durable_write<'c>(
    catalog: &'c ShardOwnershipCatalog,
    holder: &LeasedOwner,
    node: &NodeIdentity,
    collection: &CollectionId,
    key: &[u8],
    current_term: SupervisorTerm,
    now_ms: u64,
) -> Result<&'c RangeOwnership, DurableWriteReject> {
    let Some(range) = catalog.route_shard_key(collection, key) else {
        return Err(DurableWriteReject::NoRange {
            collection: collection.clone(),
        });
    };
    let range_id = range.range_id();

    let role = range.role_of(node);
    if !role.may_write_public() {
        return Err(DurableWriteReject::NotOwner {
            collection: collection.clone(),
            range_id,
            role,
            owner: range.owner().clone(),
        });
    }

    // A lease for some other range or node grants nothing here, so treat it as no lease at all.
    let verdict = match holder.lease() {
        Some(lease) if lease.covers(collection, range_id, node) => {
            holder.evaluate(current_term, range.epoch(), now_ms)
        }
        _ => OwnerWriteMode::Fenced(FenceReason::Unleased),
    };

    if let OwnerWriteMode::Fenced(reason) = verdict {
        return Err(DurableWriteReject::Fenced {
            collection: collection.clone(),
            range_id,
            reason,
        });
    }
    Ok(range)
}
#[cfg(test)]
mod tests {
// Unit tests for lease windows, self-fencing in `LeasedOwner::evaluate`/`admit_request`,
// and catalog-aware admission in `admit_durable_write`.
use super::*;
use crate::cluster::ownership::{PlacementMetadata, RangeBounds, ShardKeyMode};
// Test helper: builds a `CollectionId`, panicking on an invalid name.
fn collection(name: &str) -> CollectionId {
CollectionId::new(name).unwrap()
}
// Test helper: builds a `NodeIdentity` from a certificate subject such as "CN=node-a".
fn ident(cn: &str) -> NodeIdentity {
NodeIdentity::from_certificate_subject(cn).unwrap()
}
// Builds a catalog with a single hash-keyed, full-bounds range (RangeId 1) of "orders",
// owned by `owner` with the given `replicas`. Returns the catalog and the collection id.
fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
let orders = collection("orders");
let mut catalog = ShardOwnershipCatalog::new();
catalog
.apply_update(RangeOwnership::establish(
orders.clone(),
RangeId::new(1),
ShardKeyMode::Hash,
RangeBounds::full(),
ident(owner),
replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
PlacementMetadata::with_replication_factor(3),
))
.unwrap();
(catalog, orders)
}
// An epoch reached by one ownership transfer from a freshly established range. Tests use it
// as an epoch that differs from `OwnershipEpoch::initial()`.
fn next_epoch() -> OwnershipEpoch {
RangeOwnership::establish(
collection("orders"),
RangeId::new(1),
ShardKeyMode::Hash,
RangeBounds::full(),
ident("CN=node-a"),
[ident("CN=node-b")],
PlacementMetadata::with_replication_factor(3),
)
.transfer_to(ident("CN=node-b"), [])
.epoch()
}
// A lease on range 1 of `orders` for `owner`, at the genesis term and initial epoch,
// granted at t=0 ms and lasting `ttl_ms`.
fn lease_for(orders: &CollectionId, owner: &str, ttl_ms: u64) -> OwnershipLease {
OwnershipLease::grant(
SupervisorTerm::genesis(),
orders.clone(),
RangeId::new(1),
ident(owner),
OwnershipEpoch::initial(),
0,
ttl_ms,
)
}
// Validity is [granted, expires): the expiry instant itself is already expired, and
// remaining_ms saturates at zero.
#[test]
fn lease_window_is_half_open() {
let orders = collection("orders");
let lease = lease_for(&orders, "CN=node-a", 1_000);
assert_eq!(lease.granted_at_ms(), 0);
assert_eq!(lease.expires_at_ms(), 1_000);
assert!(!lease.is_expired(0));
assert!(!lease.is_expired(999));
assert!(lease.is_expired(1_000));
assert!(lease.is_expired(1_001));
assert_eq!(lease.remaining_ms(250), 750);
assert_eq!(lease.remaining_ms(1_000), 0);
assert_eq!(lease.remaining_ms(5_000), 0);
}
// The accessors return exactly the values passed to `grant`.
#[test]
fn lease_binds_term_range_owner_and_epoch() {
let orders = collection("orders");
let lease = lease_for(&orders, "CN=node-a", 1_000);
assert_eq!(lease.supervisor_term(), SupervisorTerm::genesis());
assert_eq!(lease.collection(), &orders);
assert_eq!(lease.range_id(), RangeId::new(1));
assert_eq!(lease.owner(), &ident("CN=node-a"));
assert_eq!(lease.epoch(), OwnershipEpoch::initial());
}
// Matching term and epoch inside the window yields `Durable`.
#[test]
fn valid_lease_authorises_durable_writes() {
let orders = collection("orders");
let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 500);
assert_eq!(mode, OwnerWriteMode::Durable);
assert!(mode.may_write_durable());
assert!(!mode.is_fenced());
}
#[test]
fn unleased_owner_is_fenced() {
let owner = LeasedOwner::unleased();
let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 0);
assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Unleased));
}
// Past the expiry the fence reports both the evaluation time and the expiry instant.
#[test]
fn expired_lease_self_fences() {
let orders = collection("orders");
let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 1_500);
match mode {
OwnerWriteMode::Fenced(FenceReason::Expired {
now_ms,
expires_at_ms,
}) => {
assert_eq!(now_ms, 1_500);
assert_eq!(expires_at_ms, 1_000);
}
other => panic!("expected Expired fence, got {other:?}"),
}
}
// A lease from the initial epoch is fenced once the current epoch has moved on.
#[test]
fn epoch_mismatch_self_fences() {
let orders = collection("orders");
let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
let current_epoch = next_epoch();
let mode = owner.evaluate(SupervisorTerm::genesis(), current_epoch, 500);
match mode {
OwnerWriteMode::Fenced(FenceReason::EpochSuperseded {
lease_epoch,
current_epoch: reported,
}) => {
assert_eq!(lease_epoch, OwnershipEpoch::initial());
assert_eq!(reported, current_epoch);
}
other => panic!("expected EpochSuperseded fence, got {other:?}"),
}
}
// A lease from the genesis term is fenced once the supervisor term advances.
#[test]
fn supervisor_term_advance_self_fences() {
let orders = collection("orders");
let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
let current_term = SupervisorTerm::genesis().next();
let mode = owner.evaluate(current_term, OwnershipEpoch::initial(), 500);
match mode {
OwnerWriteMode::Fenced(FenceReason::TermSuperseded {
lease_term,
current_term: reported,
}) => {
assert_eq!(lease_term, SupervisorTerm::genesis());
assert_eq!(reported, current_term);
}
other => panic!("expected TermSuperseded fence, got {other:?}"),
}
}
#[test]
fn revoked_lease_self_fences_before_expiry() {
let orders = collection("orders");
let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
owner.revoke();
let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 100);
assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
}
// Term, epoch and expiry all fail here too, but revocation is checked first and wins.
#[test]
fn revoke_takes_precedence_over_other_causes() {
let orders = collection("orders");
let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
owner.revoke();
let mode = owner.evaluate(SupervisorTerm::genesis().next(), next_epoch(), 10_000);
assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
}
// `grant` clears the revocation; the new lease (900..1900) makes t=1500 durable even
// though the original lease expired at 1000.
#[test]
fn renewing_a_lease_clears_a_prior_revoke_and_extends_window() {
let orders = collection("orders");
let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
owner.revoke();
assert!(owner
.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 100)
.is_fenced());
owner.grant(OwnershipLease::grant(
SupervisorTerm::genesis(),
orders.clone(),
RangeId::new(1),
ident("CN=node-a"),
OwnershipEpoch::initial(),
900,
1_000,
));
let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 1_500);
assert_eq!(mode, OwnerWriteMode::Durable);
}
#[test]
fn valid_lease_admits_every_request_kind() {
let orders = collection("orders");
let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
for req in [
RangeRequest::DurableWrite,
RangeRequest::StaleRead,
RangeRequest::ReplicationCatchUp,
] {
assert!(owner
.admit_request(
req,
SupervisorTerm::genesis(),
OwnershipEpoch::initial(),
500
)
.is_ok());
}
}
// At t=2000 the lease (expiring at 1000) is expired. Reads and catch-up still pass;
// durable writes get an Expired rejection whose message mentions "self-fenced".
#[test]
fn self_fenced_read_mode_serves_reads_and_catch_up_but_rejects_durable_writes() {
let orders = collection("orders");
let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
let now = 2_000;
let term = SupervisorTerm::genesis();
let epoch = OwnershipEpoch::initial();
assert!(owner
.admit_request(RangeRequest::StaleRead, term, epoch, now)
.is_ok());
assert!(owner
.admit_request(RangeRequest::ReplicationCatchUp, term, epoch, now)
.is_ok());
let err = owner
.admit_request(RangeRequest::DurableWrite, term, epoch, now)
.unwrap_err();
assert!(matches!(err.reason, FenceReason::Expired { .. }));
assert!(err.to_string().contains("self-fenced"));
}
#[test]
fn unleased_owner_rejects_durable_write_but_still_catches_up() {
let owner = LeasedOwner::unleased();
let term = SupervisorTerm::genesis();
let epoch = OwnershipEpoch::initial();
assert_eq!(
owner
.admit_request(RangeRequest::DurableWrite, term, epoch, 0)
.unwrap_err()
.reason,
FenceReason::Unleased
);
assert!(owner
.admit_request(RangeRequest::ReplicationCatchUp, term, epoch, 0)
.is_ok());
}
// Happy path: catalog owner with a covering, current lease gets the routed range back.
#[test]
fn durable_write_admitted_for_leased_owner() {
let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
let range = admit_durable_write(
&catalog,
&owner,
&ident("CN=node-a"),
&orders,
b"k",
SupervisorTerm::genesis(),
500,
)
.expect("leased owner at current term/epoch may write");
assert_eq!(range.owner(), &ident("CN=node-a"));
assert_eq!(range.range_id(), RangeId::new(1));
}
// Being the catalog owner is not enough without a lease.
#[test]
fn durable_write_rejected_for_catalog_owner_without_a_lease() {
let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let owner = LeasedOwner::unleased();
let err = admit_durable_write(
&catalog,
&owner,
&ident("CN=node-a"),
&orders,
b"k",
SupervisorTerm::genesis(),
0,
)
.unwrap_err();
match err {
DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
other => panic!("expected Fenced(Unleased), got {other:?}"),
}
}
// node-b holds a (self-issued) lease but is only a replica in the catalog, so the role
// check rejects it with NotOwner and names node-a as the owner.
#[test]
fn durable_write_rejected_for_non_owner_before_lease_is_even_consulted() {
let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-b", 1_000));
let err = admit_durable_write(
&catalog,
&owner,
&ident("CN=node-b"),
&orders,
b"k",
SupervisorTerm::genesis(),
500,
)
.unwrap_err();
match err {
DurableWriteReject::NotOwner { role, owner, .. } => {
assert_eq!(role, RangeRole::Replica);
assert_eq!(owner, ident("CN=node-a"));
}
other => panic!("expected NotOwner, got {other:?}"),
}
}
#[test]
fn durable_write_rejected_when_no_range_covers_the_key() {
let catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
let err = admit_durable_write(
&catalog,
&owner,
&ident("CN=node-a"),
&orders,
b"k",
SupervisorTerm::genesis(),
500,
)
.unwrap_err();
assert!(matches!(err, DurableWriteReject::NoRange { .. }));
}
// Ownership goes a -> b -> a, so node-a owns the range again but at epoch 3. Its
// initial-epoch lease must be fenced even though the owner name matches.
#[test]
fn durable_write_fenced_when_lease_epoch_trails_the_catalog() {
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let stale_lease = lease_for(&orders, "CN=node-a", 100_000);
let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
catalog.apply_update(v2.clone()).unwrap();
let v3 = v2.transfer_to(ident("CN=node-a"), [ident("CN=node-b")]);
catalog.apply_update(v3).unwrap();
let owner = LeasedOwner::with_lease(stale_lease);
let current_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
assert_eq!(current_epoch.value(), 3);
let err = admit_durable_write(
&catalog,
&owner,
&ident("CN=node-a"),
&orders,
b"k",
SupervisorTerm::genesis(),
500,
)
.unwrap_err();
match err {
DurableWriteReject::Fenced {
reason: FenceReason::EpochSuperseded { lease_epoch, .. },
..
} => assert_eq!(lease_epoch, OwnershipEpoch::initial()),
other => panic!("expected Fenced(EpochSuperseded), got {other:?}"),
}
}
// A lease on range 2 does not cover a write routed to range 1; it is reported as Unleased.
#[test]
fn durable_write_fenced_when_lease_is_for_a_different_range() {
let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let wrong_range_lease = OwnershipLease::grant(
SupervisorTerm::genesis(),
orders.clone(),
RangeId::new(2),
ident("CN=node-a"),
OwnershipEpoch::initial(),
0,
1_000,
);
let owner = LeasedOwner::with_lease(wrong_range_lease);
let err = admit_durable_write(
&catalog,
&owner,
&ident("CN=node-a"),
&orders,
b"k",
SupervisorTerm::genesis(),
500,
)
.unwrap_err();
match err {
DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
other => panic!("expected Fenced(Unleased), got {other:?}"),
}
}
// Full lifecycle: admitted at t=500, fenced as Expired at t=2000, admitted again at
// t=2500 after a renewal covering 2000..3000.
#[test]
fn durable_write_rejected_after_self_fence_then_restored_on_renewal() {
let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
let term = SupervisorTerm::genesis();
assert!(admit_durable_write(
&catalog,
&owner,
&ident("CN=node-a"),
&orders,
b"k",
term,
500
)
.is_ok());
let err = admit_durable_write(
&catalog,
&owner,
&ident("CN=node-a"),
&orders,
b"k",
term,
2_000,
)
.unwrap_err();
assert!(matches!(
err,
DurableWriteReject::Fenced {
reason: FenceReason::Expired { .. },
..
}
));
owner.grant(OwnershipLease::grant(
term,
orders.clone(),
RangeId::new(1),
ident("CN=node-a"),
OwnershipEpoch::initial(),
2_000,
1_000,
));
assert!(admit_durable_write(
&catalog,
&owner,
&ident("CN=node-a"),
&orders,
b"k",
term,
2_500
)
.is_ok());
}
}