1use std::collections::BTreeMap;
47
48use super::identity::NodeIdentity;
49use super::slot::hash_shard_key_to_range_key;
50
/// Name of a collection, guaranteed to be non-empty and free of surrounding
/// whitespace once constructed through [`CollectionId::new`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CollectionId(String);

/// Returned by [`CollectionId::new`] when the trimmed input is empty.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionIdError;

impl std::fmt::Display for CollectionIdError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("collection id is empty")
    }
}

impl std::error::Error for CollectionIdError {}

impl CollectionId {
    /// Builds an id from `value` after trimming leading and trailing whitespace.
    ///
    /// # Errors
    ///
    /// Returns [`CollectionIdError`] when nothing is left after trimming.
    pub fn new(value: impl AsRef<str>) -> Result<Self, CollectionIdError> {
        match value.as_ref().trim() {
            "" => Err(CollectionIdError),
            name => Ok(Self(name.to_owned())),
        }
    }

    /// Borrows the stored (already trimmed) name.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for CollectionId {
    /// Writes the raw name; width and fill flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
92
/// Numeric identifier of a range; catalog entries are keyed by
/// `(CollectionId, RangeId)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RangeId(u64);

impl RangeId {
    /// Wraps a raw range number.
    pub fn new(value: u64) -> Self {
        RangeId(value)
    }

    /// Returns the raw range number.
    pub fn value(self) -> u64 {
        let RangeId(raw) = self;
        raw
    }
}

impl std::fmt::Display for RangeId {
    /// Prints the raw number in decimal; width and fill flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{}", self.value()))
    }
}
118
/// How a collection's shard keys are turned into range keys for routing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ShardKeyMode {
    /// Default mode. `ShardOwnershipCatalog::route_shard_key` first converts the
    /// shard key with `hash_shard_key_to_range_key` and routes the result.
    #[default]
    Hash,
    /// `ShardOwnershipCatalog::route_shard_key` routes the shard key bytes
    /// unchanged.
    Ordered,
}
136
/// One end of a key range: an open-ended sentinel or an explicit byte key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeBound {
    /// Sorts below every key.
    Min,
    /// An explicit key; keys compare bytewise (lexicographically).
    Key(Vec<u8>),
    /// Sorts above every key.
    Max,
}

impl RangeBound {
    /// Convenience constructor for [`RangeBound::Key`].
    pub fn key(bytes: impl Into<Vec<u8>>) -> Self {
        RangeBound::Key(bytes.into())
    }

    /// Borrowed, totally ordered view of this bound used for comparisons.
    fn position(&self) -> Position<'_> {
        match self {
            RangeBound::Min => Position::Min,
            RangeBound::Key(k) => Position::Key(k),
            RangeBound::Max => Position::Max,
        }
    }
}

/// Comparable projection of a [`RangeBound`] or of a plain key.
///
/// The derived ordering relies on the variant order: `Min` < any `Key` < `Max`,
/// with `Key`s ordered by their bytes.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
enum Position<'a> {
    Min,
    Key(&'a [u8]),
    Max,
}

/// Half-open key interval `[lower, upper)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBounds {
    lower: RangeBound,
    upper: RangeBound,
}

/// Returned when a pair of bounds (or a split point) would not describe a
/// range that can contain at least one key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBoundsError;

impl std::fmt::Display for RangeBoundsError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "range lower bound must be strictly below the upper bound"
        )
    }
}

impl std::error::Error for RangeBoundsError {}

impl RangeBounds {
    /// Builds the interval `[lower, upper)`.
    ///
    /// # Errors
    ///
    /// Returns [`RangeBoundsError`] when `lower` is not strictly below `upper`,
    /// or when the interval cannot contain any key. The latter happens for
    /// `lower == Min` with `upper == Key([])`: no key sorts below the empty key.
    pub fn new(lower: RangeBound, upper: RangeBound) -> Result<Self, RangeBoundsError> {
        if lower.position() >= upper.position() || Self::spans_no_key(&lower, &upper) {
            return Err(RangeBoundsError);
        }
        Ok(Self { lower, upper })
    }

    /// True for the one ordered pair of bounds that still contains no key.
    ///
    /// Every other ordered pair contains a key: a `Key` lower bound is itself
    /// inside the range, and `[Min, k)` contains the empty key whenever `k` is
    /// non-empty.
    fn spans_no_key(lower: &RangeBound, upper: &RangeBound) -> bool {
        matches!((lower, upper), (RangeBound::Min, RangeBound::Key(k)) if k.is_empty())
    }

    /// The interval covering every key: `[Min, Max)`.
    pub fn full() -> Self {
        Self {
            lower: RangeBound::Min,
            upper: RangeBound::Max,
        }
    }

    /// Inclusive lower bound.
    pub fn lower(&self) -> &RangeBound {
        &self.lower
    }

    /// Exclusive upper bound.
    pub fn upper(&self) -> &RangeBound {
        &self.upper
    }

    /// Whether `key` lies in `[lower, upper)`.
    pub fn contains(&self, key: &[u8]) -> bool {
        let key = Position::Key(key);
        self.lower.position() <= key && key < self.upper.position()
    }

    /// Whether the two half-open intervals share any position. Ranges that
    /// merely touch (`a.upper == b.lower`) do not overlap.
    pub fn overlaps(&self, other: &RangeBounds) -> bool {
        self.lower.position() < other.upper.position()
            && other.lower.position() < self.upper.position()
    }

    /// Splits the interval into `[lower, at)` and `[at, upper)`.
    ///
    /// # Errors
    ///
    /// Returns [`RangeBoundsError`] when `at` is not strictly inside the
    /// interval, or when a half would contain no key (an empty `at` on a range
    /// whose lower bound is `Min`). Both halves are validated by
    /// [`RangeBounds::new`], so the two constructors cannot disagree.
    pub fn split_at(&self, at: &[u8]) -> Result<(RangeBounds, RangeBounds), RangeBoundsError> {
        let below = Self::new(self.lower.clone(), RangeBound::key(at))?;
        let above = Self::new(RangeBound::key(at), self.upper.clone())?;
        Ok((below, above))
    }
}
269
/// Monotonic version of a catalog entry. `ShardOwnershipCatalog::apply_update`
/// only replaces an existing entry with one carrying a strictly greater version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct CatalogVersion(u64);

impl CatalogVersion {
    /// The version assigned to a freshly established entry (`1`).
    pub fn initial() -> Self {
        Self(1)
    }

    /// Returns the raw version number.
    pub fn value(self) -> u64 {
        self.0
    }

    /// Returns the version that follows this one.
    ///
    /// # Panics
    ///
    /// Panics if the counter would exceed `u64::MAX`. A plain `+ 1` wraps to `0`
    /// in release builds, which would make a newer entry compare as older than
    /// every existing one; failing loudly keeps the ordering guarantee intact
    /// in every build profile.
    fn next(self) -> Self {
        let bumped = self
            .0
            .checked_add(1)
            .expect("catalog version overflowed u64");
        Self(bumped)
    }
}

impl std::fmt::Display for CatalogVersion {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
299
/// Fencing counter for range ownership. `RangeOwnership::transfer_to` advances
/// it, and `ShardOwnershipCatalog::admit_public_write` rejects writes whose
/// expected epoch differs from the range's current one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct OwnershipEpoch(u64);

impl OwnershipEpoch {
    /// The epoch assigned to a freshly established range (`1`).
    pub fn initial() -> Self {
        Self(1)
    }

    /// Returns the raw epoch number.
    pub fn value(self) -> u64 {
        self.0
    }

    /// Returns the epoch that follows this one.
    ///
    /// # Panics
    ///
    /// Panics if the counter would exceed `u64::MAX`. A plain `+ 1` wraps to `0`
    /// in release builds, which could let a previously used epoch become
    /// current again and defeat the fence; failing loudly is the safe choice.
    fn next(self) -> Self {
        let bumped = self
            .0
            .checked_add(1)
            .expect("ownership epoch overflowed u64");
        Self(bumped)
    }
}

impl std::fmt::Display for OwnershipEpoch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
330
/// Placement hints attached to a range: a replication factor plus free-form
/// string attributes. The default value has factor `0` and no attributes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PlacementMetadata {
    replication_factor: usize,
    attributes: BTreeMap<String, String>,
}

impl PlacementMetadata {
    /// Starts a metadata value with the given factor and no attributes.
    pub fn with_replication_factor(replication_factor: usize) -> Self {
        Self {
            replication_factor,
            ..Self::default()
        }
    }

    /// Builder-style setter; a later call with the same `key` replaces the
    /// earlier value.
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, content) = (key.into(), value.into());
        self.attributes.insert(name, content);
        self
    }

    /// The configured replication factor.
    pub fn replication_factor(&self) -> usize {
        self.replication_factor
    }

    /// Looks up an attribute, returning `None` when it was never set.
    pub fn attribute(&self, key: &str) -> Option<&str> {
        let stored = self.attributes.get(key)?;
        Some(stored.as_str())
    }
}
365
/// One catalog entry: who owns and replicates a key range of a collection,
/// together with the counters used to order and fence changes to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeOwnership {
    /// Collection the range belongs to; first half of the catalog key.
    collection: CollectionId,
    /// Range identifier; second half of the catalog key.
    range_id: RangeId,
    /// Shard key mode this entry was established with.
    shard_key_mode: ShardKeyMode,
    /// Half-open key interval covered by the range.
    bounds: RangeBounds,
    /// Node reported as `RangeRole::Owner` by `role_of`.
    owner: NodeIdentity,
    /// Nodes reported as `RangeRole::Replica` by `role_of` (unless they are
    /// also the owner, which is checked first).
    replicas: Vec<NodeIdentity>,
    /// Starts at `OwnershipEpoch::initial()`; advanced only by `transfer_to`.
    epoch: OwnershipEpoch,
    /// Starts at `CatalogVersion::initial()`; advanced by every derived copy
    /// (`transfer_to`, `update_replicas`, `update_placement`, `with_bounds`).
    version: CatalogVersion,
    /// Placement hints carried with the entry.
    placement: PlacementMetadata,
}
388
389impl RangeOwnership {
390 #[allow(clippy::too_many_arguments)]
393 pub fn establish(
394 collection: CollectionId,
395 range_id: RangeId,
396 shard_key_mode: ShardKeyMode,
397 bounds: RangeBounds,
398 owner: NodeIdentity,
399 replicas: impl IntoIterator<Item = NodeIdentity>,
400 placement: PlacementMetadata,
401 ) -> Self {
402 Self {
403 collection,
404 range_id,
405 shard_key_mode,
406 bounds,
407 owner,
408 replicas: replicas.into_iter().collect(),
409 epoch: OwnershipEpoch::initial(),
410 version: CatalogVersion::initial(),
411 placement,
412 }
413 }
414
415 pub fn collection(&self) -> &CollectionId {
416 &self.collection
417 }
418
419 pub fn range_id(&self) -> RangeId {
420 self.range_id
421 }
422
423 pub fn shard_key_mode(&self) -> ShardKeyMode {
424 self.shard_key_mode
425 }
426
427 pub fn bounds(&self) -> &RangeBounds {
428 &self.bounds
429 }
430
431 pub fn owner(&self) -> &NodeIdentity {
432 &self.owner
433 }
434
435 pub fn replicas(&self) -> &[NodeIdentity] {
436 &self.replicas
437 }
438
439 pub fn epoch(&self) -> OwnershipEpoch {
440 self.epoch
441 }
442
443 pub fn version(&self) -> CatalogVersion {
444 self.version
445 }
446
447 pub fn placement(&self) -> &PlacementMetadata {
448 &self.placement
449 }
450
451 fn key(&self) -> (CollectionId, RangeId) {
453 (self.collection.clone(), self.range_id)
454 }
455
456 pub fn transfer_to(
460 &self,
461 new_owner: NodeIdentity,
462 new_replicas: impl IntoIterator<Item = NodeIdentity>,
463 ) -> Self {
464 Self {
465 owner: new_owner,
466 replicas: new_replicas.into_iter().collect(),
467 epoch: self.epoch.next(),
468 version: self.version.next(),
469 ..self.clone()
470 }
471 }
472
473 pub fn update_replicas(&self, new_replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
476 Self {
477 replicas: new_replicas.into_iter().collect(),
478 version: self.version.next(),
479 ..self.clone()
480 }
481 }
482
483 pub fn update_placement(&self, placement: PlacementMetadata) -> Self {
486 Self {
487 placement,
488 version: self.version.next(),
489 ..self.clone()
490 }
491 }
492
493 pub fn with_bounds(&self, bounds: RangeBounds) -> Self {
499 Self {
500 bounds,
501 version: self.version.next(),
502 ..self.clone()
503 }
504 }
505
506 pub fn role_of(&self, node: &NodeIdentity) -> RangeRole {
516 if self.owner == *node {
517 RangeRole::Owner
518 } else if self.replicas.iter().any(|replica| replica == node) {
519 RangeRole::Replica
520 } else {
521 RangeRole::NoCopy
522 }
523 }
524}
525
/// What a node is with respect to one specific range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeRole {
    /// The node is the range's owner.
    Owner,
    /// The node is listed among the range's replicas.
    Replica,
    /// The node is neither owner nor replica of the range.
    NoCopy,
}

impl RangeRole {
    /// Only the owner may accept public writes.
    pub fn may_write_public(self) -> bool {
        match self {
            Self::Owner => true,
            Self::Replica | Self::NoCopy => false,
        }
    }

    /// Short lowercase name used in rejection messages.
    fn label(self) -> &'static str {
        match self {
            Self::Owner => "owner",
            Self::Replica => "replica",
            Self::NoCopy => "no-copy",
        }
    }
}
565
/// Result of a successful `ShardOwnershipCatalog::apply_update`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateOutcome {
    /// No entry existed for the `(collection, range id)` key; one was inserted.
    Created,
    /// An existing entry was replaced by a newer version.
    Updated,
}
574
/// Reasons the catalog refuses a declaration or an update.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CatalogError {
    /// The update's version is not strictly greater than the stored entry's.
    StaleVersion {
        collection: CollectionId,
        range_id: RangeId,
        /// Version currently stored in the catalog.
        current: CatalogVersion,
        /// Version carried by the rejected update.
        attempted: CatalogVersion,
    },
    /// The collection is already recorded with a different shard key mode.
    ShardKeyModeMismatch {
        collection: CollectionId,
        /// Mode already recorded for the collection.
        declared: ShardKeyMode,
        /// Mode carried by the rejected declaration or range.
        attempted: ShardKeyMode,
    },
    /// The update's bounds overlap another range of the same collection.
    OverlappingRange {
        collection: CollectionId,
        /// Range already in the catalog that the update collides with.
        existing: RangeId,
        /// Range carried by the rejected update.
        attempted: RangeId,
    },
}
602
603impl std::fmt::Display for CatalogError {
604 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
605 match self {
606 Self::StaleVersion {
607 collection,
608 range_id,
609 current,
610 attempted,
611 } => write!(
612 f,
613 "stale catalog update for {collection}/{range_id}: current version {current}, attempted {attempted}"
614 ),
615 Self::ShardKeyModeMismatch {
616 collection,
617 declared,
618 attempted,
619 } => write!(
620 f,
621 "collection {collection} is declared {declared:?} but range uses {attempted:?}"
622 ),
623 Self::OverlappingRange {
624 collection,
625 existing,
626 attempted,
627 } => write!(
628 f,
629 "range {attempted} overlaps existing range {existing} of collection {collection}"
630 ),
631 }
632 }
633}
634
635impl std::error::Error for CatalogError {}
636
/// Reasons `ShardOwnershipCatalog::admit_public_write` refuses a write.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeWriteReject {
    /// Routing the key found no range in the collection.
    NoRange { collection: CollectionId },
    /// The routed range exists but the asking node is not its owner.
    NotOwner {
        collection: CollectionId,
        range_id: RangeId,
        /// What the asking node actually is for this range.
        role: RangeRole,
        /// The node that does own the range.
        owner: NodeIdentity,
    },
    /// The write's expected epoch differs from the range's current epoch.
    StaleEpoch {
        collection: CollectionId,
        range_id: RangeId,
        /// Epoch the caller supplied.
        expected: OwnershipEpoch,
        /// Epoch currently stored on the range.
        current: OwnershipEpoch,
    },
}
674
675impl std::fmt::Display for RangeWriteReject {
676 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677 match self {
678 Self::NoRange { collection } => write!(
679 f,
680 "no range of collection {collection} covers the routed key — re-resolve routing"
681 ),
682 Self::NotOwner {
683 collection,
684 range_id,
685 role,
686 owner,
687 } => write!(
688 f,
689 "this node is {} of {collection}/{range_id}, not its owner — route the write to {owner}",
690 role.label()
691 ),
692 Self::StaleEpoch {
693 collection,
694 range_id,
695 expected,
696 current,
697 } => write!(
698 f,
699 "stale ownership epoch for {collection}/{range_id}: write authorised under epoch {expected}, current is {current}"
700 ),
701 }
702 }
703}
704
705impl std::error::Error for RangeWriteReject {}
706
/// In-memory catalog of range ownership, keyed by collection and range id.
#[derive(Debug, Clone, Default)]
pub struct ShardOwnershipCatalog {
    /// Shard key mode recorded for each known collection, either declared
    /// explicitly or taken from the first range applied for it.
    collections: BTreeMap<CollectionId, ShardKeyMode>,
    /// Latest accepted entry for each `(collection, range id)` pair. The map
    /// order groups all ranges of a collection together, sorted by range id.
    ranges: BTreeMap<(CollectionId, RangeId), RangeOwnership>,
}
726
727impl ShardOwnershipCatalog {
728 pub fn new() -> Self {
730 Self::default()
731 }
732
733 pub fn declare_collection(
741 &mut self,
742 collection: CollectionId,
743 mode: ShardKeyMode,
744 ) -> Result<(), CatalogError> {
745 match self.collections.get(&collection) {
746 Some(&declared) if declared != mode => Err(CatalogError::ShardKeyModeMismatch {
747 collection,
748 declared,
749 attempted: mode,
750 }),
751 _ => {
752 self.collections.insert(collection, mode);
753 Ok(())
754 }
755 }
756 }
757
758 pub fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
761 self.collections.get(collection).copied()
762 }
763
764 pub fn apply_update(&mut self, entry: RangeOwnership) -> Result<UpdateOutcome, CatalogError> {
775 match self.collections.get(entry.collection()) {
777 Some(&declared) if declared != entry.shard_key_mode() => {
778 return Err(CatalogError::ShardKeyModeMismatch {
779 collection: entry.collection().clone(),
780 declared,
781 attempted: entry.shard_key_mode(),
782 });
783 }
784 _ => {}
785 }
786
787 let key = entry.key();
788 match self.ranges.get(&key) {
789 Some(current) => {
790 if entry.version() <= current.version() {
791 return Err(CatalogError::StaleVersion {
792 collection: entry.collection().clone(),
793 range_id: entry.range_id(),
794 current: current.version(),
795 attempted: entry.version(),
796 });
797 }
798 if let Some(existing) = self.overlapping_sibling(&entry) {
799 return Err(CatalogError::OverlappingRange {
800 collection: entry.collection().clone(),
801 existing,
802 attempted: entry.range_id(),
803 });
804 }
805 self.collections
806 .insert(entry.collection().clone(), entry.shard_key_mode());
807 self.ranges.insert(key, entry);
808 Ok(UpdateOutcome::Updated)
809 }
810 None => {
811 if let Some(existing) = self.overlapping_sibling(&entry) {
814 return Err(CatalogError::OverlappingRange {
815 collection: entry.collection().clone(),
816 existing,
817 attempted: entry.range_id(),
818 });
819 }
820 self.collections
821 .insert(entry.collection().clone(), entry.shard_key_mode());
822 self.ranges.insert(key, entry);
823 Ok(UpdateOutcome::Created)
824 }
825 }
826 }
827
828 fn overlapping_sibling(&self, entry: &RangeOwnership) -> Option<RangeId> {
829 self.ranges_for(entry.collection())
830 .find(|range| {
831 range.range_id() != entry.range_id() && range.bounds().overlaps(entry.bounds())
832 })
833 .map(RangeOwnership::range_id)
834 }
835
836 pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&RangeOwnership> {
839 self.ranges.get(&(collection.clone(), range_id))
840 }
841
842 pub fn ranges_for<'a>(
844 &'a self,
845 collection: &CollectionId,
846 ) -> impl Iterator<Item = &'a RangeOwnership> {
847 let collection = collection.clone();
848 self.ranges
849 .iter()
850 .filter(move |((c, _), _)| *c == collection)
851 .map(|(_, r)| r)
852 }
853
854 pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&RangeOwnership> {
859 self.ranges_for(collection)
860 .find(|r| r.bounds().contains(key))
861 }
862
863 pub fn route_shard_key(
869 &self,
870 collection: &CollectionId,
871 shard_key: &[u8],
872 ) -> Option<&RangeOwnership> {
873 match self.shard_key_mode(collection)? {
874 ShardKeyMode::Ordered => self.route(collection, shard_key),
875 ShardKeyMode::Hash => {
876 let range_key = hash_shard_key_to_range_key(shard_key);
877 self.route(collection, &range_key)
878 }
879 }
880 }
881
882 pub fn role_at(
887 &self,
888 node: &NodeIdentity,
889 collection: &CollectionId,
890 range_id: RangeId,
891 ) -> Option<RangeRole> {
892 self.range(collection, range_id)
893 .map(|range| range.role_of(node))
894 }
895
896 pub fn admit_public_write(
912 &self,
913 node: &NodeIdentity,
914 collection: &CollectionId,
915 key: &[u8],
916 expected_epoch: OwnershipEpoch,
917 ) -> Result<&RangeOwnership, RangeWriteReject> {
918 let range =
919 self.route_shard_key(collection, key)
920 .ok_or_else(|| RangeWriteReject::NoRange {
921 collection: collection.clone(),
922 })?;
923 let role = range.role_of(node);
924 if !role.may_write_public() {
925 return Err(RangeWriteReject::NotOwner {
926 collection: collection.clone(),
927 range_id: range.range_id(),
928 role,
929 owner: range.owner().clone(),
930 });
931 }
932 if expected_epoch != range.epoch() {
933 return Err(RangeWriteReject::StaleEpoch {
934 collection: collection.clone(),
935 range_id: range.range_id(),
936 expected: expected_epoch,
937 current: range.epoch(),
938 });
939 }
940 Ok(range)
941 }
942
943 pub fn range_count(&self) -> usize {
945 self.ranges.len()
946 }
947
948 pub fn entries(&self) -> impl Iterator<Item = &RangeOwnership> {
952 self.ranges.values()
953 }
954}
955
956#[cfg(test)]
957mod tests {
958 use super::*;
959
    // Test helper: builds a `CollectionId`, panicking on an invalid name.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }
963
    // Test helper: builds a `NodeIdentity` from a certificate subject string
    // such as "CN=node-a", panicking if the subject is rejected.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }
967
    // Test helper: `[lower, upper)` with explicit key bounds; panics when the
    // bounds are rejected by `RangeBounds::new`.
    fn bounds(lower: &[u8], upper: &[u8]) -> RangeBounds {
        RangeBounds::new(RangeBound::key(lower), RangeBound::key(upper)).unwrap()
    }
971
972 fn hash_range(coll: &CollectionId, id: u64, bnds: RangeBounds, owner: &str) -> RangeOwnership {
974 RangeOwnership::establish(
975 coll.clone(),
976 RangeId::new(id),
977 ShardKeyMode::Hash,
978 bnds,
979 ident(owner),
980 [ident("CN=replica-1")],
981 PlacementMetadata::with_replication_factor(3),
982 )
983 }
984
985 fn single_hash_slot_bounds(key: &[u8]) -> RangeBounds {
986 let slot = super::super::slot::hash_shard_key_to_slot(key);
987 let lower = RangeBound::key(slot.range_key());
988 let upper = match slot.value().checked_add(1) {
989 Some(next) if next < super::super::slot::PRODUCTION_HASH_SLOT_COUNT => {
990 RangeBound::key(super::super::slot::HashSlot::new(next).unwrap().range_key())
991 }
992 _ => RangeBound::Max,
993 };
994 RangeBounds::new(lower, upper).unwrap()
995 }
996
997 #[test]
998 fn empty_catalog_creation() {
999 let catalog = ShardOwnershipCatalog::new();
1000 assert_eq!(catalog.range_count(), 0);
1001 assert!(catalog.shard_key_mode(&collection("orders")).is_none());
1002 }
1003
    // `Hash` is the `Default` mode, and applying a hash range for a collection
    // that was never declared records `Hash` as that collection's mode.
    #[test]
    fn hash_is_the_default_shard_key_mode() {
        assert_eq!(ShardKeyMode::default(), ShardKeyMode::Hash);

        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();
        assert_eq!(catalog.shard_key_mode(&orders), Some(ShardKeyMode::Hash));
    }
1017
    // Two adjacent ranges split at 0x80: raw range keys route to the range
    // whose half-open bounds contain them.
    #[test]
    fn hash_range_entry_routes_to_owner() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");

        // Range 1 covers [Min, 0x80) and is owned by node-a.
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
                "CN=node-a",
            ))
            .unwrap();
        // Range 2 covers [0x80, Max) and is owned by node-b.
        catalog
            .apply_update(hash_range(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
            ))
            .unwrap();

        assert_eq!(
            catalog.route(&orders, &[0x10]).unwrap().owner(),
            &ident("CN=node-a")
        );
        // The split key itself belongs to the upper range (lower bound inclusive).
        assert_eq!(
            catalog.route(&orders, &[0x80]).unwrap().owner(),
            &ident("CN=node-b")
        );
        assert_eq!(
            catalog.route(&orders, &[0xff]).unwrap().owner(),
            &ident("CN=node-b")
        );
        // The routed entry exposes the replicas and initial epoch it was
        // established with.
        let r = catalog.route(&orders, &[0x10]).unwrap();
        assert_eq!(r.replicas(), &[ident("CN=replica-1")]);
        assert_eq!(r.epoch(), OwnershipEpoch::initial());
    }
1059
    // In hash mode `route_shard_key` hashes the logical key first, so a range
    // covering only that key's hash slot is enough to route it.
    #[test]
    fn hash_mode_routes_logical_shard_key_through_hash_slot() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let key = b"tenant:42";
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                single_hash_slot_bounds(key),
                "CN=node-a",
            ))
            .unwrap();

        let routed = catalog
            .route_shard_key(&orders, key)
            .expect("hash slot range covers the logical shard key");
        assert_eq!(routed.owner(), &ident("CN=node-a"));
    }
1079
    // A collection declared `Ordered` accepts ordered ranges and routes raw
    // keys by byte order.
    #[test]
    fn ordered_mode_can_be_declared_and_routed() {
        let mut catalog = ShardOwnershipCatalog::new();
        let events = collection("events");
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        assert_eq!(catalog.shard_key_mode(&events), Some(ShardKeyMode::Ordered));

        // Range 1: ["a", "m") on node-a.
        catalog
            .apply_update(RangeOwnership::establish(
                events.clone(),
                RangeId::new(1),
                ShardKeyMode::Ordered,
                bounds(b"a", b"m"),
                ident("CN=node-a"),
                [],
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        // Range 2: ["m", "z") on node-b.
        catalog
            .apply_update(RangeOwnership::establish(
                events.clone(),
                RangeId::new(2),
                ShardKeyMode::Ordered,
                bounds(b"m", b"z"),
                ident("CN=node-b"),
                [],
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();

        assert_eq!(
            catalog.route(&events, b"alpha").unwrap().owner(),
            &ident("CN=node-a")
        );
        assert_eq!(
            catalog.route(&events, b"mike").unwrap().owner(),
            &ident("CN=node-b")
        );
        // "zzz" sorts at or above the exclusive upper bound "z": no range covers it.
        assert!(catalog.route(&events, b"zzz").is_none());
    }
1124
    // Once a collection has a mode, neither a conflicting declaration nor a
    // range carrying a different mode is accepted.
    #[test]
    fn declaring_a_conflicting_mode_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let events = collection("events");
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        // Re-declaring the same mode succeeds.
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        // Declaring a different mode fails and reports both modes.
        let err = catalog
            .declare_collection(events.clone(), ShardKeyMode::Hash)
            .unwrap_err();
        assert_eq!(
            err,
            CatalogError::ShardKeyModeMismatch {
                collection: events.clone(),
                declared: ShardKeyMode::Ordered,
                attempted: ShardKeyMode::Hash,
            }
        );
        // A hash-mode range for the ordered collection is rejected the same way.
        let err = catalog
            .apply_update(hash_range(&events, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap_err();
        assert!(matches!(err, CatalogError::ShardKeyModeMismatch { .. }));
    }
1154
    // An ownership transfer advances both version and epoch; a replica-only
    // change advances the version but leaves the epoch alone.
    #[test]
    fn version_bumps_on_owner_transfer_and_epoch_fences() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();

        let current = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(current.version(), CatalogVersion::initial());
        assert_eq!(current.epoch(), OwnershipEpoch::initial());

        // Move ownership from node-a to node-b.
        let moved = current.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        let outcome = catalog.apply_update(moved).unwrap();
        assert_eq!(outcome, UpdateOutcome::Updated);

        let after = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(after.owner(), &ident("CN=node-b"));
        assert_eq!(after.version().value(), 2);
        assert_eq!(after.epoch().value(), 2);

        // Changing only the replica set keeps the epoch at 2.
        let replicas_changed = after.update_replicas([ident("CN=node-c")]);
        catalog.apply_update(replicas_changed).unwrap();
        let after2 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(after2.version().value(), 3);
        assert_eq!(after2.epoch().value(), 2);
        assert_eq!(after2.replicas(), &[ident("CN=node-c")]);
    }
1185
    // Re-applying an older revision after a newer one was accepted fails with
    // `StaleVersion` and does not modify the stored entry.
    #[test]
    fn stale_update_is_rejected_and_leaves_catalog_unchanged() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();

        // Keep a copy of revision 1, then advance the catalog to revision 2.
        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(v1.transfer_to(ident("CN=node-b"), []))
            .unwrap();
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-b")
        );

        // Revision 1 is now stale.
        let err = catalog.apply_update(v1.clone()).unwrap_err();
        assert_eq!(
            err,
            CatalogError::StaleVersion {
                collection: orders.clone(),
                range_id: RangeId::new(1),
                current: CatalogVersion::initial().next(),
                attempted: CatalogVersion::initial(),
            }
        );
        // Owner and version still reflect revision 2.
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-b")
        );
        assert_eq!(
            catalog
                .range(&orders, RangeId::new(1))
                .unwrap()
                .version()
                .value(),
            2
        );
    }
1230
    // Creating a second range whose bounds intersect an existing range of the
    // same collection fails and adds nothing.
    #[test]
    fn overlapping_range_creation_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                bounds(&[0x00], &[0x80]),
                "CN=node-a",
            ))
            .unwrap();
        // [0x40, 0xc0) intersects [0x00, 0x80).
        let err = catalog
            .apply_update(hash_range(
                &orders,
                2,
                bounds(&[0x40], &[0xc0]),
                "CN=node-b",
            ))
            .unwrap_err();
        assert_eq!(
            err,
            CatalogError::OverlappingRange {
                collection: orders.clone(),
                existing: RangeId::new(1),
                attempted: RangeId::new(2),
            }
        );
        assert_eq!(catalog.range_count(), 1);
    }
1262
    // Widening an existing range into a sibling's key space is rejected, and
    // the stored bounds stay as they were.
    #[test]
    fn overlapping_range_update_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                bounds(&[0x00], &[0x80]),
                "CN=node-a",
            ))
            .unwrap();
        catalog
            .apply_update(hash_range(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
            ))
            .unwrap();

        // Range 1 tries to grow to [0x00, 0xc0), which intersects range 2.
        let widened = catalog
            .range(&orders, RangeId::new(1))
            .unwrap()
            .with_bounds(bounds(&[0x00], &[0xc0]));
        let err = catalog.apply_update(widened).unwrap_err();
        assert_eq!(
            err,
            CatalogError::OverlappingRange {
                collection: orders.clone(),
                existing: RangeId::new(2),
                attempted: RangeId::new(1),
            }
        );
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().bounds(),
            &bounds(&[0x00], &[0x80])
        );
    }
1302
    // Applying the same sequence of entries to a second catalog instance
    // reproduces the first one's routing, and a redelivered entry is rejected
    // as stale without changing anything.
    #[test]
    fn catalog_replicates_to_data_members_with_read_visibility() {
        let orders = collection("orders");
        let mut leader = ShardOwnershipCatalog::new();
        let mut data_member = ShardOwnershipCatalog::new();

        // Revision 1 is applied to both catalogs.
        let create = hash_range(&orders, 1, RangeBounds::full(), "CN=node-a");
        leader.apply_update(create.clone()).unwrap();
        assert_eq!(
            data_member.apply_update(create).unwrap(),
            UpdateOutcome::Created
        );

        // The second catalog can route with what it received.
        assert_eq!(
            data_member.route(&orders, b"any-key").unwrap().owner(),
            &ident("CN=node-a")
        );

        // Revision 2 (ownership transfer) is derived on the leader and applied
        // to both.
        let v2 = leader
            .range(&orders, RangeId::new(1))
            .unwrap()
            .transfer_to(ident("CN=node-b"), []);
        leader.apply_update(v2.clone()).unwrap();
        assert_eq!(
            data_member.apply_update(v2.clone()).unwrap(),
            UpdateOutcome::Updated
        );
        assert_eq!(
            data_member.route(&orders, b"any-key").unwrap().owner(),
            &ident("CN=node-b")
        );

        // Delivering revision 2 a second time is rejected; the version stays 2.
        let err = data_member.apply_update(v2).unwrap_err();
        assert!(matches!(err, CatalogError::StaleVersion { .. }));
        assert_eq!(
            data_member
                .range(&orders, RangeId::new(1))
                .unwrap()
                .version()
                .value(),
            2
        );
    }
1353
1354 #[test]
1355 fn range_bounds_reject_empty_or_inverted() {
1356 assert!(RangeBounds::new(RangeBound::key([0x10]), RangeBound::key([0x10])).is_err());
1357 assert!(RangeBounds::new(RangeBound::key([0x20]), RangeBound::key([0x10])).is_err());
1358 assert!(RangeBounds::new(RangeBound::Max, RangeBound::Min).is_err());
1359 assert!(RangeBounds::full().contains(b"anything"));
1360 }
1361
1362 fn range_with(
1368 coll: &CollectionId,
1369 id: u64,
1370 bnds: RangeBounds,
1371 owner: &str,
1372 replicas: &[&str],
1373 ) -> RangeOwnership {
1374 RangeOwnership::establish(
1375 coll.clone(),
1376 RangeId::new(id),
1377 ShardKeyMode::Hash,
1378 bnds,
1379 ident(owner),
1380 replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
1381 PlacementMetadata::with_replication_factor(3),
1382 )
1383 }
1384
1385 #[test]
1386 fn role_of_distinguishes_owner_replica_and_no_copy() {
1387 let orders = collection("orders");
1388 let range = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);
1389
1390 assert_eq!(range.role_of(&ident("CN=node-a")), RangeRole::Owner);
1391 assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Replica);
1392 assert_eq!(range.role_of(&ident("CN=node-c")), RangeRole::NoCopy);
1393 assert!(RangeRole::Owner.may_write_public());
1394 assert!(!RangeRole::Replica.may_write_public());
1395 assert!(!RangeRole::NoCopy.may_write_public());
1396 }
1397
    // The same node can be owner of one range and replica of another;
    // `role_at` answers per range and returns `None` for unknown ranges.
    #[test]
    fn role_is_per_range_not_a_global_node_role() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        // Range 1: node-a owns, node-b replicates.
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();
        // Range 2: the roles are swapped.
        catalog
            .apply_update(range_with(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
                &["CN=node-a"],
            ))
            .unwrap();

        let node_a = ident("CN=node-a");
        assert_eq!(
            catalog.role_at(&node_a, &orders, RangeId::new(1)),
            Some(RangeRole::Owner)
        );
        assert_eq!(
            catalog.role_at(&node_a, &orders, RangeId::new(2)),
            Some(RangeRole::Replica)
        );
        // Unknown range id in a known collection.
        assert_eq!(catalog.role_at(&node_a, &orders, RangeId::new(99)), None);
        // Unknown collection.
        assert_eq!(
            catalog.role_at(&node_a, &collection("ghost"), RangeId::new(1)),
            None
        );
    }
1440
    // The owner presenting the range's current epoch is admitted, and the
    // routed range is returned.
    #[test]
    fn public_write_admitted_on_owner_at_matching_epoch() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        let admitted = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .expect("owner at current epoch may write");
        assert_eq!(admitted.owner(), &ident("CN=node-a"));
        assert_eq!(admitted.range_id(), RangeId::new(1));
    }
1466
    // Write admission routes through `route_shard_key`, so for a hash
    // collection a range covering only the key's hash slot admits the write.
    #[test]
    fn public_write_uses_hash_slot_routing_for_hash_collections() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let key = b"tenant:42";
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                single_hash_slot_bounds(key),
                "CN=node-a",
            ))
            .unwrap();

        let admitted = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, key, OwnershipEpoch::initial())
            .expect("hash owner admits write routed by shard-key slot");
        assert_eq!(admitted.range_id(), RangeId::new(1));
    }
1486
    // A replica is refused with `NotOwner`, which names the replica role and
    // the real owner so the caller can re-route.
    #[test]
    fn public_write_rejected_on_replica_with_routing_error() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        // node-b is only a replica of the range.
        let err = catalog
            .admit_public_write(
                &ident("CN=node-b"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        match err {
            RangeWriteReject::NotOwner {
                role, ref owner, ..
            } => {
                assert_eq!(role, RangeRole::Replica);
                assert_eq!(owner, &ident("CN=node-a"));
            }
            other => panic!("expected NotOwner(Replica), got {other:?}"),
        }
        // The rendered message tells the caller where to send the write.
        assert!(err.to_string().contains("route the write to"));
    }
1523
    // A node that is neither owner nor replica is refused with
    // `NotOwner { role: NoCopy }`.
    #[test]
    fn public_write_rejected_on_no_copy_holder() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        // node-c does not appear on the range at all.
        let err = catalog
            .admit_public_write(
                &ident("CN=node-c"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        match err {
            RangeWriteReject::NotOwner { role, .. } => assert_eq!(role, RangeRole::NoCopy),
            other => panic!("expected NotOwner(NoCopy), got {other:?}"),
        }
    }
1552
    // Ownership moves away from node-a and back again. node-a is the owner
    // once more, but a write carrying the original epoch is still fenced.
    #[test]
    fn public_write_rejected_on_stale_ownership_epoch() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let v1 = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);
        let original_epoch = v1.epoch();
        catalog.apply_update(v1.clone()).unwrap();

        // a -> b, then b -> a: two transfers, two epoch bumps.
        let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        catalog.apply_update(v2.clone()).unwrap();
        let v3 = v2.transfer_to(ident("CN=node-a"), [ident("CN=node-b")]);
        catalog.apply_update(v3.clone()).unwrap();

        assert_ne!(original_epoch, v3.epoch());
        // Same owner as at the start, but the old epoch no longer matches.
        let err = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", original_epoch)
            .unwrap_err();
        match err {
            RangeWriteReject::StaleEpoch {
                expected, current, ..
            } => {
                assert_eq!(expected, original_epoch);
                assert_eq!(current, v3.epoch());
            }
            other => panic!("expected StaleEpoch, got {other:?}"),
        }
        // With the current epoch the write is admitted.
        assert!(catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", v3.epoch())
            .is_ok());
    }
1588
1589 #[test]
1590 fn public_write_rejected_when_no_range_covers_the_key() {
1591 let catalog = ShardOwnershipCatalog::new();
1592 let orders = collection("orders");
1593 let err = catalog
1594 .admit_public_write(
1595 &ident("CN=node-a"),
1596 &orders,
1597 b"k",
1598 OwnershipEpoch::initial(),
1599 )
1600 .unwrap_err();
1601 assert!(matches!(err, RangeWriteReject::NoRange { .. }));
1602 }
1603
    // A replica is refused on the public write path, yet a change record
    // stamped with the range id and ownership epoch is still admitted by a
    // `RangeAuthority` fence for that range.
    #[test]
    fn internal_apply_path_stays_privileged_for_a_public_write_replica() {
        use crate::replication::cdc::{ChangeOperation, ChangeRecord, RangeAuthority};

        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                7,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        // Public path: node-b is a replica, so the write is rejected.
        assert!(matches!(
            catalog
                .admit_public_write(
                    &ident("CN=node-b"),
                    &orders,
                    b"k",
                    OwnershipEpoch::initial()
                )
                .unwrap_err(),
            RangeWriteReject::NotOwner {
                role: RangeRole::Replica,
                ..
            }
        ));

        // Internal path: a record for range 7 stamped with the initial epoch.
        // NOTE(review): `ChangeRecord` and `RangeAuthority` live in
        // `crate::replication::cdc`; their exact admission rules are not
        // visible from this file.
        let record = ChangeRecord {
            term: 1,
            lsn: 1,
            timestamp: 0,
            operation: ChangeOperation::Insert,
            collection: orders.as_str().to_string(),
            entity_id: 1,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1]),
            metadata: None,
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        }
        .with_range_authority(7, OwnershipEpoch::initial().value());
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 1,
            min_ownership_epoch: OwnershipEpoch::initial().value(),
        };
        assert!(
            fence.admit(&record).is_ok(),
            "replica internal apply must remain privileged for the owner's changes"
        );
    }
1667}