use std::collections::BTreeMap;
use super::identity::NodeIdentity;
use super::slot::hash_shard_key_to_range_key;
/// Name of a collection, stored with surrounding whitespace removed and never empty.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CollectionId(String);

/// Returned by [`CollectionId::new`] when the candidate name is empty after trimming.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionIdError;

impl std::fmt::Display for CollectionIdError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("collection id is empty")
    }
}

impl std::error::Error for CollectionIdError {}

impl CollectionId {
    /// Builds an id from `value`, trimming leading and trailing whitespace first.
    ///
    /// # Errors
    ///
    /// Returns [`CollectionIdError`] when nothing is left after trimming.
    pub fn new(value: impl AsRef<str>) -> Result<Self, CollectionIdError> {
        match value.as_ref().trim() {
            "" => Err(CollectionIdError),
            trimmed => Ok(CollectionId(trimmed.to_string())),
        }
    }

    /// The trimmed collection name.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for CollectionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
/// Numeric identifier of a range; ordered and hashed by its raw `u64` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RangeId(u64);

impl RangeId {
    /// Wraps a raw identifier.
    pub fn new(value: u64) -> Self {
        RangeId(value)
    }

    /// The raw identifier.
    pub fn value(self) -> u64 {
        let RangeId(raw) = self;
        raw
    }
}

impl std::fmt::Display for RangeId {
    /// Prints the raw identifier in decimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{}", self.value()))
    }
}
/// How a collection's shard keys are turned into the keys that ranges are matched against.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ShardKeyMode {
/// The shard key is hashed to a range key before routing. This is the default mode.
#[default]
Hash,
/// The shard key bytes are routed as-is, without hashing.
Ordered,
}
/// One endpoint of a key range: an open-ended sentinel or a concrete key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeBound {
    /// Sorts below every key.
    Min,
    /// A concrete boundary key, compared as raw bytes.
    Key(Vec<u8>),
    /// Sorts above every key.
    Max,
}

impl RangeBound {
    /// Convenience constructor for [`RangeBound::Key`].
    pub fn key(bytes: impl Into<Vec<u8>>) -> Self {
        RangeBound::Key(bytes.into())
    }

    /// Borrowed, totally ordered view of this bound; every comparison goes through it.
    fn position(&self) -> Position<'_> {
        match self {
            RangeBound::Min => Position::Min,
            RangeBound::Key(k) => Position::Key(k),
            RangeBound::Max => Position::Max,
        }
    }

    /// True when this bound is a zero-length key, i.e. the smallest key there is.
    fn is_empty_key(&self) -> bool {
        matches!(self, RangeBound::Key(k) if k.is_empty())
    }
}

/// Comparable position on the key axis.
///
/// The derived ordering follows variant order (`Min < Key(_) < Max`) and compares
/// two `Key`s as byte slices.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
enum Position<'a> {
    Min,
    Key(&'a [u8]),
    Max,
}

/// Half-open key interval `[lower, upper)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBounds {
    lower: RangeBound,
    upper: RangeBound,
}

/// Returned when requested bounds (or a split point) would not form a valid range.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBoundsError;

impl std::fmt::Display for RangeBoundsError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "range lower bound must be strictly below the upper bound"
        )
    }
}

impl std::error::Error for RangeBoundsError {}

impl RangeBounds {
    /// Builds the interval `[lower, upper)`.
    ///
    /// # Errors
    ///
    /// Returns [`RangeBoundsError`] when `lower` is not strictly below `upper`, or
    /// when `upper` is the empty key: no key sorts below the empty key, so such a
    /// range could never contain anything.
    pub fn new(lower: RangeBound, upper: RangeBound) -> Result<Self, RangeBoundsError> {
        // `Key([])` sorts above `Min`, so the ordering check alone would accept
        // `[Min, Key([]))` even though no key can fall into it.
        if upper.is_empty_key() || lower.position() >= upper.position() {
            return Err(RangeBoundsError);
        }
        Ok(Self { lower, upper })
    }

    /// The interval covering every key, `[Min, Max)`.
    pub fn full() -> Self {
        Self {
            lower: RangeBound::Min,
            upper: RangeBound::Max,
        }
    }

    /// Inclusive lower bound.
    pub fn lower(&self) -> &RangeBound {
        &self.lower
    }

    /// Exclusive upper bound.
    pub fn upper(&self) -> &RangeBound {
        &self.upper
    }

    /// Whether `key` lies in `[lower, upper)`.
    pub fn contains(&self, key: &[u8]) -> bool {
        let key = Position::Key(key);
        self.lower.position() <= key && key < self.upper.position()
    }

    /// Whether the two half-open intervals share at least one position.
    ///
    /// Intervals that merely touch (one's upper equals the other's lower) do not overlap.
    pub fn overlaps(&self, other: &RangeBounds) -> bool {
        self.lower.position() < other.upper.position()
            && other.lower.position() < self.upper.position()
    }

    /// Splits the interval into `[lower, at)` and `[at, upper)`.
    ///
    /// # Errors
    ///
    /// Returns [`RangeBoundsError`] when `at` is not strictly inside the interval,
    /// or when `at` is the empty key (the lower half could then hold no key).
    pub fn split_at(&self, at: &[u8]) -> Result<(RangeBounds, RangeBounds), RangeBoundsError> {
        let at_pos = Position::Key(at);
        if at.is_empty() || at_pos <= self.lower.position() || at_pos >= self.upper.position() {
            return Err(RangeBoundsError);
        }
        let lower = RangeBounds {
            lower: self.lower.clone(),
            upper: RangeBound::key(at.to_vec()),
        };
        let upper = RangeBounds {
            lower: RangeBound::key(at.to_vec()),
            upper: self.upper.clone(),
        };
        Ok((lower, upper))
    }
}
/// Version counter of a catalog entry; starts at 1 and grows by one per step.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct CatalogVersion(u64);

impl CatalogVersion {
    /// The first version, `1`.
    pub fn initial() -> Self {
        CatalogVersion(1)
    }

    /// The raw counter.
    pub fn value(self) -> u64 {
        let CatalogVersion(raw) = self;
        raw
    }

    /// The version immediately after this one.
    fn next(self) -> Self {
        CatalogVersion(self.value() + 1)
    }
}

impl std::fmt::Display for CatalogVersion {
    /// Prints the raw counter in decimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{}", self.value()))
    }
}
/// Ownership epoch of a range; starts at 1 and grows by one per step.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct OwnershipEpoch(u64);

impl OwnershipEpoch {
    /// The first epoch, `1`.
    pub fn initial() -> Self {
        OwnershipEpoch(1)
    }

    /// The raw counter.
    pub fn value(self) -> u64 {
        let OwnershipEpoch(raw) = self;
        raw
    }

    /// The epoch immediately after this one.
    fn next(self) -> Self {
        OwnershipEpoch(self.value() + 1)
    }
}

impl std::fmt::Display for OwnershipEpoch {
    /// Prints the raw counter in decimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{}", self.value()))
    }
}
/// Placement settings attached to a range: a replication factor plus free-form
/// string attributes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PlacementMetadata {
    replication_factor: usize,
    attributes: BTreeMap<String, String>,
}

impl PlacementMetadata {
    /// Metadata with the given replication factor and no attributes.
    pub fn with_replication_factor(replication_factor: usize) -> Self {
        PlacementMetadata {
            replication_factor,
            ..Self::default()
        }
    }

    /// Builder-style setter; a later value for the same key replaces the earlier one.
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, setting) = (key.into(), value.into());
        self.attributes.insert(name, setting);
        self
    }

    /// The configured replication factor.
    pub fn replication_factor(&self) -> usize {
        self.replication_factor
    }

    /// Looks up one attribute, or `None` when it was never set.
    pub fn attribute(&self, key: &str) -> Option<&str> {
        let setting = self.attributes.get(key)?;
        Some(setting.as_str())
    }
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeOwnership {
collection: CollectionId,
range_id: RangeId,
shard_key_mode: ShardKeyMode,
bounds: RangeBounds,
owner: NodeIdentity,
replicas: Vec<NodeIdentity>,
epoch: OwnershipEpoch,
version: CatalogVersion,
placement: PlacementMetadata,
}
impl RangeOwnership {
#[allow(clippy::too_many_arguments)]
pub fn establish(
collection: CollectionId,
range_id: RangeId,
shard_key_mode: ShardKeyMode,
bounds: RangeBounds,
owner: NodeIdentity,
replicas: impl IntoIterator<Item = NodeIdentity>,
placement: PlacementMetadata,
) -> Self {
Self {
collection,
range_id,
shard_key_mode,
bounds,
owner,
replicas: replicas.into_iter().collect(),
epoch: OwnershipEpoch::initial(),
version: CatalogVersion::initial(),
placement,
}
}
pub fn collection(&self) -> &CollectionId {
&self.collection
}
pub fn range_id(&self) -> RangeId {
self.range_id
}
pub fn shard_key_mode(&self) -> ShardKeyMode {
self.shard_key_mode
}
pub fn bounds(&self) -> &RangeBounds {
&self.bounds
}
pub fn owner(&self) -> &NodeIdentity {
&self.owner
}
pub fn replicas(&self) -> &[NodeIdentity] {
&self.replicas
}
pub fn epoch(&self) -> OwnershipEpoch {
self.epoch
}
pub fn version(&self) -> CatalogVersion {
self.version
}
pub fn placement(&self) -> &PlacementMetadata {
&self.placement
}
fn key(&self) -> (CollectionId, RangeId) {
(self.collection.clone(), self.range_id)
}
pub fn transfer_to(
&self,
new_owner: NodeIdentity,
new_replicas: impl IntoIterator<Item = NodeIdentity>,
) -> Self {
Self {
owner: new_owner,
replicas: new_replicas.into_iter().collect(),
epoch: self.epoch.next(),
version: self.version.next(),
..self.clone()
}
}
pub fn update_replicas(&self, new_replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
Self {
replicas: new_replicas.into_iter().collect(),
version: self.version.next(),
..self.clone()
}
}
pub fn update_placement(&self, placement: PlacementMetadata) -> Self {
Self {
placement,
version: self.version.next(),
..self.clone()
}
}
pub fn with_bounds(&self, bounds: RangeBounds) -> Self {
Self {
bounds,
version: self.version.next(),
..self.clone()
}
}
pub fn role_of(&self, node: &NodeIdentity) -> RangeRole {
if self.owner == *node {
RangeRole::Owner
} else if self.replicas.iter().any(|replica| replica == node) {
RangeRole::Replica
} else {
RangeRole::NoCopy
}
}
}
/// Relationship of a node to one particular range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeRole {
    /// The node is the range's owner.
    Owner,
    /// The node is listed among the range's replicas.
    Replica,
    /// The node is neither owner nor replica.
    NoCopy,
}

impl RangeRole {
    /// Only the owner may accept public writes.
    pub fn may_write_public(self) -> bool {
        match self {
            RangeRole::Owner => true,
            RangeRole::Replica | RangeRole::NoCopy => false,
        }
    }

    /// Short lowercase name used in rejection messages.
    fn label(self) -> &'static str {
        match self {
            RangeRole::NoCopy => "no-copy",
            RangeRole::Replica => "replica",
            RangeRole::Owner => "owner",
        }
    }
}
/// Result of successfully applying an entry to the catalog.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateOutcome {
/// No entry existed for the `(collection, range_id)` pair; one was inserted.
Created,
/// An existing entry for the pair was replaced by a newer version.
Updated,
}
/// Reasons the catalog refuses a declaration or an entry update.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CatalogError {
    /// The update's version is not newer than the version already stored.
    StaleVersion {
        collection: CollectionId,
        range_id: RangeId,
        current: CatalogVersion,
        attempted: CatalogVersion,
    },
    /// The collection is already known under a different shard-key mode.
    ShardKeyModeMismatch {
        collection: CollectionId,
        declared: ShardKeyMode,
        attempted: ShardKeyMode,
    },
    /// The entry's bounds overlap another range of the same collection.
    OverlappingRange {
        collection: CollectionId,
        existing: RangeId,
        attempted: RangeId,
    },
}

impl std::fmt::Display for CatalogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CatalogError::StaleVersion {
                collection,
                range_id,
                current,
                attempted,
            } => f.write_fmt(format_args!(
                "stale catalog update for {collection}/{range_id}: current version {current}, attempted {attempted}"
            )),
            CatalogError::ShardKeyModeMismatch {
                collection,
                declared,
                attempted,
            } => f.write_fmt(format_args!(
                "collection {collection} is declared {declared:?} but range uses {attempted:?}"
            )),
            CatalogError::OverlappingRange {
                collection,
                existing,
                attempted,
            } => f.write_fmt(format_args!(
                "range {attempted} overlaps existing range {existing} of collection {collection}"
            )),
        }
    }
}

impl std::error::Error for CatalogError {}
/// Reasons a public write is refused by the catalog's admission check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeWriteReject {
    /// Routing found no range of the collection for the key.
    NoRange { collection: CollectionId },
    /// The asking node is not the owner of the routed range.
    NotOwner {
        collection: CollectionId,
        range_id: RangeId,
        role: RangeRole,
        owner: NodeIdentity,
    },
    /// The epoch supplied with the write differs from the range's current epoch.
    StaleEpoch {
        collection: CollectionId,
        range_id: RangeId,
        expected: OwnershipEpoch,
        current: OwnershipEpoch,
    },
}

impl std::fmt::Display for RangeWriteReject {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RangeWriteReject::NoRange { collection } => f.write_fmt(format_args!(
                "no range of collection {collection} covers the routed key — re-resolve routing"
            )),
            RangeWriteReject::NotOwner {
                collection,
                range_id,
                role,
                owner,
            } => {
                let role_label = role.label();
                f.write_fmt(format_args!(
                    "this node is {} of {collection}/{range_id}, not its owner — route the write to {owner}",
                    role_label
                ))
            }
            RangeWriteReject::StaleEpoch {
                collection,
                range_id,
                expected,
                current,
            } => f.write_fmt(format_args!(
                "stale ownership epoch for {collection}/{range_id}: write authorised under epoch {expected}, current is {current}"
            )),
        }
    }
}

impl std::error::Error for RangeWriteReject {}
#[derive(Debug, Clone, Default)]
pub struct ShardOwnershipCatalog {
collections: BTreeMap<CollectionId, ShardKeyMode>,
ranges: BTreeMap<(CollectionId, RangeId), RangeOwnership>,
}
impl ShardOwnershipCatalog {
pub fn new() -> Self {
Self::default()
}
pub fn declare_collection(
&mut self,
collection: CollectionId,
mode: ShardKeyMode,
) -> Result<(), CatalogError> {
match self.collections.get(&collection) {
Some(&declared) if declared != mode => Err(CatalogError::ShardKeyModeMismatch {
collection,
declared,
attempted: mode,
}),
_ => {
self.collections.insert(collection, mode);
Ok(())
}
}
}
pub fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
self.collections.get(collection).copied()
}
pub fn apply_update(&mut self, entry: RangeOwnership) -> Result<UpdateOutcome, CatalogError> {
match self.collections.get(entry.collection()) {
Some(&declared) if declared != entry.shard_key_mode() => {
return Err(CatalogError::ShardKeyModeMismatch {
collection: entry.collection().clone(),
declared,
attempted: entry.shard_key_mode(),
});
}
_ => {}
}
let key = entry.key();
match self.ranges.get(&key) {
Some(current) => {
if entry.version() <= current.version() {
return Err(CatalogError::StaleVersion {
collection: entry.collection().clone(),
range_id: entry.range_id(),
current: current.version(),
attempted: entry.version(),
});
}
if let Some(existing) = self.overlapping_sibling(&entry) {
return Err(CatalogError::OverlappingRange {
collection: entry.collection().clone(),
existing,
attempted: entry.range_id(),
});
}
self.collections
.insert(entry.collection().clone(), entry.shard_key_mode());
self.ranges.insert(key, entry);
Ok(UpdateOutcome::Updated)
}
None => {
if let Some(existing) = self.overlapping_sibling(&entry) {
return Err(CatalogError::OverlappingRange {
collection: entry.collection().clone(),
existing,
attempted: entry.range_id(),
});
}
self.collections
.insert(entry.collection().clone(), entry.shard_key_mode());
self.ranges.insert(key, entry);
Ok(UpdateOutcome::Created)
}
}
}
fn overlapping_sibling(&self, entry: &RangeOwnership) -> Option<RangeId> {
self.ranges_for(entry.collection())
.find(|range| {
range.range_id() != entry.range_id() && range.bounds().overlaps(entry.bounds())
})
.map(RangeOwnership::range_id)
}
pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&RangeOwnership> {
self.ranges.get(&(collection.clone(), range_id))
}
pub fn ranges_for<'a>(
&'a self,
collection: &CollectionId,
) -> impl Iterator<Item = &'a RangeOwnership> {
let collection = collection.clone();
self.ranges
.iter()
.filter(move |((c, _), _)| *c == collection)
.map(|(_, r)| r)
}
pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&RangeOwnership> {
self.ranges_for(collection)
.find(|r| r.bounds().contains(key))
}
pub fn route_shard_key(
&self,
collection: &CollectionId,
shard_key: &[u8],
) -> Option<&RangeOwnership> {
match self.shard_key_mode(collection)? {
ShardKeyMode::Ordered => self.route(collection, shard_key),
ShardKeyMode::Hash => {
let range_key = hash_shard_key_to_range_key(shard_key);
self.route(collection, &range_key)
}
}
}
pub fn role_at(
&self,
node: &NodeIdentity,
collection: &CollectionId,
range_id: RangeId,
) -> Option<RangeRole> {
self.range(collection, range_id)
.map(|range| range.role_of(node))
}
pub fn admit_public_write(
&self,
node: &NodeIdentity,
collection: &CollectionId,
key: &[u8],
expected_epoch: OwnershipEpoch,
) -> Result<&RangeOwnership, RangeWriteReject> {
let range =
self.route_shard_key(collection, key)
.ok_or_else(|| RangeWriteReject::NoRange {
collection: collection.clone(),
})?;
let role = range.role_of(node);
if !role.may_write_public() {
return Err(RangeWriteReject::NotOwner {
collection: collection.clone(),
range_id: range.range_id(),
role,
owner: range.owner().clone(),
});
}
if expected_epoch != range.epoch() {
return Err(RangeWriteReject::StaleEpoch {
collection: collection.clone(),
range_id: range.range_id(),
expected: expected_epoch,
current: range.epoch(),
});
}
Ok(range)
}
pub fn range_count(&self) -> usize {
self.ranges.len()
}
pub fn entries(&self) -> impl Iterator<Item = &RangeOwnership> {
self.ranges.values()
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Test helper: builds a `CollectionId` from `name`, panicking if it is rejected.
fn collection(name: &str) -> CollectionId {
CollectionId::new(name).unwrap()
}
/// Test helper: builds a `NodeIdentity` from a certificate subject string,
/// panicking if `from_certificate_subject` rejects it.
fn ident(cn: &str) -> NodeIdentity {
NodeIdentity::from_certificate_subject(cn).unwrap()
}
/// Test helper: builds `[Key(lower), Key(upper))`, panicking if the bounds are invalid.
fn bounds(lower: &[u8], upper: &[u8]) -> RangeBounds {
RangeBounds::new(RangeBound::key(lower), RangeBound::key(upper)).unwrap()
}
/// Test helper: establishes a hash-mode range owned by `owner`, with the single
/// replica `CN=replica-1` and a replication factor of 3.
fn hash_range(coll: &CollectionId, id: u64, bnds: RangeBounds, owner: &str) -> RangeOwnership {
RangeOwnership::establish(
coll.clone(),
RangeId::new(id),
ShardKeyMode::Hash,
bnds,
ident(owner),
[ident("CN=replica-1")],
PlacementMetadata::with_replication_factor(3),
)
}
/// Test helper: bounds that start at the range key of the hash slot `key` maps to
/// and end at the next slot's range key. The upper bound is `Max` when there is
/// no next slot below `PRODUCTION_HASH_SLOT_COUNT`.
fn single_hash_slot_bounds(key: &[u8]) -> RangeBounds {
let slot = super::super::slot::hash_shard_key_to_slot(key);
let lower = RangeBound::key(slot.range_key());
let upper = match slot.value().checked_add(1) {
Some(next) if next < super::super::slot::PRODUCTION_HASH_SLOT_COUNT => {
RangeBound::key(super::super::slot::HashSlot::new(next).unwrap().range_key())
}
_ => RangeBound::Max,
};
RangeBounds::new(lower, upper).unwrap()
}
/// A freshly constructed catalog holds no ranges and knows no collection's mode.
#[test]
fn empty_catalog_creation() {
let catalog = ShardOwnershipCatalog::new();
assert_eq!(catalog.range_count(), 0);
assert!(catalog.shard_key_mode(&collection("orders")).is_none());
}
/// `ShardKeyMode::default()` is `Hash`, and applying a hash-mode range to an
/// undeclared collection records the collection as `Hash`.
#[test]
fn hash_is_the_default_shard_key_mode() {
assert_eq!(ShardKeyMode::default(), ShardKeyMode::Hash);
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
.unwrap();
assert_eq!(catalog.shard_key_mode(&orders), Some(ShardKeyMode::Hash));
}
/// Two adjacent ranges split at `0x80`: `route` sends a range key to the range
/// whose half-open bounds contain it, and the routed entry exposes the replicas
/// and initial epoch it was established with.
#[test]
fn hash_range_entry_routes_to_owner() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(hash_range(
&orders,
1,
RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
"CN=node-a",
))
.unwrap();
catalog
.apply_update(hash_range(
&orders,
2,
RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
"CN=node-b",
))
.unwrap();
assert_eq!(
catalog.route(&orders, &[0x10]).unwrap().owner(),
&ident("CN=node-a")
);
// The split key itself belongs to the upper range (lower bound is inclusive).
assert_eq!(
catalog.route(&orders, &[0x80]).unwrap().owner(),
&ident("CN=node-b")
);
assert_eq!(
catalog.route(&orders, &[0xff]).unwrap().owner(),
&ident("CN=node-b")
);
let r = catalog.route(&orders, &[0x10]).unwrap();
assert_eq!(r.replicas(), &[ident("CN=replica-1")]);
assert_eq!(r.epoch(), OwnershipEpoch::initial());
}
/// In hash mode, `route_shard_key` finds a range whose bounds cover only the
/// hash slot of the logical key.
#[test]
fn hash_mode_routes_logical_shard_key_through_hash_slot() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
let key = b"tenant:42";
catalog
.apply_update(hash_range(
&orders,
1,
single_hash_slot_bounds(key),
"CN=node-a",
))
.unwrap();
let routed = catalog
.route_shard_key(&orders, key)
.expect("hash slot range covers the logical shard key");
assert_eq!(routed.owner(), &ident("CN=node-a"));
}
/// A collection declared `Ordered` accepts ordered ranges and routes raw keys by
/// byte order; a key at or beyond the last upper bound routes nowhere.
#[test]
fn ordered_mode_can_be_declared_and_routed() {
let mut catalog = ShardOwnershipCatalog::new();
let events = collection("events");
catalog
.declare_collection(events.clone(), ShardKeyMode::Ordered)
.unwrap();
assert_eq!(catalog.shard_key_mode(&events), Some(ShardKeyMode::Ordered));
catalog
.apply_update(RangeOwnership::establish(
events.clone(),
RangeId::new(1),
ShardKeyMode::Ordered,
bounds(b"a", b"m"),
ident("CN=node-a"),
[],
PlacementMetadata::with_replication_factor(3),
))
.unwrap();
catalog
.apply_update(RangeOwnership::establish(
events.clone(),
RangeId::new(2),
ShardKeyMode::Ordered,
bounds(b"m", b"z"),
ident("CN=node-b"),
[],
PlacementMetadata::with_replication_factor(3),
))
.unwrap();
assert_eq!(
catalog.route(&events, b"alpha").unwrap().owner(),
&ident("CN=node-a")
);
assert_eq!(
catalog.route(&events, b"mike").unwrap().owner(),
&ident("CN=node-b")
);
// "zzz" sorts above the exclusive upper bound "z", so no range covers it.
assert!(catalog.route(&events, b"zzz").is_none());
}
/// Re-declaring a collection with the same mode succeeds; declaring it with a
/// different mode, or applying a range that uses a different mode, is rejected
/// with `ShardKeyModeMismatch`.
#[test]
fn declaring_a_conflicting_mode_is_rejected() {
let mut catalog = ShardOwnershipCatalog::new();
let events = collection("events");
catalog
.declare_collection(events.clone(), ShardKeyMode::Ordered)
.unwrap();
catalog
.declare_collection(events.clone(), ShardKeyMode::Ordered)
.unwrap();
let err = catalog
.declare_collection(events.clone(), ShardKeyMode::Hash)
.unwrap_err();
assert_eq!(
err,
CatalogError::ShardKeyModeMismatch {
collection: events.clone(),
declared: ShardKeyMode::Ordered,
attempted: ShardKeyMode::Hash,
}
);
let err = catalog
.apply_update(hash_range(&events, 1, RangeBounds::full(), "CN=node-a"))
.unwrap_err();
assert!(matches!(err, CatalogError::ShardKeyModeMismatch { .. }));
}
/// An owner transfer advances both version and epoch; a replica-only change
/// advances the version and leaves the epoch where it was.
#[test]
fn version_bumps_on_owner_transfer_and_epoch_fences() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
.unwrap();
let current = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(current.version(), CatalogVersion::initial());
assert_eq!(current.epoch(), OwnershipEpoch::initial());
let moved = current.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
let outcome = catalog.apply_update(moved).unwrap();
assert_eq!(outcome, UpdateOutcome::Updated);
let after = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(after.owner(), &ident("CN=node-b"));
assert_eq!(after.version().value(), 2);
assert_eq!(after.epoch().value(), 2);
let replicas_changed = after.update_replicas([ident("CN=node-c")]);
catalog.apply_update(replicas_changed).unwrap();
let after2 = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(after2.version().value(), 3);
// The epoch is still 2: only the replica set changed, not the owner.
assert_eq!(after2.epoch().value(), 2); assert_eq!(after2.replicas(), &[ident("CN=node-c")]);
}
/// Re-applying an entry whose version is older than the stored one fails with
/// `StaleVersion` and leaves the stored owner and version untouched.
#[test]
fn stale_update_is_rejected_and_leaves_catalog_unchanged() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
.unwrap();
let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
catalog
.apply_update(v1.transfer_to(ident("CN=node-b"), []))
.unwrap();
assert_eq!(
catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
&ident("CN=node-b")
);
// v1 is now one version behind what the catalog holds.
let err = catalog.apply_update(v1.clone()).unwrap_err();
assert_eq!(
err,
CatalogError::StaleVersion {
collection: orders.clone(),
range_id: RangeId::new(1),
current: CatalogVersion::initial().next(),
attempted: CatalogVersion::initial(),
}
);
assert_eq!(
catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
&ident("CN=node-b")
);
assert_eq!(
catalog
.range(&orders, RangeId::new(1))
.unwrap()
.version()
.value(),
2
);
}
/// Creating a second range whose bounds overlap an existing range of the same
/// collection fails with `OverlappingRange` and adds nothing to the catalog.
#[test]
fn overlapping_range_creation_is_rejected() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(hash_range(
&orders,
1,
bounds(&[0x00], &[0x80]),
"CN=node-a",
))
.unwrap();
let err = catalog
.apply_update(hash_range(
&orders,
2,
bounds(&[0x40], &[0xc0]),
"CN=node-b",
))
.unwrap_err();
assert_eq!(
err,
CatalogError::OverlappingRange {
collection: orders.clone(),
existing: RangeId::new(1),
attempted: RangeId::new(2),
}
);
assert_eq!(catalog.range_count(), 1);
}
/// Widening an existing range so that it overlaps a sibling fails with
/// `OverlappingRange`, and the stored bounds stay as they were.
#[test]
fn overlapping_range_update_is_rejected() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(hash_range(
&orders,
1,
bounds(&[0x00], &[0x80]),
"CN=node-a",
))
.unwrap();
catalog
.apply_update(hash_range(
&orders,
2,
RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
"CN=node-b",
))
.unwrap();
let widened = catalog
.range(&orders, RangeId::new(1))
.unwrap()
.with_bounds(bounds(&[0x00], &[0xc0]));
let err = catalog.apply_update(widened).unwrap_err();
assert_eq!(
err,
CatalogError::OverlappingRange {
collection: orders.clone(),
existing: RangeId::new(2),
attempted: RangeId::new(1),
}
);
assert_eq!(
catalog.range(&orders, RangeId::new(1)).unwrap().bounds(),
&bounds(&[0x00], &[0x80])
);
}
/// Applying the same sequence of entries to a second catalog reproduces the
/// first catalog's routing; applying an entry a second time is rejected as stale.
#[test]
fn catalog_replicates_to_data_members_with_read_visibility() {
let orders = collection("orders");
let mut leader = ShardOwnershipCatalog::new();
let mut data_member = ShardOwnershipCatalog::new();
let create = hash_range(&orders, 1, RangeBounds::full(), "CN=node-a");
leader.apply_update(create.clone()).unwrap();
assert_eq!(
data_member.apply_update(create).unwrap(),
UpdateOutcome::Created
);
assert_eq!(
data_member.route(&orders, b"any-key").unwrap().owner(),
&ident("CN=node-a")
);
let v2 = leader
.range(&orders, RangeId::new(1))
.unwrap()
.transfer_to(ident("CN=node-b"), []);
leader.apply_update(v2.clone()).unwrap();
assert_eq!(
data_member.apply_update(v2.clone()).unwrap(),
UpdateOutcome::Updated
);
assert_eq!(
data_member.route(&orders, b"any-key").unwrap().owner(),
&ident("CN=node-b")
);
// A duplicate delivery of v2 carries a version equal to the stored one.
let err = data_member.apply_update(v2).unwrap_err();
assert!(matches!(err, CatalogError::StaleVersion { .. }));
assert_eq!(
data_member
.range(&orders, RangeId::new(1))
.unwrap()
.version()
.value(),
2
);
}
/// `RangeBounds::new` rejects equal and inverted bounds, and the full range
/// contains an arbitrary key.
#[test]
fn range_bounds_reject_empty_or_inverted() {
assert!(RangeBounds::new(RangeBound::key([0x10]), RangeBound::key([0x10])).is_err());
assert!(RangeBounds::new(RangeBound::key([0x20]), RangeBound::key([0x10])).is_err());
assert!(RangeBounds::new(RangeBound::Max, RangeBound::Min).is_err());
assert!(RangeBounds::full().contains(b"anything"));
}
/// Test helper: like `hash_range`, but with a caller-chosen replica list given as
/// certificate subject strings.
fn range_with(
coll: &CollectionId,
id: u64,
bnds: RangeBounds,
owner: &str,
replicas: &[&str],
) -> RangeOwnership {
RangeOwnership::establish(
coll.clone(),
RangeId::new(id),
ShardKeyMode::Hash,
bnds,
ident(owner),
replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
PlacementMetadata::with_replication_factor(3),
)
}
/// `role_of` reports owner, replica, or no-copy for a node, and only the owner
/// role may accept public writes.
#[test]
fn role_of_distinguishes_owner_replica_and_no_copy() {
let orders = collection("orders");
let range = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);
assert_eq!(range.role_of(&ident("CN=node-a")), RangeRole::Owner);
assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Replica);
assert_eq!(range.role_of(&ident("CN=node-c")), RangeRole::NoCopy);
assert!(RangeRole::Owner.may_write_public());
assert!(!RangeRole::Replica.may_write_public());
assert!(!RangeRole::NoCopy.may_write_public());
}
/// The same node can own one range and replicate another; `role_at` returns
/// `None` for an unknown range id or an unknown collection.
#[test]
fn role_is_per_range_not_a_global_node_role() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(range_with(
&orders,
1,
RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
"CN=node-a",
&["CN=node-b"],
))
.unwrap();
catalog
.apply_update(range_with(
&orders,
2,
RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
"CN=node-b",
&["CN=node-a"],
))
.unwrap();
let node_a = ident("CN=node-a");
assert_eq!(
catalog.role_at(&node_a, &orders, RangeId::new(1)),
Some(RangeRole::Owner)
);
assert_eq!(
catalog.role_at(&node_a, &orders, RangeId::new(2)),
Some(RangeRole::Replica)
);
assert_eq!(catalog.role_at(&node_a, &orders, RangeId::new(99)), None);
assert_eq!(
catalog.role_at(&node_a, &collection("ghost"), RangeId::new(1)),
None
);
}
/// The owner, presenting the range's current epoch, is admitted and gets the
/// routed entry back.
#[test]
fn public_write_admitted_on_owner_at_matching_epoch() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(range_with(
&orders,
1,
RangeBounds::full(),
"CN=node-a",
&["CN=node-b"],
))
.unwrap();
let admitted = catalog
.admit_public_write(
&ident("CN=node-a"),
&orders,
b"k",
OwnershipEpoch::initial(),
)
.expect("owner at current epoch may write");
assert_eq!(admitted.owner(), &ident("CN=node-a"));
assert_eq!(admitted.range_id(), RangeId::new(1));
}
/// Write admission routes the logical key through its hash slot: a range that
/// covers only that slot admits the owner's write.
#[test]
fn public_write_uses_hash_slot_routing_for_hash_collections() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
let key = b"tenant:42";
catalog
.apply_update(hash_range(
&orders,
1,
single_hash_slot_bounds(key),
"CN=node-a",
))
.unwrap();
let admitted = catalog
.admit_public_write(&ident("CN=node-a"), &orders, key, OwnershipEpoch::initial())
.expect("hash owner admits write routed by shard-key slot");
assert_eq!(admitted.range_id(), RangeId::new(1));
}
/// A replica is refused with `NotOwner`, which names the replica role and the
/// real owner; the rendered message tells the caller where to route the write.
#[test]
fn public_write_rejected_on_replica_with_routing_error() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(range_with(
&orders,
1,
RangeBounds::full(),
"CN=node-a",
&["CN=node-b"],
))
.unwrap();
let err = catalog
.admit_public_write(
&ident("CN=node-b"),
&orders,
b"k",
OwnershipEpoch::initial(),
)
.unwrap_err();
match err {
RangeWriteReject::NotOwner {
role, ref owner, ..
} => {
assert_eq!(role, RangeRole::Replica);
assert_eq!(owner, &ident("CN=node-a"));
}
other => panic!("expected NotOwner(Replica), got {other:?}"),
}
assert!(err.to_string().contains("route the write to"));
}
/// A node that is neither owner nor replica is refused with `NotOwner` carrying
/// the `NoCopy` role.
#[test]
fn public_write_rejected_on_no_copy_holder() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(range_with(
&orders,
1,
RangeBounds::full(),
"CN=node-a",
&["CN=node-b"],
))
.unwrap();
let err = catalog
.admit_public_write(
&ident("CN=node-c"),
&orders,
b"k",
OwnershipEpoch::initial(),
)
.unwrap_err();
match err {
RangeWriteReject::NotOwner { role, .. } => assert_eq!(role, RangeRole::NoCopy),
other => panic!("expected NotOwner(NoCopy), got {other:?}"),
}
}
/// After ownership moves away and back, the original owner's old epoch is
/// refused with `StaleEpoch`, while the current epoch is admitted.
#[test]
fn public_write_rejected_on_stale_ownership_epoch() {
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
let v1 = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);
let original_epoch = v1.epoch();
catalog.apply_update(v1.clone()).unwrap();
let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
catalog.apply_update(v2.clone()).unwrap();
// node-a owns the range again in v3, but under a later epoch than in v1.
let v3 = v2.transfer_to(ident("CN=node-a"), [ident("CN=node-b")]);
catalog.apply_update(v3.clone()).unwrap();
assert_ne!(original_epoch, v3.epoch());
let err = catalog
.admit_public_write(&ident("CN=node-a"), &orders, b"k", original_epoch)
.unwrap_err();
match err {
RangeWriteReject::StaleEpoch {
expected, current, ..
} => {
assert_eq!(expected, original_epoch);
assert_eq!(current, v3.epoch());
}
other => panic!("expected StaleEpoch, got {other:?}"),
}
assert!(catalog
.admit_public_write(&ident("CN=node-a"), &orders, b"k", v3.epoch())
.is_ok());
}
/// On an empty catalog, write admission fails with `NoRange`.
#[test]
fn public_write_rejected_when_no_range_covers_the_key() {
let catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
let err = catalog
.admit_public_write(
&ident("CN=node-a"),
&orders,
b"k",
OwnershipEpoch::initial(),
)
.unwrap_err();
assert!(matches!(err, RangeWriteReject::NoRange { .. }));
}
/// A replica is refused on the public write path, yet a change record stamped
/// with the range id and initial ownership epoch is still accepted by a
/// `RangeAuthority` fence for that range.
///
/// NOTE(review): `ChangeRecord`, `with_range_authority` and `RangeAuthority::admit`
/// live in `crate::replication::cdc`; this test only relies on `admit` returning
/// `Ok` for the record built here.
#[test]
fn internal_apply_path_stays_privileged_for_a_public_write_replica() {
use crate::replication::cdc::{ChangeOperation, ChangeRecord, RangeAuthority};
let mut catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
catalog
.apply_update(range_with(
&orders,
7,
RangeBounds::full(),
"CN=node-a",
&["CN=node-b"],
))
.unwrap();
// Public path: node-b is only a replica of range 7, so it is turned away.
assert!(matches!(
catalog
.admit_public_write(
&ident("CN=node-b"),
&orders,
b"k",
OwnershipEpoch::initial()
)
.unwrap_err(),
RangeWriteReject::NotOwner {
role: RangeRole::Replica,
..
}
));
let record = ChangeRecord {
term: 1,
lsn: 1,
timestamp: 0,
operation: ChangeOperation::Insert,
collection: orders.as_str().to_string(),
entity_id: 1,
entity_kind: "row".to_string(),
entity_bytes: Some(vec![1]),
metadata: None,
refresh_records: None,
range_id: None,
ownership_epoch: None,
}
.with_range_authority(7, OwnershipEpoch::initial().value());
let fence = RangeAuthority {
range_id: 7,
min_term: 1,
min_ownership_epoch: OwnershipEpoch::initial().value(),
};
assert!(
fence.admit(&record).is_ok(),
"replica internal apply must remain privileged for the owner's changes"
);
}
}