1use std::collections::BTreeMap;
47
48use super::identity::NodeIdentity;
49
/// Name of a sharded collection.
///
/// Only obtainable through [`CollectionId::new`], which strips surrounding
/// whitespace and refuses names that are empty afterwards.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CollectionId(String);

/// Returned by [`CollectionId::new`] when the supplied name is empty or
/// consists only of whitespace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionIdError;

impl std::fmt::Display for CollectionIdError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("collection id is empty")
    }
}

impl std::error::Error for CollectionIdError {}

impl CollectionId {
    /// Builds a collection id from `value` with leading and trailing
    /// whitespace removed.
    ///
    /// # Errors
    ///
    /// [`CollectionIdError`] if nothing remains after trimming.
    pub fn new(value: impl AsRef<str>) -> Result<Self, CollectionIdError> {
        match value.as_ref().trim() {
            "" => Err(CollectionIdError),
            name => Ok(Self(name.to_owned())),
        }
    }

    /// Borrows the (already trimmed) collection name.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl std::fmt::Display for CollectionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
91
/// Numeric identifier of a range within its collection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RangeId(u64);

impl RangeId {
    /// Wraps a raw range number.
    pub fn new(value: u64) -> Self {
        RangeId(value)
    }

    /// Returns the raw range number.
    pub fn value(self) -> u64 {
        let RangeId(raw) = self;
        raw
    }
}

impl std::fmt::Display for RangeId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Formats through an inner `{}` so width/fill flags from the caller
        // are not applied.
        write!(f, "{}", self.value())
    }
}
117
/// Shard-key mode declared for a collection: `Hash` (the default) or
/// `Ordered`.
///
/// The catalog records one mode per collection and rejects ranges whose mode
/// disagrees with it. NOTE(review): presumably `Hash` ranges partition a hash
/// of the shard key and `Ordered` ranges the raw key — confirm with the
/// routing layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ShardKeyMode {
    #[default]
    Hash,
    Ordered,
}

/// One end of a key range: an explicit key, or one of the open ends.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeBound {
    /// Sorts before every key.
    Min,
    /// An explicit key; keys compare lexicographically as byte strings.
    Key(Vec<u8>),
    /// Sorts after every key.
    Max,
}

impl RangeBound {
    /// Convenience constructor for [`RangeBound::Key`].
    pub fn key(bytes: impl Into<Vec<u8>>) -> Self {
        Self::Key(bytes.into())
    }

    /// Projects the bound onto the totally ordered [`Position`] line used for
    /// every bound/key comparison.
    fn position(&self) -> Position<'_> {
        match *self {
            Self::Min => Position::Min,
            Self::Key(ref bytes) => Position::Key(bytes.as_slice()),
            Self::Max => Position::Max,
        }
    }
}

/// Borrowed, comparable view of a bound or a probe key.
///
/// The derived ordering follows declaration order, so `Min` < every `Key` <
/// `Max`, and two `Key`s compare by their bytes. Do not reorder the variants.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
enum Position<'a> {
    Min,
    Key(&'a [u8]),
    Max,
}

/// Half-open key interval `[lower, upper)`.
///
/// Every constructor guarantees `lower < upper`, so a value is never empty.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBounds {
    lower: RangeBound,
    upper: RangeBound,
}

/// Returned when bounds (or a split point) would yield an empty or inverted
/// range.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBoundsError;

impl std::fmt::Display for RangeBoundsError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("range lower bound must be strictly below the upper bound")
    }
}

impl std::error::Error for RangeBoundsError {}

impl RangeBounds {
    /// Builds `[lower, upper)`.
    ///
    /// # Errors
    ///
    /// [`RangeBoundsError`] unless `lower` sorts strictly before `upper`.
    pub fn new(lower: RangeBound, upper: RangeBound) -> Result<Self, RangeBoundsError> {
        let non_empty = lower.position() < upper.position();
        if non_empty {
            Ok(Self { lower, upper })
        } else {
            Err(RangeBoundsError)
        }
    }

    /// The interval covering every key: `[Min, Max)`.
    pub fn full() -> Self {
        RangeBounds {
            lower: RangeBound::Min,
            upper: RangeBound::Max,
        }
    }

    /// Inclusive lower bound.
    pub fn lower(&self) -> &RangeBound {
        &self.lower
    }

    /// Exclusive upper bound.
    pub fn upper(&self) -> &RangeBound {
        &self.upper
    }

    /// Whether `key` lies in `[lower, upper)`.
    pub fn contains(&self, key: &[u8]) -> bool {
        let probe = Position::Key(key);
        let (low, high) = (self.lower.position(), self.upper.position());
        low <= probe && probe < high
    }

    /// Whether the two half-open intervals share at least one point.
    ///
    /// Adjacent intervals such as `[a, m)` and `[m, z)` do not overlap.
    pub fn overlaps(&self, other: &RangeBounds) -> bool {
        let (mine_lo, mine_hi) = (self.lower.position(), self.upper.position());
        let (theirs_lo, theirs_hi) = (other.lower.position(), other.upper.position());
        mine_lo < theirs_hi && theirs_lo < mine_hi
    }

    /// Splits `[lower, upper)` into `[lower, at)` and `[at, upper)`.
    ///
    /// # Errors
    ///
    /// [`RangeBoundsError`] unless `at` lies strictly between the bounds,
    /// since either half would otherwise be empty.
    pub fn split_at(&self, at: &[u8]) -> Result<(RangeBounds, RangeBounds), RangeBoundsError> {
        let pivot = Position::Key(at);
        let strictly_inside = self.lower.position() < pivot && pivot < self.upper.position();
        if !strictly_inside {
            return Err(RangeBoundsError);
        }
        let boundary = RangeBound::key(at);
        let left = RangeBounds {
            lower: self.lower.clone(),
            upper: boundary.clone(),
        };
        let right = RangeBounds {
            lower: boundary,
            upper: self.upper.clone(),
        };
        Ok((left, right))
    }
}
268
/// Version of a single range's catalog entry.
///
/// Starts at 1; every mutator on `RangeOwnership` produces an entry with the
/// next version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct CatalogVersion(u64);

impl CatalogVersion {
    /// Version carried by a freshly established range entry.
    pub fn initial() -> Self {
        CatalogVersion(1)
    }

    /// Returns the raw version number.
    pub fn value(self) -> u64 {
        let CatalogVersion(raw) = self;
        raw
    }

    /// The version immediately following `self`.
    fn next(self) -> Self {
        CatalogVersion(self.value() + 1)
    }
}

impl std::fmt::Display for CatalogVersion {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}
298
/// Ownership epoch of a range.
///
/// Starts at 1 and is advanced whenever ownership is transferred. Public
/// writes authorised under any other epoch are rejected by the catalog.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct OwnershipEpoch(u64);

impl OwnershipEpoch {
    /// Epoch carried by a freshly established range.
    pub fn initial() -> Self {
        OwnershipEpoch(1)
    }

    /// Returns the raw epoch number.
    pub fn value(self) -> u64 {
        let OwnershipEpoch(raw) = self;
        raw
    }

    /// The epoch immediately following `self`.
    fn next(self) -> Self {
        OwnershipEpoch(self.value() + 1)
    }
}

impl std::fmt::Display for OwnershipEpoch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}
329
/// Placement hints for a range: a target replication factor plus free-form
/// string attributes.
///
/// The `Default` value has a replication factor of 0 and no attributes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PlacementMetadata {
    replication_factor: usize,
    /// Attribute name -> value, kept sorted by name (`BTreeMap`).
    attributes: BTreeMap<String, String>,
}

impl PlacementMetadata {
    /// Placement with the given replication factor and no attributes.
    pub fn with_replication_factor(replication_factor: usize) -> Self {
        Self {
            replication_factor,
            ..Self::default()
        }
    }

    /// Builder-style setter: records `key = value`, replacing any earlier
    /// value stored under `key`.
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (key, value) = (key.into(), value.into());
        self.attributes.insert(key, value);
        self
    }

    /// Target number of copies requested for the range.
    pub fn replication_factor(&self) -> usize {
        self.replication_factor
    }

    /// Value of attribute `key`, if it was set.
    pub fn attribute(&self, key: &str) -> Option<&str> {
        self.attributes.get(key).map(|value| value.as_str())
    }
}
364
365#[derive(Debug, Clone, PartialEq, Eq)]
376pub struct RangeOwnership {
377 collection: CollectionId,
378 range_id: RangeId,
379 shard_key_mode: ShardKeyMode,
380 bounds: RangeBounds,
381 owner: NodeIdentity,
382 replicas: Vec<NodeIdentity>,
383 epoch: OwnershipEpoch,
384 version: CatalogVersion,
385 placement: PlacementMetadata,
386}
387
388impl RangeOwnership {
389 #[allow(clippy::too_many_arguments)]
392 pub fn establish(
393 collection: CollectionId,
394 range_id: RangeId,
395 shard_key_mode: ShardKeyMode,
396 bounds: RangeBounds,
397 owner: NodeIdentity,
398 replicas: impl IntoIterator<Item = NodeIdentity>,
399 placement: PlacementMetadata,
400 ) -> Self {
401 Self {
402 collection,
403 range_id,
404 shard_key_mode,
405 bounds,
406 owner,
407 replicas: replicas.into_iter().collect(),
408 epoch: OwnershipEpoch::initial(),
409 version: CatalogVersion::initial(),
410 placement,
411 }
412 }
413
414 pub fn collection(&self) -> &CollectionId {
415 &self.collection
416 }
417
418 pub fn range_id(&self) -> RangeId {
419 self.range_id
420 }
421
422 pub fn shard_key_mode(&self) -> ShardKeyMode {
423 self.shard_key_mode
424 }
425
426 pub fn bounds(&self) -> &RangeBounds {
427 &self.bounds
428 }
429
430 pub fn owner(&self) -> &NodeIdentity {
431 &self.owner
432 }
433
434 pub fn replicas(&self) -> &[NodeIdentity] {
435 &self.replicas
436 }
437
438 pub fn epoch(&self) -> OwnershipEpoch {
439 self.epoch
440 }
441
442 pub fn version(&self) -> CatalogVersion {
443 self.version
444 }
445
446 pub fn placement(&self) -> &PlacementMetadata {
447 &self.placement
448 }
449
450 fn key(&self) -> (CollectionId, RangeId) {
452 (self.collection.clone(), self.range_id)
453 }
454
455 pub fn transfer_to(
459 &self,
460 new_owner: NodeIdentity,
461 new_replicas: impl IntoIterator<Item = NodeIdentity>,
462 ) -> Self {
463 Self {
464 owner: new_owner,
465 replicas: new_replicas.into_iter().collect(),
466 epoch: self.epoch.next(),
467 version: self.version.next(),
468 ..self.clone()
469 }
470 }
471
472 pub fn update_replicas(&self, new_replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
475 Self {
476 replicas: new_replicas.into_iter().collect(),
477 version: self.version.next(),
478 ..self.clone()
479 }
480 }
481
482 pub fn update_placement(&self, placement: PlacementMetadata) -> Self {
485 Self {
486 placement,
487 version: self.version.next(),
488 ..self.clone()
489 }
490 }
491
492 pub fn with_bounds(&self, bounds: RangeBounds) -> Self {
498 Self {
499 bounds,
500 version: self.version.next(),
501 ..self.clone()
502 }
503 }
504
505 pub fn role_of(&self, node: &NodeIdentity) -> RangeRole {
515 if self.owner == *node {
516 RangeRole::Owner
517 } else if self.replicas.iter().any(|replica| replica == node) {
518 RangeRole::Replica
519 } else {
520 RangeRole::NoCopy
521 }
522 }
523}
524
/// What a node holds for one particular range.
///
/// Roles are per range: the same node can own one range and be a replica of
/// another.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeRole {
    /// The range's owner — the only role allowed to admit public writes.
    Owner,
    /// Listed in the range's replica set.
    Replica,
    /// Neither owner nor replica of the range.
    NoCopy,
}

impl RangeRole {
    /// Whether a node in this role may accept public (client) writes; only
    /// [`RangeRole::Owner`] may.
    pub fn may_write_public(self) -> bool {
        self == RangeRole::Owner
    }

    /// Lower-case label used in user-facing error messages.
    fn label(self) -> &'static str {
        match self {
            Self::Owner => "owner",
            Self::Replica => "replica",
            Self::NoCopy => "no-copy",
        }
    }
}
564
/// Result of a successful `ShardOwnershipCatalog::apply_update`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateOutcome {
    /// No entry existed for the `(collection, range id)`; one was inserted.
    Created,
    /// An older entry for the `(collection, range id)` was replaced.
    Updated,
}
573
574#[derive(Debug, Clone, PartialEq, Eq)]
576pub enum CatalogError {
577 StaleVersion {
581 collection: CollectionId,
582 range_id: RangeId,
583 current: CatalogVersion,
584 attempted: CatalogVersion,
585 },
586 ShardKeyModeMismatch {
589 collection: CollectionId,
590 declared: ShardKeyMode,
591 attempted: ShardKeyMode,
592 },
593 OverlappingRange {
596 collection: CollectionId,
597 existing: RangeId,
598 attempted: RangeId,
599 },
600}
601
602impl std::fmt::Display for CatalogError {
603 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604 match self {
605 Self::StaleVersion {
606 collection,
607 range_id,
608 current,
609 attempted,
610 } => write!(
611 f,
612 "stale catalog update for {collection}/{range_id}: current version {current}, attempted {attempted}"
613 ),
614 Self::ShardKeyModeMismatch {
615 collection,
616 declared,
617 attempted,
618 } => write!(
619 f,
620 "collection {collection} is declared {declared:?} but range uses {attempted:?}"
621 ),
622 Self::OverlappingRange {
623 collection,
624 existing,
625 attempted,
626 } => write!(
627 f,
628 "range {attempted} overlaps existing range {existing} of collection {collection}"
629 ),
630 }
631 }
632}
633
634impl std::error::Error for CatalogError {}
635
636#[derive(Debug, Clone, PartialEq, Eq)]
646pub enum RangeWriteReject {
647 NoRange { collection: CollectionId },
650 NotOwner {
657 collection: CollectionId,
658 range_id: RangeId,
659 role: RangeRole,
660 owner: NodeIdentity,
661 },
662 StaleEpoch {
667 collection: CollectionId,
668 range_id: RangeId,
669 expected: OwnershipEpoch,
670 current: OwnershipEpoch,
671 },
672}
673
674impl std::fmt::Display for RangeWriteReject {
675 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
676 match self {
677 Self::NoRange { collection } => write!(
678 f,
679 "no range of collection {collection} covers the routed key — re-resolve routing"
680 ),
681 Self::NotOwner {
682 collection,
683 range_id,
684 role,
685 owner,
686 } => write!(
687 f,
688 "this node is {} of {collection}/{range_id}, not its owner — route the write to {owner}",
689 role.label()
690 ),
691 Self::StaleEpoch {
692 collection,
693 range_id,
694 expected,
695 current,
696 } => write!(
697 f,
698 "stale ownership epoch for {collection}/{range_id}: write authorised under epoch {expected}, current is {current}"
699 ),
700 }
701 }
702}
703
704impl std::error::Error for RangeWriteReject {}
705
706#[derive(Debug, Clone, Default)]
717pub struct ShardOwnershipCatalog {
718 collections: BTreeMap<CollectionId, ShardKeyMode>,
723 ranges: BTreeMap<(CollectionId, RangeId), RangeOwnership>,
724}
725
726impl ShardOwnershipCatalog {
727 pub fn new() -> Self {
729 Self::default()
730 }
731
732 pub fn declare_collection(
740 &mut self,
741 collection: CollectionId,
742 mode: ShardKeyMode,
743 ) -> Result<(), CatalogError> {
744 match self.collections.get(&collection) {
745 Some(&declared) if declared != mode => Err(CatalogError::ShardKeyModeMismatch {
746 collection,
747 declared,
748 attempted: mode,
749 }),
750 _ => {
751 self.collections.insert(collection, mode);
752 Ok(())
753 }
754 }
755 }
756
757 pub fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
760 self.collections.get(collection).copied()
761 }
762
763 pub fn apply_update(&mut self, entry: RangeOwnership) -> Result<UpdateOutcome, CatalogError> {
774 match self.collections.get(entry.collection()) {
776 Some(&declared) if declared != entry.shard_key_mode() => {
777 return Err(CatalogError::ShardKeyModeMismatch {
778 collection: entry.collection().clone(),
779 declared,
780 attempted: entry.shard_key_mode(),
781 });
782 }
783 _ => {}
784 }
785
786 let key = entry.key();
787 match self.ranges.get(&key) {
788 Some(current) => {
789 if entry.version() <= current.version() {
790 return Err(CatalogError::StaleVersion {
791 collection: entry.collection().clone(),
792 range_id: entry.range_id(),
793 current: current.version(),
794 attempted: entry.version(),
795 });
796 }
797 self.collections
798 .insert(entry.collection().clone(), entry.shard_key_mode());
799 self.ranges.insert(key, entry);
800 Ok(UpdateOutcome::Updated)
801 }
802 None => {
803 if let Some(existing) = self
806 .ranges_for(entry.collection())
807 .find(|r| r.bounds().overlaps(entry.bounds()))
808 {
809 return Err(CatalogError::OverlappingRange {
810 collection: entry.collection().clone(),
811 existing: existing.range_id(),
812 attempted: entry.range_id(),
813 });
814 }
815 self.collections
816 .insert(entry.collection().clone(), entry.shard_key_mode());
817 self.ranges.insert(key, entry);
818 Ok(UpdateOutcome::Created)
819 }
820 }
821 }
822
823 pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&RangeOwnership> {
826 self.ranges.get(&(collection.clone(), range_id))
827 }
828
829 pub fn ranges_for<'a>(
831 &'a self,
832 collection: &CollectionId,
833 ) -> impl Iterator<Item = &'a RangeOwnership> {
834 let collection = collection.clone();
835 self.ranges
836 .iter()
837 .filter(move |((c, _), _)| *c == collection)
838 .map(|(_, r)| r)
839 }
840
841 pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&RangeOwnership> {
846 self.ranges_for(collection)
847 .find(|r| r.bounds().contains(key))
848 }
849
850 pub fn role_at(
855 &self,
856 node: &NodeIdentity,
857 collection: &CollectionId,
858 range_id: RangeId,
859 ) -> Option<RangeRole> {
860 self.range(collection, range_id)
861 .map(|range| range.role_of(node))
862 }
863
864 pub fn admit_public_write(
880 &self,
881 node: &NodeIdentity,
882 collection: &CollectionId,
883 key: &[u8],
884 expected_epoch: OwnershipEpoch,
885 ) -> Result<&RangeOwnership, RangeWriteReject> {
886 let range = self
887 .route(collection, key)
888 .ok_or_else(|| RangeWriteReject::NoRange {
889 collection: collection.clone(),
890 })?;
891 let role = range.role_of(node);
892 if !role.may_write_public() {
893 return Err(RangeWriteReject::NotOwner {
894 collection: collection.clone(),
895 range_id: range.range_id(),
896 role,
897 owner: range.owner().clone(),
898 });
899 }
900 if expected_epoch != range.epoch() {
901 return Err(RangeWriteReject::StaleEpoch {
902 collection: collection.clone(),
903 range_id: range.range_id(),
904 expected: expected_epoch,
905 current: range.epoch(),
906 });
907 }
908 Ok(range)
909 }
910
911 pub fn range_count(&self) -> usize {
913 self.ranges.len()
914 }
915
916 pub fn entries(&self) -> impl Iterator<Item = &RangeOwnership> {
920 self.ranges.values()
921 }
922}
923
#[cfg(test)]
mod tests {
    use super::*;

    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// `[lower, upper)` over explicit keys; panics if the bounds are not ascending.
    fn bounds(lower: &[u8], upper: &[u8]) -> RangeBounds {
        RangeBounds::new(RangeBound::key(lower), RangeBound::key(upper)).unwrap()
    }

    /// Hash-mode range owned by `owner`, replicated to `CN=replica-1`, with
    /// replication factor 3.
    fn hash_range(coll: &CollectionId, id: u64, bnds: RangeBounds, owner: &str) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            bnds,
            ident(owner),
            [ident("CN=replica-1")],
            PlacementMetadata::with_replication_factor(3),
        )
    }

    #[test]
    fn empty_catalog_creation() {
        let catalog = ShardOwnershipCatalog::new();
        assert_eq!(catalog.range_count(), 0);
        assert!(catalog.shard_key_mode(&collection("orders")).is_none());
    }

    /// Applying a hash range to an undeclared collection records `Hash` as its mode.
    #[test]
    fn hash_is_the_default_shard_key_mode() {
        assert_eq!(ShardKeyMode::default(), ShardKeyMode::Hash);

        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();
        assert_eq!(catalog.shard_key_mode(&orders), Some(ShardKeyMode::Hash));
    }

    /// Two adjacent ranges split the key space at 0x80; keys route to the
    /// range whose half-open bounds contain them.
    #[test]
    fn hash_range_entry_routes_to_owner() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");

        catalog
            .apply_update(hash_range(
                &orders,
                1,
                RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
                "CN=node-a",
            ))
            .unwrap();
        catalog
            .apply_update(hash_range(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
            ))
            .unwrap();

        assert_eq!(
            catalog.route(&orders, &[0x10]).unwrap().owner(),
            &ident("CN=node-a")
        );
        // The split key itself belongs to the upper range.
        assert_eq!(
            catalog.route(&orders, &[0x80]).unwrap().owner(),
            &ident("CN=node-b")
        );
        assert_eq!(
            catalog.route(&orders, &[0xff]).unwrap().owner(),
            &ident("CN=node-b")
        );
        let r = catalog.route(&orders, &[0x10]).unwrap();
        assert_eq!(r.replicas(), &[ident("CN=replica-1")]);
        assert_eq!(r.epoch(), OwnershipEpoch::initial());
    }

    /// Ordered collections route byte-string keys; keys at or past the last
    /// upper bound are not covered.
    #[test]
    fn ordered_mode_can_be_declared_and_routed() {
        let mut catalog = ShardOwnershipCatalog::new();
        let events = collection("events");
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        assert_eq!(catalog.shard_key_mode(&events), Some(ShardKeyMode::Ordered));

        catalog
            .apply_update(RangeOwnership::establish(
                events.clone(),
                RangeId::new(1),
                ShardKeyMode::Ordered,
                bounds(b"a", b"m"),
                ident("CN=node-a"),
                [],
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        catalog
            .apply_update(RangeOwnership::establish(
                events.clone(),
                RangeId::new(2),
                ShardKeyMode::Ordered,
                bounds(b"m", b"z"),
                ident("CN=node-b"),
                [],
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();

        assert_eq!(
            catalog.route(&events, b"alpha").unwrap().owner(),
            &ident("CN=node-a")
        );
        assert_eq!(
            catalog.route(&events, b"mike").unwrap().owner(),
            &ident("CN=node-b")
        );
        assert!(catalog.route(&events, b"zzz").is_none());
    }

    /// Re-declaring the same mode is accepted; a different mode is rejected,
    /// both via `declare_collection` and via `apply_update`.
    #[test]
    fn declaring_a_conflicting_mode_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let events = collection("events");
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        let err = catalog
            .declare_collection(events.clone(), ShardKeyMode::Hash)
            .unwrap_err();
        assert_eq!(
            err,
            CatalogError::ShardKeyModeMismatch {
                collection: events.clone(),
                declared: ShardKeyMode::Ordered,
                attempted: ShardKeyMode::Hash,
            }
        );
        let err = catalog
            .apply_update(hash_range(&events, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap_err();
        assert!(matches!(err, CatalogError::ShardKeyModeMismatch { .. }));
    }

    /// An ownership transfer bumps both version and epoch; a replica change
    /// bumps only the version.
    #[test]
    fn version_bumps_on_owner_transfer_and_epoch_fences() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();

        let current = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(current.version(), CatalogVersion::initial());
        assert_eq!(current.epoch(), OwnershipEpoch::initial());

        let moved = current.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        let outcome = catalog.apply_update(moved).unwrap();
        assert_eq!(outcome, UpdateOutcome::Updated);

        let after = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(after.owner(), &ident("CN=node-b"));
        assert_eq!(after.version().value(), 2);
        assert_eq!(after.epoch().value(), 2);

        let replicas_changed = after.update_replicas([ident("CN=node-c")]);
        catalog.apply_update(replicas_changed).unwrap();
        let after2 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(after2.version().value(), 3);
        assert_eq!(after2.epoch().value(), 2);
        assert_eq!(after2.replicas(), &[ident("CN=node-c")]);
    }

    /// Re-applying an older version fails with `StaleVersion` and does not
    /// touch the stored entry.
    #[test]
    fn stale_update_is_rejected_and_leaves_catalog_unchanged() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();

        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(v1.transfer_to(ident("CN=node-b"), []))
            .unwrap();
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-b")
        );

        let err = catalog.apply_update(v1.clone()).unwrap_err();
        assert_eq!(
            err,
            CatalogError::StaleVersion {
                collection: orders.clone(),
                range_id: RangeId::new(1),
                current: CatalogVersion::initial().next(),
                attempted: CatalogVersion::initial(),
            }
        );
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-b")
        );
        assert_eq!(
            catalog
                .range(&orders, RangeId::new(1))
                .unwrap()
                .version()
                .value(),
            2
        );
    }

    #[test]
    fn overlapping_range_creation_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                bounds(&[0x00], &[0x80]),
                "CN=node-a",
            ))
            .unwrap();
        let err = catalog
            .apply_update(hash_range(
                &orders,
                2,
                bounds(&[0x40], &[0xc0]),
                "CN=node-b",
            ))
            .unwrap_err();
        assert_eq!(
            err,
            CatalogError::OverlappingRange {
                collection: orders.clone(),
                existing: RangeId::new(1),
                attempted: RangeId::new(2),
            }
        );
        assert_eq!(catalog.range_count(), 1);
    }

    /// Replaying the leader's update stream on a data member yields the same
    /// routing, and duplicate deliveries are rejected as stale.
    #[test]
    fn catalog_replicates_to_data_members_with_read_visibility() {
        let orders = collection("orders");
        let mut leader = ShardOwnershipCatalog::new();
        let mut data_member = ShardOwnershipCatalog::new();

        let create = hash_range(&orders, 1, RangeBounds::full(), "CN=node-a");
        leader.apply_update(create.clone()).unwrap();
        assert_eq!(
            data_member.apply_update(create).unwrap(),
            UpdateOutcome::Created
        );

        assert_eq!(
            data_member.route(&orders, b"any-key").unwrap().owner(),
            &ident("CN=node-a")
        );

        let v2 = leader
            .range(&orders, RangeId::new(1))
            .unwrap()
            .transfer_to(ident("CN=node-b"), []);
        leader.apply_update(v2.clone()).unwrap();
        assert_eq!(
            data_member.apply_update(v2.clone()).unwrap(),
            UpdateOutcome::Updated
        );
        assert_eq!(
            data_member.route(&orders, b"any-key").unwrap().owner(),
            &ident("CN=node-b")
        );

        let err = data_member.apply_update(v2).unwrap_err();
        assert!(matches!(err, CatalogError::StaleVersion { .. }));
        assert_eq!(
            data_member
                .range(&orders, RangeId::new(1))
                .unwrap()
                .version()
                .value(),
            2
        );
    }

    #[test]
    fn range_bounds_reject_empty_or_inverted() {
        assert!(RangeBounds::new(RangeBound::key([0x10]), RangeBound::key([0x10])).is_err());
        assert!(RangeBounds::new(RangeBound::key([0x20]), RangeBound::key([0x10])).is_err());
        assert!(RangeBounds::new(RangeBound::Max, RangeBound::Min).is_err());
        assert!(RangeBounds::full().contains(b"anything"));
    }

    /// `split_at` yields adjacent, non-overlapping halves that meet at the
    /// split key, and refuses split points on or outside the bounds.
    #[test]
    fn split_at_produces_adjacent_halves_and_rejects_edge_points() {
        let whole = bounds(b"a", b"z");
        let (left, right) = whole.split_at(b"m").unwrap();
        assert_eq!(left, bounds(b"a", b"m"));
        assert_eq!(right, bounds(b"m", b"z"));
        assert!(!left.overlaps(&right));
        assert!(!right.overlaps(&left));
        assert!(left.contains(b"l"));
        assert!(!left.contains(b"m"));
        assert!(right.contains(b"m"));

        assert!(whole.split_at(b"a").is_err());
        assert!(whole.split_at(b"z").is_err());
        assert!(whole.split_at(b"zz").is_err());

        let (lo, hi) = RangeBounds::full().split_at(&[0x80]).unwrap();
        assert_eq!(lo.lower(), &RangeBound::Min);
        assert_eq!(hi.upper(), &RangeBound::Max);
    }

    /// Placement and bounds changes bump the version but leave the ownership
    /// epoch untouched.
    #[test]
    fn placement_and_bounds_updates_bump_version_but_not_epoch() {
        let placement =
            PlacementMetadata::with_replication_factor(5).with_attribute("zone", "eu-west-1a");
        assert_eq!(placement.replication_factor(), 5);
        assert_eq!(placement.attribute("zone"), Some("eu-west-1a"));
        assert_eq!(placement.attribute("rack"), None);

        let range = hash_range(&collection("orders"), 1, RangeBounds::full(), "CN=node-a");
        let replaced = range.update_placement(placement.clone());
        assert_eq!(replaced.placement(), &placement);
        assert_eq!(replaced.version().value(), 2);
        assert_eq!(replaced.epoch(), range.epoch());

        let narrowed = range.with_bounds(bounds(&[0x00], &[0x80]));
        assert_eq!(narrowed.bounds(), &bounds(&[0x00], &[0x80]));
        assert_eq!(narrowed.version().value(), 2);
        assert_eq!(narrowed.epoch(), range.epoch());
        assert_eq!(narrowed.owner(), range.owner());
    }

    /// Hash-mode range with an explicit replica list and replication factor 3.
    fn range_with(
        coll: &CollectionId,
        id: u64,
        bnds: RangeBounds,
        owner: &str,
        replicas: &[&str],
    ) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            bnds,
            ident(owner),
            replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
            PlacementMetadata::with_replication_factor(3),
        )
    }

    #[test]
    fn role_of_distinguishes_owner_replica_and_no_copy() {
        let orders = collection("orders");
        let range = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);

        assert_eq!(range.role_of(&ident("CN=node-a")), RangeRole::Owner);
        assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Replica);
        assert_eq!(range.role_of(&ident("CN=node-c")), RangeRole::NoCopy);
        assert!(RangeRole::Owner.may_write_public());
        assert!(!RangeRole::Replica.may_write_public());
        assert!(!RangeRole::NoCopy.may_write_public());
    }

    /// node-a owns range 1 but is only a replica of range 2.
    #[test]
    fn role_is_per_range_not_a_global_node_role() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();
        catalog
            .apply_update(range_with(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
                &["CN=node-a"],
            ))
            .unwrap();

        let node_a = ident("CN=node-a");
        assert_eq!(
            catalog.role_at(&node_a, &orders, RangeId::new(1)),
            Some(RangeRole::Owner)
        );
        assert_eq!(
            catalog.role_at(&node_a, &orders, RangeId::new(2)),
            Some(RangeRole::Replica)
        );
        assert_eq!(catalog.role_at(&node_a, &orders, RangeId::new(99)), None);
        assert_eq!(
            catalog.role_at(&node_a, &collection("ghost"), RangeId::new(1)),
            None
        );
    }

    #[test]
    fn public_write_admitted_on_owner_at_matching_epoch() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        let admitted = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .expect("owner at current epoch may write");
        assert_eq!(admitted.owner(), &ident("CN=node-a"));
        assert_eq!(admitted.range_id(), RangeId::new(1));
    }

    /// A replica refuses the write and points the caller at the owner.
    #[test]
    fn public_write_rejected_on_replica_with_routing_error() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        let err = catalog
            .admit_public_write(
                &ident("CN=node-b"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        match err {
            RangeWriteReject::NotOwner {
                role, ref owner, ..
            } => {
                assert_eq!(role, RangeRole::Replica);
                assert_eq!(owner, &ident("CN=node-a"));
            }
            other => panic!("expected NotOwner(Replica), got {other:?}"),
        }
        assert!(err.to_string().contains("route the write to"));
    }

    #[test]
    fn public_write_rejected_on_no_copy_holder() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        let err = catalog
            .admit_public_write(
                &ident("CN=node-c"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        match err {
            RangeWriteReject::NotOwner { role, .. } => assert_eq!(role, RangeRole::NoCopy),
            other => panic!("expected NotOwner(NoCopy), got {other:?}"),
        }
    }

    /// Ownership goes a -> b -> a. node-a is the owner again, but writes
    /// authorised under its original epoch are still fenced off.
    #[test]
    fn public_write_rejected_on_stale_ownership_epoch() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let v1 = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);
        let original_epoch = v1.epoch();
        catalog.apply_update(v1.clone()).unwrap();

        let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        catalog.apply_update(v2.clone()).unwrap();
        let v3 = v2.transfer_to(ident("CN=node-a"), [ident("CN=node-b")]);
        catalog.apply_update(v3.clone()).unwrap();

        assert_ne!(original_epoch, v3.epoch());
        let err = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", original_epoch)
            .unwrap_err();
        match err {
            RangeWriteReject::StaleEpoch {
                expected, current, ..
            } => {
                assert_eq!(expected, original_epoch);
                assert_eq!(current, v3.epoch());
            }
            other => panic!("expected StaleEpoch, got {other:?}"),
        }
        assert!(catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", v3.epoch())
            .is_ok());
    }

    #[test]
    fn public_write_rejected_when_no_range_covers_the_key() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let err = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        assert!(matches!(err, RangeWriteReject::NoRange { .. }));
    }

    /// The public-write gate does not apply to replication. A replica that
    /// refuses public writes still admits the owner's change records through
    /// the CDC range-authority fence.
    #[test]
    fn internal_apply_path_stays_privileged_for_a_public_write_replica() {
        use crate::replication::cdc::{ChangeOperation, ChangeRecord, RangeAuthority};

        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                7,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        assert!(matches!(
            catalog
                .admit_public_write(
                    &ident("CN=node-b"),
                    &orders,
                    b"k",
                    OwnershipEpoch::initial()
                )
                .unwrap_err(),
            RangeWriteReject::NotOwner {
                role: RangeRole::Replica,
                ..
            }
        ));

        let record = ChangeRecord {
            term: 1,
            lsn: 1,
            timestamp: 0,
            operation: ChangeOperation::Insert,
            collection: orders.as_str().to_string(),
            entity_id: 1,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1]),
            metadata: None,
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        }
        .with_range_authority(7, OwnershipEpoch::initial().value());
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 1,
            min_ownership_epoch: OwnershipEpoch::initial().value(),
        };
        assert!(
            fence.admit(&record).is_ok(),
            "replica internal apply must remain privileged for the owner's changes"
        );
    }
}