use std::collections::BTreeMap;
use super::identity::NodeIdentity;
use super::ownership::{
CatalogVersion, CollectionId, OwnershipEpoch, RangeBounds, RangeId, RangeOwnership,
ShardKeyMode, ShardOwnershipCatalog,
};
use super::routing::RoutingHint;
use super::slot::hash_shard_key_to_range_key;
/// Owned copy of one catalog range's routing metadata, built from a
/// [`RangeOwnership`] entry and handed to clients inside a [`TopologySnapshot`]
/// or a pushed [`TopologyUpdate::Range`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopologyRange {
    /// Collection the range belongs to.
    collection: CollectionId,
    /// Identifier of the range within its collection.
    range_id: RangeId,
    /// Whether shard keys are looked up directly (`Ordered`) or first mapped
    /// through `hash_shard_key_to_range_key` (`Hash`).
    shard_key_mode: ShardKeyMode,
    /// Key interval covered by the range.
    bounds: RangeBounds,
    /// Node currently recorded as the range's owner.
    owner: NodeIdentity,
    /// Replica nodes recorded for the range.
    replicas: Vec<NodeIdentity>,
    /// Ownership epoch recorded for the range.
    epoch: OwnershipEpoch,
    /// Version of this range state; `ClientTopology` compares it to decide
    /// whether an incoming copy is newer or older than its cached one.
    version: CatalogVersion,
}
impl TopologyRange {
    /// Copies every routing-relevant field of a catalog entry into an owned
    /// `TopologyRange`.
    fn from_ownership(ownership: &RangeOwnership) -> Self {
        let replicas: Vec<NodeIdentity> = ownership.replicas().iter().cloned().collect();
        Self {
            collection: ownership.collection().clone(),
            range_id: ownership.range_id(),
            shard_key_mode: ownership.shard_key_mode(),
            bounds: ownership.bounds().clone(),
            owner: ownership.owner().clone(),
            replicas,
            epoch: ownership.epoch(),
            version: ownership.version(),
        }
    }

    /// Collection this range belongs to.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Identifier of the range within its collection.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// How shard keys of this range's collection are mapped to range keys.
    pub fn shard_key_mode(&self) -> ShardKeyMode {
        self.shard_key_mode
    }

    /// Key interval covered by the range.
    pub fn bounds(&self) -> &RangeBounds {
        &self.bounds
    }

    /// Node recorded as the current owner.
    pub fn owner(&self) -> &NodeIdentity {
        &self.owner
    }

    /// Replica nodes recorded for the range.
    pub fn replicas(&self) -> &[NodeIdentity] {
        &self.replicas
    }

    /// Ownership epoch recorded for the range.
    pub fn epoch(&self) -> OwnershipEpoch {
        self.epoch
    }

    /// Version of this range state.
    pub fn version(&self) -> CatalogVersion {
        self.version
    }

    /// Identity under which `ClientTopology` stores the range:
    /// `(collection, range id)`.
    fn key(&self) -> (CollectionId, RangeId) {
        let Self {
            collection,
            range_id,
            ..
        } = self;
        (collection.clone(), *range_id)
    }
}
/// Point-in-time copy of every range of a [`ShardOwnershipCatalog`], as
/// produced by `ShardOwnershipCatalog::topology_snapshot` and consumed by
/// [`ClientTopology`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopologySnapshot {
    /// Snapshot generation: the highest range version contained, or the
    /// initial version for an empty catalog.
    version: CatalogVersion,
    /// Ranges in the order the catalog yielded them.
    ranges: Vec<TopologyRange>,
}
impl TopologySnapshot {
    /// Generation of the snapshot.
    pub fn version(&self) -> CatalogVersion {
        self.version
    }

    /// All ranges in the snapshot.
    pub fn ranges(&self) -> &[TopologyRange] {
        self.ranges.as_slice()
    }

    /// Looks up a range by its `(collection, range_id)` identity.
    pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
        self.find_first(|candidate| {
            candidate.collection() == collection && candidate.range_id() == range_id
        })
    }

    /// Returns the first range of `collection` whose bounds contain the raw
    /// range key `key` (no hashing is applied).
    pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
        self.find_first(|candidate| {
            candidate.collection() == collection && candidate.bounds().contains(key)
        })
    }

    /// Routes a logical shard key.
    ///
    /// Hash-mode collections map the key with `hash_shard_key_to_range_key`
    /// before the bounds lookup; ordered collections use it unchanged.
    /// Returns `None` when the collection is absent or no range covers the key.
    pub fn route_shard_key(
        &self,
        collection: &CollectionId,
        shard_key: &[u8],
    ) -> Option<&TopologyRange> {
        let mode = self.shard_key_mode(collection)?;
        match mode {
            ShardKeyMode::Hash => {
                self.route(collection, &hash_shard_key_to_range_key(shard_key))
            }
            ShardKeyMode::Ordered => self.route(collection, shard_key),
        }
    }

    /// Shard-key mode of `collection`, taken from its first range in the
    /// snapshot (presumably every range of a collection shares one mode).
    fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
        self.find_first(|candidate| candidate.collection() == collection)
            .map(|candidate| candidate.shard_key_mode())
    }

    /// Linear scan returning the first range accepted by `predicate`.
    fn find_first(
        &self,
        mut predicate: impl FnMut(&TopologyRange) -> bool,
    ) -> Option<&TopologyRange> {
        self.ranges.iter().find(|&candidate| predicate(candidate))
    }
}
impl ShardOwnershipCatalog {
pub fn topology_snapshot(&self) -> TopologySnapshot {
let ranges: Vec<TopologyRange> =
self.entries().map(TopologyRange::from_ownership).collect();
let version = ranges
.iter()
.map(TopologyRange::version)
.max()
.unwrap_or_else(CatalogVersion::initial);
TopologySnapshot { version, ranges }
}
}
/// Result of offering a snapshot or range delta to a `ClientTopology`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RefreshOutcome {
    /// The update was installed. `ranges_changed` is the number of incoming
    /// ranges that were new or differed from the cached copy.
    Applied { ranges_changed: usize },
    /// The update was discarded: stale, regressing a range, or a no-op.
    Ignored,
}

impl RefreshOutcome {
    /// Returns `true` when the update was installed into the cache.
    pub fn was_applied(self) -> bool {
        match self {
            Self::Applied { .. } => true,
            Self::Ignored => false,
        }
    }
}
/// Result of applying a redirect hint to a `ClientTopology`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HintOutcome {
    /// The hint was newer than the cached range and patched its owner,
    /// epoch, and version.
    Corrected,
    /// The cached range was already at or beyond the hint's version.
    AlreadyCurrent,
    /// The hint names a range the cache does not hold; nothing was inserted.
    UnknownRange,
}
/// A pushed topology change delivered to a `ClientTopology`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TopologyUpdate {
    /// A complete snapshot; applied exactly like a polled refresh.
    Full(TopologySnapshot),
    /// A delta carrying a single range.
    Range(TopologyRange),
}
/// Client-side routing cache fed by polled snapshots, pushed updates, and
/// advisory redirect hints.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ClientTopology {
    /// Generation of the last authoritative snapshot or range delta adopted.
    version: CatalogVersion,
    /// Cached ranges keyed by `(collection, range id)`.
    ranges: BTreeMap<(CollectionId, RangeId), TopologyRange>,
    /// Set when a redirect hint patched or missed the cache; cleared by a
    /// successfully applied full refresh.
    needs_refresh: bool,
}
impl ClientTopology {
pub fn from_snapshot(snapshot: TopologySnapshot) -> Self {
let mut cache = Self {
version: snapshot.version(),
ranges: BTreeMap::new(),
needs_refresh: false,
};
for range in snapshot.ranges {
cache.ranges.insert(range.key(), range);
}
cache
}
pub fn version(&self) -> CatalogVersion {
self.version
}
pub fn needs_refresh(&self) -> bool {
self.needs_refresh
}
pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
self.ranges
.values()
.find(|r| r.collection() == collection && r.bounds().contains(key))
}
pub fn resolve(&self, collection: &CollectionId, key: &[u8]) -> Option<&NodeIdentity> {
self.route_shard_key(collection, key)
.map(TopologyRange::owner)
}
pub fn route_shard_key(
&self,
collection: &CollectionId,
shard_key: &[u8],
) -> Option<&TopologyRange> {
match self.shard_key_mode(collection)? {
ShardKeyMode::Ordered => self.route(collection, shard_key),
ShardKeyMode::Hash => {
let range_key = hash_shard_key_to_range_key(shard_key);
self.route(collection, &range_key)
}
}
}
fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
self.ranges
.values()
.find(|range| range.collection() == collection)
.map(TopologyRange::shard_key_mode)
}
pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
self.ranges.get(&(collection.clone(), range_id))
}
pub fn apply_refresh(&mut self, snapshot: TopologySnapshot) -> RefreshOutcome {
if !self.ranges.is_empty() && snapshot.version() < self.version {
return RefreshOutcome::Ignored;
}
if self.snapshot_rolls_back_any_range(&snapshot) {
return RefreshOutcome::Ignored;
}
let mut changed = 0usize;
let mut next: BTreeMap<(CollectionId, RangeId), TopologyRange> = BTreeMap::new();
for range in snapshot.ranges {
let key = range.key();
if self.ranges.get(&key) != Some(&range) {
changed += 1;
}
next.insert(key, range);
}
if !self.ranges.is_empty() && snapshot.version <= self.version && changed == 0 {
return RefreshOutcome::Ignored;
}
self.ranges = next;
self.version = snapshot.version;
self.needs_refresh = false;
RefreshOutcome::Applied {
ranges_changed: changed,
}
}
fn snapshot_rolls_back_any_range(&self, snapshot: &TopologySnapshot) -> bool {
snapshot.ranges().iter().any(|incoming| {
self.ranges
.get(&incoming.key())
.is_some_and(|current| incoming.version() < current.version())
})
}
pub fn apply_update(&mut self, update: TopologyUpdate) -> RefreshOutcome {
match update {
TopologyUpdate::Full(snapshot) => self.apply_refresh(snapshot),
TopologyUpdate::Range(range) => {
let key = range.key();
let newer = match self.ranges.get(&key) {
Some(current) => range.version() > current.version(),
None => true,
};
if !newer {
return RefreshOutcome::Ignored;
}
if range.version() > self.version {
self.version = range.version();
}
self.ranges.insert(key, range);
RefreshOutcome::Applied { ranges_changed: 1 }
}
}
}
pub fn apply_hint(&mut self, hint: &RoutingHint) -> HintOutcome {
let key = (hint.collection().clone(), hint.range_id());
match self.ranges.get_mut(&key) {
Some(range) => {
if hint.version() <= range.version {
return HintOutcome::AlreadyCurrent;
}
range.owner = hint.owner().clone();
range.epoch = hint.epoch();
range.version = hint.version();
self.needs_refresh = true;
HintOutcome::Corrected
}
None => {
self.needs_refresh = true;
HintOutcome::UnknownRange
}
}
}
}
#[cfg(test)]
mod tests {
    //! Tests for topology snapshots and the client-side routing cache: polling,
    //! pushed updates, redirect hints, and protection against rollbacks.
    use super::*;
    use crate::cluster::ownership::{PlacementMetadata, RangeBound, ShardKeyMode};
    use crate::cluster::routing::{RequestOperation, RouteDecision, RoutedRequest, RoutingPolicy};

    /// Builds a `CollectionId`, panicking if the name is rejected.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Builds a `NodeIdentity` from a certificate subject such as `"CN=node-a"`.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// A hash-mode range spanning `RangeBounds::full()`, with the given owner
    /// and replicas and a replication factor of 3.
    fn full_range(coll: &CollectionId, id: u64, owner: &str, replicas: &[&str]) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            RangeBounds::full(),
            ident(owner),
            replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
            PlacementMetadata::with_replication_factor(3),
        )
    }

    /// An ordered-mode range bounded by `lower` and `upper`, with no replicas
    /// and a replication factor of 1.
    fn split_range(
        coll: &CollectionId,
        id: u64,
        lower: RangeBound,
        upper: RangeBound,
        owner: &str,
    ) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Ordered,
            RangeBounds::new(lower, upper).unwrap(),
            ident(owner),
            Vec::<NodeIdentity>::new(),
            PlacementMetadata::with_replication_factor(1),
        )
    }

    /// Bounds covering only the hash slot that `key` maps to: from that slot's
    /// range key up to the next slot's range key, or up to `RangeBound::Max`
    /// when there is no next production slot.
    fn single_hash_slot_bounds(key: &[u8]) -> RangeBounds {
        let slot = super::super::slot::hash_shard_key_to_slot(key);
        let lower = RangeBound::key(slot.range_key());
        let upper = match slot.value().checked_add(1) {
            Some(next) if next < super::super::slot::PRODUCTION_HASH_SLOT_COUNT => {
                RangeBound::key(super::super::slot::HashSlot::new(next).unwrap().range_key())
            }
            // Last slot (or overflow): extend the range to the top of the key space.
            _ => RangeBound::Max,
        };
        RangeBounds::new(lower, upper).unwrap()
    }

    /// A hash-mode range that owns only the slot `shard_key` hashes to.
    fn hash_slot_range(
        coll: &CollectionId,
        id: u64,
        shard_key: &[u8],
        owner: &str,
    ) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            single_hash_slot_bounds(shard_key),
            ident(owner),
            Vec::<NodeIdentity>::new(),
            PlacementMetadata::with_replication_factor(1),
        )
    }

    /// Builds a catalog by applying each range as an update, in order.
    fn catalog_with(ranges: impl IntoIterator<Item = RangeOwnership>) -> ShardOwnershipCatalog {
        let mut catalog = ShardOwnershipCatalog::new();
        for range in ranges {
            catalog.apply_update(range).unwrap();
        }
        catalog
    }

    /// A snapshot exposes owner, replicas, epoch and id for direct routing.
    #[test]
    fn snapshot_exposes_routing_metadata_for_direct_routing() {
        let orders = collection("orders");
        let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let snapshot = catalog.topology_snapshot();
        assert_eq!(snapshot.version(), CatalogVersion::initial());
        assert_eq!(snapshot.ranges().len(), 1);
        let range = snapshot
            .route(&orders, b"any-key")
            .expect("full range covers all keys");
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.replicas(), &[ident("CN=node-b")]);
        assert_eq!(range.epoch(), OwnershipEpoch::initial());
        assert_eq!(range.range_id(), RangeId::new(1));
    }

    /// Keys on either side of a split point route to different owners.
    #[test]
    fn snapshot_routes_keys_to_distinct_owners() {
        let parts = collection("parts");
        let catalog = catalog_with([
            split_range(
                &parts,
                1,
                RangeBound::Min,
                RangeBound::key(b"m"),
                "CN=node-a",
            ),
            split_range(
                &parts,
                2,
                RangeBound::key(b"m"),
                RangeBound::Max,
                "CN=node-b",
            ),
        ]);
        let snapshot = catalog.topology_snapshot();
        assert_eq!(
            snapshot.route(&parts, b"apple").unwrap().owner(),
            &ident("CN=node-a")
        );
        assert_eq!(
            snapshot.route(&parts, b"zebra").unwrap().owner(),
            &ident("CN=node-b")
        );
    }

    /// A freshly built client resolves owners and has no refresh pending.
    #[test]
    fn client_resolves_owner_from_polled_snapshot() {
        let orders = collection("orders");
        let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
        let client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));
        assert!(!client.needs_refresh());
    }

    /// Hash collections route by the hashed slot of the logical shard key, so
    /// a range covering a single slot still resolves that key.
    #[test]
    fn client_resolves_hash_collection_by_shard_key_slot() {
        let orders = collection("orders");
        let key = b"tenant:42";
        let catalog = catalog_with([hash_slot_range(&orders, 1, key, "CN=node-a")]);
        let client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let routed = client
            .route_shard_key(&orders, key)
            .expect("hash slot range covers the logical shard key");
        assert_eq!(routed.owner(), &ident("CN=node-a"));
        assert_eq!(client.resolve(&orders, key).unwrap(), &ident("CN=node-a"));
    }

    /// A newer snapshot is applied once; re-applying the same snapshot is a no-op.
    #[test]
    fn refresh_is_monotonic() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let v1 = client.version();
        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let fresh = catalog.topology_snapshot();
        assert!(fresh.version() > v1);
        assert_eq!(
            client.apply_refresh(fresh.clone()),
            RefreshOutcome::Applied { ranges_changed: 1 }
        );
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert_eq!(client.apply_refresh(fresh), RefreshOutcome::Ignored);
    }

    /// Moving a second range can leave the snapshot generation unchanged; the
    /// client must still apply it because that range's content changed.
    #[test]
    fn refresh_applies_same_generation_snapshot_when_another_range_changed() {
        let parts = collection("parts");
        let mut catalog = catalog_with([
            split_range(
                &parts,
                1,
                RangeBound::Min,
                RangeBound::key(b"m"),
                "CN=node-a",
            ),
            split_range(
                &parts,
                2,
                RangeBound::key(b"m"),
                RangeBound::Max,
                "CN=node-b",
            ),
        ]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let r1 = catalog.range(&parts, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(r1.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
            .unwrap();
        assert_eq!(
            client.apply_refresh(catalog.topology_snapshot()),
            RefreshOutcome::Applied { ranges_changed: 1 }
        );
        assert_eq!(
            client.resolve(&parts, b"apple").unwrap(),
            &ident("CN=node-c")
        );
        let r2 = catalog.range(&parts, RangeId::new(2)).unwrap().clone();
        catalog
            .apply_update(r2.transfer_to(ident("CN=node-d"), Vec::<NodeIdentity>::new()))
            .unwrap();
        let same_generation = catalog.topology_snapshot();
        assert_eq!(same_generation.version(), client.version());
        assert_eq!(
            client.apply_refresh(same_generation),
            RefreshOutcome::Applied { ranges_changed: 1 }
        );
        assert_eq!(
            client.resolve(&parts, b"zebra").unwrap(),
            &ident("CN=node-d")
        );
    }

    /// A forked catalog moved range 2 while the current one moved range 1.
    /// Its snapshot has the same generation as the client but an older range 1,
    /// so it must be rejected as a rollback.
    #[test]
    fn equal_generation_refresh_does_not_roll_back_a_newer_range() {
        let parts = collection("parts");
        let base = catalog_with([
            split_range(
                &parts,
                1,
                RangeBound::Min,
                RangeBound::key(b"m"),
                "CN=node-a",
            ),
            split_range(
                &parts,
                2,
                RangeBound::key(b"m"),
                RangeBound::Max,
                "CN=node-b",
            ),
        ]);
        let mut current_catalog = base.clone();
        let mut stale_fork = base;
        let mut client = ClientTopology::from_snapshot(current_catalog.topology_snapshot());
        let r1 = current_catalog
            .range(&parts, RangeId::new(1))
            .unwrap()
            .clone();
        current_catalog
            .apply_update(r1.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
            .unwrap();
        assert!(client
            .apply_refresh(current_catalog.topology_snapshot())
            .was_applied());
        let r2 = stale_fork.range(&parts, RangeId::new(2)).unwrap().clone();
        stale_fork
            .apply_update(r2.transfer_to(ident("CN=node-d"), Vec::<NodeIdentity>::new()))
            .unwrap();
        let fork_snapshot = stale_fork.topology_snapshot();
        assert_eq!(fork_snapshot.version(), client.version());
        assert_eq!(client.apply_refresh(fork_snapshot), RefreshOutcome::Ignored);
        assert_eq!(
            client.resolve(&parts, b"apple").unwrap(),
            &ident("CN=node-c")
        );
        assert_eq!(
            client.resolve(&parts, b"zebra").unwrap(),
            &ident("CN=node-b")
        );
    }

    /// A redirect from the stale owner patches the cached owner, but the cache
    /// still asks for a refresh; the refresh then fills in the replicas.
    #[test]
    fn redirect_hint_corrects_cache_but_is_not_authoritative() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let stale_owner = client.resolve(&orders, b"k").unwrap().clone();
        assert_eq!(stale_owner, ident("CN=node-a"));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
        let hint = match catalog.plan_route(&stale_owner, &request, &RoutingPolicy::forwarding()) {
            RouteDecision::Redirect { hint, .. } => hint,
            other => panic!("expected redirect, got {other:?}"),
        };
        assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert!(
            client.needs_refresh(),
            "a hint is advisory, not authoritative"
        );
        assert!(client
            .apply_refresh(catalog.topology_snapshot())
            .was_applied());
        assert!(!client.needs_refresh());
        let range = client.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.replicas(), &[ident("CN=node-a")]);
    }

    /// A hint for a range the client has never seen requests a refresh but
    /// does not insert a range.
    #[test]
    fn hint_for_unknown_range_does_not_invent_topology() {
        let orders = collection("orders");
        let other = collection("other");
        let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let foreign = catalog_with([full_range(&other, 9, "CN=node-z", &[])]);
        let request =
            RoutedRequest::new(other.clone(), b"k".to_vec(), RequestOperation::Transaction);
        let hint = foreign
            .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
            .hint()
            .cloned()
            .unwrap();
        assert_eq!(client.apply_hint(&hint), HintOutcome::UnknownRange);
        assert!(
            client.range(&other, RangeId::new(9)).is_none(),
            "no phantom range"
        );
        assert!(client.needs_refresh());
    }

    /// A hint captured before a transfer is older than what a client built
    /// after the transfer holds, so it is ignored and no refresh is requested.
    #[test]
    fn stale_hint_is_ignored_after_authoritative_catch_up() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
        let early_hint = catalog
            .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
            .hint()
            .cloned()
            .unwrap();
        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        assert_eq!(client.apply_hint(&early_hint), HintOutcome::AlreadyCurrent);
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert!(!client.needs_refresh());
    }

    /// A pushed full snapshot is handled exactly like a polled one.
    #[test]
    fn push_full_snapshot_applies_like_a_poll() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let update = TopologyUpdate::Full(catalog.topology_snapshot());
        assert!(client.apply_update(update).was_applied());
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
    }

    /// A single-range delta updates only that range; the neighbour is untouched.
    #[test]
    fn push_range_delta_advances_one_range() {
        let parts = collection("parts");
        let mut catalog = catalog_with([
            split_range(
                &parts,
                1,
                RangeBound::Min,
                RangeBound::key(b"m"),
                "CN=node-a",
            ),
            split_range(
                &parts,
                2,
                RangeBound::key(b"m"),
                RangeBound::Max,
                "CN=node-b",
            ),
        ]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let r2 = catalog.range(&parts, RangeId::new(2)).unwrap().clone();
        catalog
            .apply_update(r2.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
            .unwrap();
        let moved = catalog
            .topology_snapshot()
            .range(&parts, RangeId::new(2))
            .unwrap()
            .clone();
        assert_eq!(
            client.apply_update(TopologyUpdate::Range(moved)),
            RefreshOutcome::Applied { ranges_changed: 1 }
        );
        assert_eq!(
            client.resolve(&parts, b"apple").unwrap(),
            &ident("CN=node-a")
        );
        assert_eq!(
            client.resolve(&parts, b"zebra").unwrap(),
            &ident("CN=node-c")
        );
    }

    /// When a push is lost, a redirect hint corrects routing and the following
    /// poll clears the pending refresh.
    #[test]
    fn missed_push_still_converges_via_hint_and_poll() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        // Built but never delivered: simulates a dropped push.
        let _dropped_push = TopologyUpdate::Full(catalog.topology_snapshot());
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
        let hint = catalog
            .plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding())
            .hint()
            .cloned()
            .unwrap();
        assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert!(client.needs_refresh());
        assert!(client
            .apply_refresh(catalog.topology_snapshot())
            .was_applied());
        assert!(!client.needs_refresh());
    }

    /// Range deltas arriving out of order: the newer one wins and the older
    /// one is ignored afterwards.
    #[test]
    fn out_of_order_push_keeps_newest() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(r1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let push_v2 = catalog
            .topology_snapshot()
            .range(&orders, RangeId::new(1))
            .unwrap()
            .clone();
        let r2 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(r2.transfer_to(ident("CN=node-c"), [ident("CN=node-b")]))
            .unwrap();
        let push_v3 = catalog
            .topology_snapshot()
            .range(&orders, RangeId::new(1))
            .unwrap()
            .clone();
        assert!(client
            .apply_update(TopologyUpdate::Range(push_v3))
            .was_applied());
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));
        assert_eq!(
            client.apply_update(TopologyUpdate::Range(push_v2)),
            RefreshOutcome::Ignored
        );
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));
    }
}