1use std::collections::BTreeMap;
45
46use super::identity::NodeIdentity;
47use super::ownership::{CollectionId, OwnershipEpoch, RangeId, ShardOwnershipCatalog};
48use super::ownership_transition::CommitWatermark;
49
50#[derive(Debug, Clone, PartialEq, Eq)]
55pub struct KeyTarget {
56 collection: CollectionId,
57 key: Vec<u8>,
58}
59
60impl KeyTarget {
61 pub fn new(collection: CollectionId, key: impl Into<Vec<u8>>) -> Self {
62 Self {
63 collection,
64 key: key.into(),
65 }
66 }
67
68 pub fn collection(&self) -> &CollectionId {
69 &self.collection
70 }
71
72 pub fn key(&self) -> &[u8] {
73 &self.key
74 }
75}
76
77#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct ResolvedTarget {
81 collection: CollectionId,
82 key: Vec<u8>,
83 range_id: RangeId,
84 owner: NodeIdentity,
85 epoch: OwnershipEpoch,
86}
87
88impl ResolvedTarget {
89 pub fn collection(&self) -> &CollectionId {
90 &self.collection
91 }
92
93 pub fn key(&self) -> &[u8] {
94 &self.key
95 }
96
97 pub fn range_id(&self) -> RangeId {
98 self.range_id
99 }
100
101 pub fn owner(&self) -> &NodeIdentity {
102 &self.owner
103 }
104
105 pub fn epoch(&self) -> OwnershipEpoch {
106 self.epoch
107 }
108}
109
110#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct RangeParticipant {
115 collection: CollectionId,
116 range_id: RangeId,
117 epoch: OwnershipEpoch,
118}
119
120impl RangeParticipant {
121 pub fn collection(&self) -> &CollectionId {
122 &self.collection
123 }
124
125 pub fn range_id(&self) -> RangeId {
126 self.range_id
127 }
128
129 pub fn epoch(&self) -> OwnershipEpoch {
130 self.epoch
131 }
132}
133
134#[derive(Debug, Clone, PartialEq, Eq)]
139pub struct WriterParticipation {
140 writer: NodeIdentity,
141 ranges: Vec<RangeParticipant>,
142}
143
144impl WriterParticipation {
145 pub fn writer(&self) -> &NodeIdentity {
146 &self.writer
147 }
148
149 pub fn ranges(&self) -> &[RangeParticipant] {
150 &self.ranges
151 }
152}
153
154#[derive(Debug, Clone, PartialEq, Eq)]
159pub struct WriteTransactionPlan {
160 participation: WriterParticipation,
161}
162
163impl WriteTransactionPlan {
164 pub fn writer(&self) -> &NodeIdentity {
166 self.participation.writer()
167 }
168
169 pub fn ranges(&self) -> &[RangeParticipant] {
171 self.participation.ranges()
172 }
173}
174
175#[derive(Debug, Clone, PartialEq, Eq)]
177pub enum WriteTransactionReject {
178 Empty,
180 Unroutable {
183 collection: CollectionId,
184 key: Vec<u8>,
185 },
186 CrossRange { writers: Vec<WriterParticipation> },
191}
192
193impl std::fmt::Display for WriteTransactionReject {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 match self {
196 Self::Empty => write!(f, "write transaction names no targets"),
197 Self::Unroutable { collection, key } => write!(
198 f,
199 "no range of collection {collection} covers key {} — re-resolve routing",
200 DisplayKey(key)
201 ),
202 Self::CrossRange { writers } => {
203 write!(
204 f,
205 "cross-range write transaction spans {} writers and is unsupported: ",
206 writers.len()
207 )?;
208 for (i, w) in writers.iter().enumerate() {
209 if i > 0 {
210 write!(f, ", ")?;
211 }
212 write!(f, "{} owns ", w.writer())?;
213 for (j, r) in w.ranges().iter().enumerate() {
214 if j > 0 {
215 write!(f, "+")?;
216 }
217 write!(f, "{}/{}", r.collection(), r.range_id())?;
218 }
219 }
220 Ok(())
221 }
222 }
223 }
224}
225
226impl std::error::Error for WriteTransactionReject {}
227
228#[derive(Debug, Clone, PartialEq, Eq)]
230pub struct ReadLeg {
231 owner: NodeIdentity,
232 targets: Vec<ResolvedTarget>,
233}
234
235impl ReadLeg {
236 pub fn owner(&self) -> &NodeIdentity {
237 &self.owner
238 }
239
240 pub fn targets(&self) -> &[ResolvedTarget] {
241 &self.targets
242 }
243}
244
245#[derive(Debug, Clone, PartialEq, Eq)]
252pub struct ReadFanout {
253 legs: Vec<ReadLeg>,
254}
255
256impl ReadFanout {
257 pub fn legs(&self) -> &[ReadLeg] {
259 &self.legs
260 }
261
262 pub fn is_cross_range(&self) -> bool {
264 self.legs.len() > 1
265 }
266}
267
268#[derive(Debug, Clone, PartialEq, Eq)]
270pub enum ReadFanoutReject {
271 Empty,
273 Unroutable {
275 collection: CollectionId,
276 key: Vec<u8>,
277 },
278}
279
280impl std::fmt::Display for ReadFanoutReject {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 match self {
283 Self::Empty => write!(f, "read fanout names no targets"),
284 Self::Unroutable { collection, key } => write!(
285 f,
286 "no range of collection {collection} covers key {} — re-resolve routing",
287 DisplayKey(key)
288 ),
289 }
290 }
291}
292
293impl std::error::Error for ReadFanoutReject {}
294
295#[derive(Debug, Clone, Default, PartialEq, Eq)]
303pub struct GlobalReadWatermark {
304 marks: BTreeMap<(CollectionId, RangeId), CommitWatermark>,
305}
306
307impl GlobalReadWatermark {
308 pub fn new() -> Self {
309 Self::default()
310 }
311
312 pub fn with(
314 mut self,
315 collection: CollectionId,
316 range_id: RangeId,
317 watermark: CommitWatermark,
318 ) -> Self {
319 self.marks.insert((collection, range_id), watermark);
320 self
321 }
322
323 pub fn insert(
325 &mut self,
326 collection: CollectionId,
327 range_id: RangeId,
328 watermark: CommitWatermark,
329 ) {
330 self.marks.insert((collection, range_id), watermark);
331 }
332
333 pub fn covers(&self, collection: &CollectionId, range_id: RangeId) -> Option<CommitWatermark> {
336 self.marks.get(&(collection.clone(), range_id)).copied()
337 }
338}
339
340#[derive(Debug, Clone, PartialEq, Eq)]
343pub struct ConsistentReadLeg {
344 owner: NodeIdentity,
345 targets: Vec<PinnedTarget>,
346}
347
348impl ConsistentReadLeg {
349 pub fn owner(&self) -> &NodeIdentity {
350 &self.owner
351 }
352
353 pub fn targets(&self) -> &[PinnedTarget] {
354 &self.targets
355 }
356}
357
358#[derive(Debug, Clone, PartialEq, Eq)]
360pub struct PinnedTarget {
361 target: ResolvedTarget,
362 watermark: CommitWatermark,
363}
364
365impl PinnedTarget {
366 pub fn target(&self) -> &ResolvedTarget {
367 &self.target
368 }
369
370 pub fn watermark(&self) -> CommitWatermark {
371 self.watermark
372 }
373}
374
375#[derive(Debug, Clone, PartialEq, Eq)]
378pub struct ConsistentReadPlan {
379 legs: Vec<ConsistentReadLeg>,
380}
381
382impl ConsistentReadPlan {
383 pub fn legs(&self) -> &[ConsistentReadLeg] {
386 &self.legs
387 }
388}
389
390#[derive(Debug, Clone, PartialEq, Eq)]
392pub enum ConsistentReadReject {
393 Empty,
395 Unroutable {
397 collection: CollectionId,
398 key: Vec<u8>,
399 },
400 NoSafeSnapshot,
404 WatermarkGap {
407 collection: CollectionId,
408 range_id: RangeId,
409 },
410}
411
412impl std::fmt::Display for ConsistentReadReject {
413 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
414 match self {
415 Self::Empty => write!(f, "consistent read names no targets"),
416 Self::Unroutable { collection, key } => write!(
417 f,
418 "no range of collection {collection} covers key {} — re-resolve routing",
419 DisplayKey(key)
420 ),
421 Self::NoSafeSnapshot => write!(
422 f,
423 "consistent cross-range read requires a global safe snapshot/watermark, none supplied"
424 ),
425 Self::WatermarkGap {
426 collection,
427 range_id,
428 } => write!(
429 f,
430 "safe snapshot does not cover {collection}/{range_id}; cannot serve a consistent read"
431 ),
432 }
433 }
434}
435
436impl std::error::Error for ConsistentReadReject {}
437
/// Formats raw key bytes for error messages.
///
/// The output is lowercase hex with a `0x` prefix, e.g. `0x01ab`. An empty
/// key renders as just `0x`.
struct DisplayKey<'a>(&'a [u8]);

impl std::fmt::Display for DisplayKey<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("0x")?;
        self.0.iter().try_for_each(|byte| write!(f, "{byte:02x}"))
    }
}
450
451impl ShardOwnershipCatalog {
452 fn resolve_targets(
455 &self,
456 targets: &[KeyTarget],
457 ) -> Result<Vec<ResolvedTarget>, (CollectionId, Vec<u8>)> {
458 let mut resolved = Vec::with_capacity(targets.len());
459 for t in targets {
460 match self.route_shard_key(t.collection(), t.key()) {
461 Some(range) => resolved.push(ResolvedTarget {
462 collection: t.collection().clone(),
463 key: t.key().to_vec(),
464 range_id: range.range_id(),
465 owner: range.owner().clone(),
466 epoch: range.epoch(),
467 }),
468 None => return Err((t.collection().clone(), t.key().to_vec())),
469 }
470 }
471 Ok(resolved)
472 }
473
474 pub fn plan_write_transaction(
491 &self,
492 targets: &[KeyTarget],
493 ) -> Result<WriteTransactionPlan, WriteTransactionReject> {
494 if targets.is_empty() {
495 return Err(WriteTransactionReject::Empty);
496 }
497 let resolved = self
498 .resolve_targets(targets)
499 .map_err(|(collection, key)| WriteTransactionReject::Unroutable { collection, key })?;
500
501 let writers = group_by_owner(&resolved);
502 if writers.len() == 1 {
503 let (writer, ranges) = writers.into_iter().next().expect("exactly one writer");
504 Ok(WriteTransactionPlan {
505 participation: WriterParticipation { writer, ranges },
506 })
507 } else {
508 Err(WriteTransactionReject::CrossRange {
509 writers: writers
510 .into_iter()
511 .map(|(writer, ranges)| WriterParticipation { writer, ranges })
512 .collect(),
513 })
514 }
515 }
516
517 pub fn plan_read_fanout(&self, targets: &[KeyTarget]) -> Result<ReadFanout, ReadFanoutReject> {
526 if targets.is_empty() {
527 return Err(ReadFanoutReject::Empty);
528 }
529 let resolved = self
530 .resolve_targets(targets)
531 .map_err(|(collection, key)| ReadFanoutReject::Unroutable { collection, key })?;
532
533 Ok(ReadFanout {
534 legs: group_targets_by_owner(resolved),
535 })
536 }
537
538 pub fn plan_consistent_read(
554 &self,
555 targets: &[KeyTarget],
556 snapshot: Option<&GlobalReadWatermark>,
557 ) -> Result<ConsistentReadPlan, ConsistentReadReject> {
558 if targets.is_empty() {
559 return Err(ConsistentReadReject::Empty);
560 }
561 let resolved = self
562 .resolve_targets(targets)
563 .map_err(|(collection, key)| ConsistentReadReject::Unroutable { collection, key })?;
564
565 let snapshot = snapshot.ok_or(ConsistentReadReject::NoSafeSnapshot)?;
566
567 let mut pinned = Vec::with_capacity(resolved.len());
570 for target in resolved {
571 let watermark = snapshot
572 .covers(target.collection(), target.range_id())
573 .ok_or_else(|| ConsistentReadReject::WatermarkGap {
574 collection: target.collection().clone(),
575 range_id: target.range_id(),
576 })?;
577 pinned.push(PinnedTarget { target, watermark });
578 }
579
580 Ok(ConsistentReadPlan {
581 legs: group_pinned_by_owner(pinned),
582 })
583 }
584}
585
586fn group_by_owner(resolved: &[ResolvedTarget]) -> Vec<(NodeIdentity, Vec<RangeParticipant>)> {
589 let mut by_owner: BTreeMap<NodeIdentity, BTreeMap<RangeId, RangeParticipant>> = BTreeMap::new();
590 for t in resolved {
591 by_owner
592 .entry(t.owner().clone())
593 .or_default()
594 .entry(t.range_id())
595 .or_insert_with(|| RangeParticipant {
596 collection: t.collection().clone(),
597 range_id: t.range_id(),
598 epoch: t.epoch(),
599 });
600 }
601 by_owner
602 .into_iter()
603 .map(|(owner, ranges)| (owner, ranges.into_values().collect()))
604 .collect()
605}
606
607fn group_targets_by_owner(resolved: Vec<ResolvedTarget>) -> Vec<ReadLeg> {
609 let mut by_owner: BTreeMap<NodeIdentity, Vec<ResolvedTarget>> = BTreeMap::new();
610 for t in resolved {
611 by_owner.entry(t.owner().clone()).or_default().push(t);
612 }
613 by_owner
614 .into_iter()
615 .map(|(owner, targets)| ReadLeg { owner, targets })
616 .collect()
617}
618
619fn group_pinned_by_owner(pinned: Vec<PinnedTarget>) -> Vec<ConsistentReadLeg> {
622 let mut by_owner: BTreeMap<NodeIdentity, Vec<PinnedTarget>> = BTreeMap::new();
623 for p in pinned {
624 by_owner
625 .entry(p.target().owner().clone())
626 .or_default()
627 .push(p);
628 }
629 by_owner
630 .into_iter()
631 .map(|(owner, targets)| ConsistentReadLeg { owner, targets })
632 .collect()
633}
634
#[cfg(test)]
mod tests {
    use super::super::ownership::RangeOwnership;
    use super::*;
    use crate::cluster::ownership::{PlacementMetadata, RangeBound, RangeBounds, ShardKeyMode};

    /// Builds a collection id from a name the tests know is valid.
    fn collection_named(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Builds a node identity from a certificate subject such as `CN=node-a`.
    fn node(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Builds key bounds with explicit lower and upper keys.
    fn key_bounds(lower: &[u8], upper: &[u8]) -> RangeBounds {
        RangeBounds::new(RangeBound::key(lower), RangeBound::key(upper)).unwrap()
    }

    /// Establishes an ordered range owned by `owner`, with one replica and
    /// replication factor 3.
    fn owned_range(coll: &CollectionId, id: u64, bnds: RangeBounds, owner: &str) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Ordered,
            bnds,
            node(owner),
            [node("CN=replica-1")],
            PlacementMetadata::with_replication_factor(3),
        )
    }

    /// Builds a catalog for `orders` with two ranges.
    ///
    /// Range 1 is bounded by `a`..`m` and owned by `low_owner`. Range 2 runs
    /// from `m` to `RangeBound::Max` and is owned by `high_owner`.
    fn orders_split_at_m(low_owner: &str, high_owner: &str) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection_named("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(owned_range(&orders, 1, key_bounds(b"a", b"m"), low_owner))
            .unwrap();
        let upper_half = RangeBounds::new(RangeBound::key(b"m"), RangeBound::Max).unwrap();
        catalog
            .apply_update(owned_range(&orders, 2, upper_half, high_owner))
            .unwrap();
        (catalog, orders)
    }

    /// The standard fixture: node-a owns range 1 and node-b owns range 2.
    fn two_range_catalog() -> (ShardOwnershipCatalog, CollectionId) {
        orders_split_at_m("CN=node-a", "CN=node-b")
    }

    fn key_in(coll: &CollectionId, key: &[u8]) -> KeyTarget {
        KeyTarget::new(coll.clone(), key.to_vec())
    }

    #[test]
    fn single_writer_transaction_succeeds() {
        // Both ranges belong to node-a, so a transaction spanning them is admitted.
        let (catalog, orders) = orders_split_at_m("CN=node-a", "CN=node-a");

        let plan = catalog
            .plan_write_transaction(&[key_in(&orders, b"alice"), key_in(&orders, b"zeb")])
            .expect("single-writer transaction is admitted");

        assert_eq!(plan.writer(), &node("CN=node-a"));
        let range_ids: Vec<u64> = plan
            .ranges()
            .iter()
            .map(|participant| participant.range_id().value())
            .collect();
        assert_eq!(range_ids, vec![1, 2]);
        assert_eq!(plan.ranges()[0].epoch(), OwnershipEpoch::initial());
    }

    #[test]
    fn single_range_transaction_succeeds() {
        let (catalog, orders) = two_range_catalog();

        let plan = catalog
            .plan_write_transaction(&[key_in(&orders, b"alice"), key_in(&orders, b"bob")])
            .expect("single-range transaction is admitted");

        assert_eq!(plan.writer(), &node("CN=node-a"));
        assert_eq!(plan.ranges().len(), 1);
        assert_eq!(plan.ranges()[0].range_id(), RangeId::new(1));
    }

    #[test]
    fn cross_range_write_transaction_is_rejected() {
        let (catalog, orders) = two_range_catalog();

        let rejection = catalog
            .plan_write_transaction(&[key_in(&orders, b"alice"), key_in(&orders, b"zeb")])
            .expect_err("cross-writer transaction is rejected");

        let WriteTransactionReject::CrossRange { writers } = rejection else {
            panic!("expected CrossRange, got {rejection:?}");
        };
        assert_eq!(writers.len(), 2);
        assert_eq!(writers[0].writer(), &node("CN=node-a"));
        assert_eq!(writers[1].writer(), &node("CN=node-b"));
        assert_eq!(writers[0].ranges()[0].range_id(), RangeId::new(1));
        assert_eq!(writers[1].ranges()[0].range_id(), RangeId::new(2));
    }

    #[test]
    fn empty_write_transaction_is_rejected() {
        let catalog = ShardOwnershipCatalog::new();
        let outcome = catalog.plan_write_transaction(&[]);
        assert_eq!(outcome, Err(WriteTransactionReject::Empty));
    }

    #[test]
    fn unroutable_write_transaction_is_rejected() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection_named("orders");

        let outcome = catalog.plan_write_transaction(&[key_in(&orders, b"x")]);

        if let Err(WriteTransactionReject::Unroutable { collection, key }) = outcome {
            assert_eq!(collection, orders);
            assert_eq!(key, b"x");
        } else {
            panic!("expected Unroutable, got {outcome:?}");
        }
    }

    #[test]
    fn read_fanout_collects_per_owner_legs() {
        let (catalog, orders) = two_range_catalog();

        let fanout = catalog
            .plan_read_fanout(&[
                key_in(&orders, b"alice"),
                key_in(&orders, b"zeb"),
                key_in(&orders, b"bob"),
            ])
            .expect("fanout planned");

        assert!(fanout.is_cross_range());
        assert_eq!(fanout.legs().len(), 2);

        let low_leg = &fanout.legs()[0];
        assert_eq!(low_leg.owner(), &node("CN=node-a"));
        assert_eq!(low_leg.targets().len(), 2);

        let high_leg = &fanout.legs()[1];
        assert_eq!(high_leg.owner(), &node("CN=node-b"));
        assert_eq!(high_leg.targets().len(), 1);
        assert_eq!(high_leg.targets()[0].key(), b"zeb");
    }

    #[test]
    fn single_owner_read_is_not_cross_range() {
        let (catalog, orders) = two_range_catalog();

        let fanout = catalog
            .plan_read_fanout(&[key_in(&orders, b"alice"), key_in(&orders, b"bob")])
            .expect("fanout planned");

        assert!(!fanout.is_cross_range());
        assert_eq!(fanout.legs().len(), 1);
    }

    #[test]
    fn unroutable_read_fanout_is_rejected() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection_named("orders");

        let outcome = catalog.plan_read_fanout(&[key_in(&orders, b"x")]);

        if let Err(ReadFanoutReject::Unroutable { collection, .. }) = outcome {
            assert_eq!(collection, orders);
        } else {
            panic!("expected Unroutable, got {outcome:?}");
        }
    }

    #[test]
    fn consistent_read_without_snapshot_is_rejected() {
        let (catalog, orders) = two_range_catalog();
        let outcome = catalog
            .plan_consistent_read(&[key_in(&orders, b"alice"), key_in(&orders, b"zeb")], None);
        assert_eq!(outcome, Err(ConsistentReadReject::NoSafeSnapshot));
    }

    #[test]
    fn consistent_read_with_incomplete_snapshot_is_rejected() {
        let (catalog, orders) = two_range_catalog();
        // Only range 1 is covered, so the read of "zeb" (range 2) has no watermark.
        let snapshot =
            GlobalReadWatermark::new().with(orders.clone(), RangeId::new(1), CommitWatermark::new(1, 100));

        let outcome = catalog.plan_consistent_read(
            &[key_in(&orders, b"alice"), key_in(&orders, b"zeb")],
            Some(&snapshot),
        );

        if let Err(ConsistentReadReject::WatermarkGap {
            collection,
            range_id,
        }) = outcome
        {
            assert_eq!(collection, orders);
            assert_eq!(range_id, RangeId::new(2));
        } else {
            panic!("expected WatermarkGap, got {outcome:?}");
        }
    }

    #[test]
    fn consistent_read_with_full_snapshot_succeeds() {
        let (catalog, orders) = two_range_catalog();
        let mut snapshot = GlobalReadWatermark::new();
        snapshot.insert(orders.clone(), RangeId::new(1), CommitWatermark::new(1, 100));
        snapshot.insert(orders.clone(), RangeId::new(2), CommitWatermark::new(1, 250));

        let plan = catalog
            .plan_consistent_read(
                &[key_in(&orders, b"alice"), key_in(&orders, b"zeb")],
                Some(&snapshot),
            )
            .expect("consistent read planned");

        assert_eq!(plan.legs().len(), 2);

        let low_leg = &plan.legs()[0];
        assert_eq!(low_leg.owner(), &node("CN=node-a"));
        assert_eq!(low_leg.targets()[0].watermark(), CommitWatermark::new(1, 100));

        let high_leg = &plan.legs()[1];
        assert_eq!(high_leg.owner(), &node("CN=node-b"));
        assert_eq!(high_leg.targets()[0].watermark(), CommitWatermark::new(1, 250));
    }

    #[test]
    fn empty_consistent_read_is_rejected() {
        let catalog = ShardOwnershipCatalog::new();
        let outcome = catalog.plan_consistent_read(&[], None);
        assert_eq!(outcome, Err(ConsistentReadReject::Empty));
    }

    #[test]
    fn cross_range_rejection_message_names_writers() {
        let (catalog, orders) = two_range_catalog();

        let message = catalog
            .plan_write_transaction(&[key_in(&orders, b"alice"), key_in(&orders, b"zeb")])
            .unwrap_err()
            .to_string();

        for expected in ["cross-range write transaction", "CN=node-a", "CN=node-b"] {
            assert!(message.contains(expected), "{message:?} should mention {expected:?}");
        }
    }
}