1use std::collections::BTreeMap;
39
40use super::identity::NodeIdentity;
41use super::ownership::{
42 CatalogVersion, CollectionId, OwnershipEpoch, RangeBounds, RangeId, RangeOwnership,
43 ShardOwnershipCatalog,
44};
45use super::routing::RoutingHint;
46
47#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct TopologyRange {
57 collection: CollectionId,
58 range_id: RangeId,
59 bounds: RangeBounds,
60 owner: NodeIdentity,
61 replicas: Vec<NodeIdentity>,
62 epoch: OwnershipEpoch,
63 version: CatalogVersion,
64}
65
66impl TopologyRange {
67 fn from_ownership(range: &RangeOwnership) -> Self {
68 Self {
69 collection: range.collection().clone(),
70 range_id: range.range_id(),
71 bounds: range.bounds().clone(),
72 owner: range.owner().clone(),
73 replicas: range.replicas().to_vec(),
74 epoch: range.epoch(),
75 version: range.version(),
76 }
77 }
78
79 pub fn collection(&self) -> &CollectionId {
80 &self.collection
81 }
82
83 pub fn range_id(&self) -> RangeId {
84 self.range_id
85 }
86
87 pub fn bounds(&self) -> &RangeBounds {
88 &self.bounds
89 }
90
91 pub fn owner(&self) -> &NodeIdentity {
92 &self.owner
93 }
94
95 pub fn replicas(&self) -> &[NodeIdentity] {
96 &self.replicas
97 }
98
99 pub fn epoch(&self) -> OwnershipEpoch {
103 self.epoch
104 }
105
106 pub fn version(&self) -> CatalogVersion {
107 self.version
108 }
109
110 fn key(&self) -> (CollectionId, RangeId) {
111 (self.collection.clone(), self.range_id)
112 }
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct TopologySnapshot {
126 version: CatalogVersion,
127 ranges: Vec<TopologyRange>,
128}
129
130impl TopologySnapshot {
131 pub fn version(&self) -> CatalogVersion {
133 self.version
134 }
135
136 pub fn ranges(&self) -> &[TopologyRange] {
138 &self.ranges
139 }
140
141 pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
143 self.ranges
144 .iter()
145 .find(|r| r.collection() == collection && r.range_id() == range_id)
146 }
147
148 pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
152 self.ranges
153 .iter()
154 .find(|r| r.collection() == collection && r.bounds().contains(key))
155 }
156}
157
158impl ShardOwnershipCatalog {
159 pub fn topology_snapshot(&self) -> TopologySnapshot {
167 let ranges: Vec<TopologyRange> =
168 self.entries().map(TopologyRange::from_ownership).collect();
169 let version = ranges
170 .iter()
171 .map(TopologyRange::version)
172 .max()
173 .unwrap_or_else(CatalogVersion::initial);
174 TopologySnapshot { version, ranges }
175 }
176}
177
/// Result of applying a snapshot or pushed update to a [`ClientTopology`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RefreshOutcome {
    /// The update was applied. `ranges_changed` counts the cached ranges it changed.
    Applied { ranges_changed: usize },
    /// The update was stale or redundant; the cache was left untouched.
    Ignored,
}

impl RefreshOutcome {
    /// Returns `true` for [`RefreshOutcome::Applied`], `false` for
    /// [`RefreshOutcome::Ignored`].
    pub fn was_applied(self) -> bool {
        match self {
            RefreshOutcome::Applied { .. } => true,
            RefreshOutcome::Ignored => false,
        }
    }
}
194
/// Result of feeding a redirect [`RoutingHint`] to [`ClientTopology::apply_hint`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HintOutcome {
    /// The cached range was older than the hint. Its owner, epoch and version were
    /// overwritten from the hint, and the cache was flagged as needing a refresh.
    Corrected,
    /// The cached range's version was already at or beyond the hint's, so nothing
    /// changed.
    AlreadyCurrent,
    /// The cache holds no entry for the hinted `(collection, range id)`. No range is
    /// created from the hint, but the cache is flagged as needing a refresh.
    UnknownRange,
}
211
/// A topology change pushed to clients and applied with [`ClientTopology::apply_update`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopologyUpdate {
    /// A complete snapshot, handled exactly like a polled refresh.
    Full(TopologySnapshot),
    /// The new state of a single range (a delta affecting only that range).
    Range(TopologyRange),
}
224
225#[derive(Debug, Clone, PartialEq, Eq)]
239pub struct ClientTopology {
240 version: CatalogVersion,
241 ranges: BTreeMap<(CollectionId, RangeId), TopologyRange>,
242 needs_refresh: bool,
243}
244
245impl ClientTopology {
246 pub fn from_snapshot(snapshot: TopologySnapshot) -> Self {
248 let mut cache = Self {
249 version: snapshot.version(),
250 ranges: BTreeMap::new(),
251 needs_refresh: false,
252 };
253 for range in snapshot.ranges {
254 cache.ranges.insert(range.key(), range);
255 }
256 cache
257 }
258
259 pub fn version(&self) -> CatalogVersion {
263 self.version
264 }
265
266 pub fn needs_refresh(&self) -> bool {
271 self.needs_refresh
272 }
273
274 pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
277 self.ranges
278 .values()
279 .find(|r| r.collection() == collection && r.bounds().contains(key))
280 }
281
282 pub fn resolve(&self, collection: &CollectionId, key: &[u8]) -> Option<&NodeIdentity> {
284 self.route(collection, key).map(TopologyRange::owner)
285 }
286
287 pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
289 self.ranges.get(&(collection.clone(), range_id))
290 }
291
292 pub fn apply_refresh(&mut self, snapshot: TopologySnapshot) -> RefreshOutcome {
301 if !self.ranges.is_empty() && snapshot.version() <= self.version {
302 return RefreshOutcome::Ignored;
303 }
304 let mut changed = 0usize;
305 let mut next: BTreeMap<(CollectionId, RangeId), TopologyRange> = BTreeMap::new();
306 for range in snapshot.ranges {
307 let key = range.key();
308 if self.ranges.get(&key) != Some(&range) {
309 changed += 1;
310 }
311 next.insert(key, range);
312 }
313 self.ranges = next;
314 self.version = snapshot.version;
315 self.needs_refresh = false;
316 RefreshOutcome::Applied {
317 ranges_changed: changed,
318 }
319 }
320
321 pub fn apply_update(&mut self, update: TopologyUpdate) -> RefreshOutcome {
331 match update {
332 TopologyUpdate::Full(snapshot) => self.apply_refresh(snapshot),
333 TopologyUpdate::Range(range) => {
334 let key = range.key();
335 let newer = match self.ranges.get(&key) {
336 Some(current) => range.version() > current.version(),
337 None => true,
338 };
339 if !newer {
340 return RefreshOutcome::Ignored;
341 }
342 if range.version() > self.version {
343 self.version = range.version();
344 }
345 self.ranges.insert(key, range);
346 RefreshOutcome::Applied { ranges_changed: 1 }
347 }
348 }
349 }
350
351 pub fn apply_hint(&mut self, hint: &RoutingHint) -> HintOutcome {
362 let key = (hint.collection().clone(), hint.range_id());
363 match self.ranges.get_mut(&key) {
364 Some(range) => {
365 if hint.version() <= range.version {
366 return HintOutcome::AlreadyCurrent;
367 }
368 range.owner = hint.owner().clone();
369 range.epoch = hint.epoch();
370 range.version = hint.version();
371 self.needs_refresh = true;
372 HintOutcome::Corrected
373 }
374 None => {
375 self.needs_refresh = true;
376 HintOutcome::UnknownRange
377 }
378 }
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{PlacementMetadata, RangeBound, ShardKeyMode};
    use crate::cluster::routing::{RequestOperation, RouteDecision, RoutedRequest, RoutingPolicy};

    /// Builds a `CollectionId`; panics if `name` is rejected.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Builds a `NodeIdentity` from a certificate subject such as `"CN=node-a"`;
    /// panics if the subject is rejected.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// A hash-keyed range with `RangeBounds::full()` bounds, the given owner and
    /// replicas, and a replication factor of 3.
    fn full_range(coll: &CollectionId, id: u64, owner: &str, replicas: &[&str]) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            RangeBounds::full(),
            ident(owner),
            replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
            PlacementMetadata::with_replication_factor(3),
        )
    }

    /// An ordered-key range bounded by `lower` and `upper`, with a single owner, no
    /// replicas, and a replication factor of 1.
    fn split_range(
        coll: &CollectionId,
        id: u64,
        lower: RangeBound,
        upper: RangeBound,
        owner: &str,
    ) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Ordered,
            RangeBounds::new(lower, upper).unwrap(),
            ident(owner),
            Vec::<NodeIdentity>::new(),
            PlacementMetadata::with_replication_factor(1),
        )
    }

    /// Builds a catalog by applying each range in order; panics if any update is rejected.
    fn catalog_with(ranges: impl IntoIterator<Item = RangeOwnership>) -> ShardOwnershipCatalog {
        let mut catalog = ShardOwnershipCatalog::new();
        for range in ranges {
            catalog.apply_update(range).unwrap();
        }
        catalog
    }

    /// A snapshot exposes owner, replicas, epoch and range id, so a client can route
    /// without consulting the catalog.
    #[test]
    fn snapshot_exposes_routing_metadata_for_direct_routing() {
        let orders = collection("orders");
        let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);

        let snapshot = catalog.topology_snapshot();
        assert_eq!(snapshot.version(), CatalogVersion::initial());
        assert_eq!(snapshot.ranges().len(), 1);

        let range = snapshot
            .route(&orders, b"any-key")
            .expect("full range covers all keys");
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.replicas(), &[ident("CN=node-b")]);
        assert_eq!(range.epoch(), OwnershipEpoch::initial());
        assert_eq!(range.range_id(), RangeId::new(1));
    }

    /// Keys on either side of the `m` split point route to different owners.
    #[test]
    fn snapshot_routes_keys_to_distinct_owners() {
        let parts = collection("parts");
        let catalog = catalog_with([
            split_range(
                &parts,
                1,
                RangeBound::Min,
                RangeBound::key(b"m"),
                "CN=node-a",
            ),
            split_range(
                &parts,
                2,
                RangeBound::key(b"m"),
                RangeBound::Max,
                "CN=node-b",
            ),
        ]);
        let snapshot = catalog.topology_snapshot();

        assert_eq!(
            snapshot.route(&parts, b"apple").unwrap().owner(),
            &ident("CN=node-a")
        );
        assert_eq!(
            snapshot.route(&parts, b"zebra").unwrap().owner(),
            &ident("CN=node-b")
        );
    }

    /// A cache built from a polled snapshot resolves owners and starts without a
    /// pending refresh.
    #[test]
    fn client_resolves_owner_from_polled_snapshot() {
        let orders = collection("orders");
        let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
        let client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));
        assert!(!client.needs_refresh());
    }

    /// A newer snapshot is applied; re-applying the same snapshot is ignored.
    #[test]
    fn refresh_is_monotonic() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let v1 = client.version();

        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        // Move range 1 from node-a to node-b in the catalog.
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let fresh = catalog.topology_snapshot();
        assert!(fresh.version() > v1);

        assert_eq!(
            client.apply_refresh(fresh.clone()),
            RefreshOutcome::Applied { ranges_changed: 1 }
        );
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));

        // The identical snapshot a second time must not be re-applied.
        assert_eq!(client.apply_refresh(fresh), RefreshOutcome::Ignored);
    }

    /// A redirect hint fixes routing immediately, but only an authoritative refresh
    /// clears `needs_refresh` and restores the replica list.
    #[test]
    fn redirect_hint_corrects_cache_but_is_not_authoritative() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        // The catalog moves the range; the client has not been told.
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();

        let stale_owner = client.resolve(&orders, b"k").unwrap().clone();
        // The client still targets the previous owner...
        assert_eq!(stale_owner, ident("CN=node-a"));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
        // ...which answers with a redirect carrying a hint.
        let hint = match catalog.plan_route(&stale_owner, &request, &RoutingPolicy::forwarding()) {
            RouteDecision::Redirect { hint, .. } => hint,
            other => panic!("expected redirect, got {other:?}"),
        };

        assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
        // The hint alone is enough to route correctly.
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert!(
            client.needs_refresh(),
            "a hint is advisory, not authoritative"
        );

        assert!(client
            // An authoritative refresh resynchronises the cache.
            .apply_refresh(catalog.topology_snapshot())
            .was_applied());
        assert!(!client.needs_refresh());
        let range = client.range(&orders, RangeId::new(1)).unwrap();
        // Replicas come from the snapshot; the hint did not update them.
        assert_eq!(range.replicas(), &[ident("CN=node-a")]);
    }

    /// A hint for a range the client does not cache creates no entry, but does
    /// request a refresh.
    #[test]
    fn hint_for_unknown_range_does_not_invent_topology() {
        let orders = collection("orders");
        let other = collection("other");
        let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        // A hint produced by a separate catalog for a collection the client never saw.
        let foreign = catalog_with([full_range(&other, 9, "CN=node-z", &[])]);
        let request =
            RoutedRequest::new(other.clone(), b"k".to_vec(), RequestOperation::Transaction);
        let hint = foreign
            .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
            .hint()
            .cloned()
            .unwrap();

        assert_eq!(client.apply_hint(&hint), HintOutcome::UnknownRange);
        assert!(
            client.range(&other, RangeId::new(9)).is_none(),
            "no phantom range"
        );
        assert!(client.needs_refresh());
    }

    /// A hint obtained before a transfer is stale once the client has the
    /// post-transfer snapshot, and it is ignored.
    #[test]
    fn stale_hint_is_ignored_after_authoritative_catch_up() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
        // Obtained while node-a still owns the range, i.e. before the transfer below.
        let early_hint = catalog
            .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
            .hint()
            .cloned()
            .unwrap();

        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        // Transfer to node-b, then build the client from the post-transfer snapshot.
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        assert_eq!(client.apply_hint(&early_hint), HintOutcome::AlreadyCurrent);
        // The stale hint must not move routing back to node-a.
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert!(!client.needs_refresh());
    }

    /// A pushed full snapshot behaves like a polled one.
    #[test]
    fn push_full_snapshot_applies_like_a_poll() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let update = TopologyUpdate::Full(catalog.topology_snapshot());

        assert!(client.apply_update(update).was_applied());
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
    }

    /// A single-range push changes only that range.
    #[test]
    fn push_range_delta_advances_one_range() {
        let parts = collection("parts");
        let mut catalog = catalog_with([
            split_range(
                &parts,
                1,
                RangeBound::Min,
                RangeBound::key(b"m"),
                "CN=node-a",
            ),
            split_range(
                &parts,
                2,
                RangeBound::key(b"m"),
                RangeBound::Max,
                "CN=node-b",
            ),
        ]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        let r2 = catalog.range(&parts, RangeId::new(2)).unwrap().clone();
        // Move only range 2, to node-c.
        catalog
            .apply_update(r2.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
            .unwrap();
        let moved = catalog
            .topology_snapshot()
            .range(&parts, RangeId::new(2))
            .unwrap()
            .clone();

        assert_eq!(
            client.apply_update(TopologyUpdate::Range(moved)),
            RefreshOutcome::Applied { ranges_changed: 1 }
        );
        assert_eq!(
            // Range 1 is unaffected by the delta.
            client.resolve(&parts, b"apple").unwrap(),
            &ident("CN=node-a")
        );
        assert_eq!(
            client.resolve(&parts, b"zebra").unwrap(),
            &ident("CN=node-c")
        );
    }

    /// Even when a push is lost, a redirect hint followed by a poll brings the
    /// client back in sync.
    #[test]
    fn missed_push_still_converges_via_hint_and_poll() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        // The catalog transfers the range and publishes a push that never reaches
        // the client.
        catalog
            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let _dropped_push = TopologyUpdate::Full(catalog.topology_snapshot());

        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));
        // The client still routes to node-a.

        let request =
            // node-a redirects, handing back a hint.
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
        let hint = catalog
            .plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding())
            .hint()
            .cloned()
            .unwrap();
        assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert!(client.needs_refresh());

        assert!(client
            // The next poll completes the catch-up.
            .apply_refresh(catalog.topology_snapshot())
            .was_applied());
        assert!(!client.needs_refresh());
    }

    /// Range pushes delivered out of order: the newer one wins and the older one is
    /// ignored.
    #[test]
    fn out_of_order_push_keeps_newest() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        // First transfer: node-a -> node-b.
        catalog
            .apply_update(r1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();
        let push_v2 = catalog
            .topology_snapshot()
            .range(&orders, RangeId::new(1))
            .unwrap()
            .clone();

        let r2 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        // Second transfer: node-b -> node-c.
        catalog
            .apply_update(r2.transfer_to(ident("CN=node-c"), [ident("CN=node-b")]))
            .unwrap();
        let push_v3 = catalog
            .topology_snapshot()
            .range(&orders, RangeId::new(1))
            .unwrap()
            .clone();

        assert!(client
            // The newer push arrives first...
            .apply_update(TopologyUpdate::Range(push_v3))
            .was_applied());
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));
        assert_eq!(
            // ...so the late, older push is rejected.
            client.apply_update(TopologyUpdate::Range(push_v2)),
            RefreshOutcome::Ignored
        );
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));
    }
}