1use std::collections::BTreeMap;
39
40use super::identity::NodeIdentity;
41use super::ownership::{
42 CatalogVersion, CollectionId, OwnershipEpoch, RangeBounds, RangeId, RangeOwnership,
43 ShardKeyMode, ShardOwnershipCatalog,
44};
45use super::routing::RoutingHint;
46use super::slot::hash_shard_key_to_range_key;
47
48#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct TopologyRange {
58 collection: CollectionId,
59 range_id: RangeId,
60 shard_key_mode: ShardKeyMode,
61 bounds: RangeBounds,
62 owner: NodeIdentity,
63 replicas: Vec<NodeIdentity>,
64 epoch: OwnershipEpoch,
65 version: CatalogVersion,
66}
67
68impl TopologyRange {
69 fn from_ownership(range: &RangeOwnership) -> Self {
70 Self {
71 collection: range.collection().clone(),
72 range_id: range.range_id(),
73 shard_key_mode: range.shard_key_mode(),
74 bounds: range.bounds().clone(),
75 owner: range.owner().clone(),
76 replicas: range.replicas().to_vec(),
77 epoch: range.epoch(),
78 version: range.version(),
79 }
80 }
81
82 pub fn collection(&self) -> &CollectionId {
83 &self.collection
84 }
85
86 pub fn range_id(&self) -> RangeId {
87 self.range_id
88 }
89
90 pub fn shard_key_mode(&self) -> ShardKeyMode {
91 self.shard_key_mode
92 }
93
94 pub fn bounds(&self) -> &RangeBounds {
95 &self.bounds
96 }
97
98 pub fn owner(&self) -> &NodeIdentity {
99 &self.owner
100 }
101
102 pub fn replicas(&self) -> &[NodeIdentity] {
103 &self.replicas
104 }
105
106 pub fn epoch(&self) -> OwnershipEpoch {
110 self.epoch
111 }
112
113 pub fn version(&self) -> CatalogVersion {
114 self.version
115 }
116
117 fn key(&self) -> (CollectionId, RangeId) {
118 (self.collection.clone(), self.range_id)
119 }
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
132pub struct TopologySnapshot {
133 version: CatalogVersion,
134 ranges: Vec<TopologyRange>,
135}
136
137impl TopologySnapshot {
138 pub fn version(&self) -> CatalogVersion {
140 self.version
141 }
142
143 pub fn ranges(&self) -> &[TopologyRange] {
145 &self.ranges
146 }
147
148 pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
150 self.ranges
151 .iter()
152 .find(|r| r.collection() == collection && r.range_id() == range_id)
153 }
154
155 pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
158 self.ranges
159 .iter()
160 .find(|r| r.collection() == collection && r.bounds().contains(key))
161 }
162
163 pub fn route_shard_key(
165 &self,
166 collection: &CollectionId,
167 shard_key: &[u8],
168 ) -> Option<&TopologyRange> {
169 match self.shard_key_mode(collection)? {
170 ShardKeyMode::Ordered => self.route(collection, shard_key),
171 ShardKeyMode::Hash => {
172 let range_key = hash_shard_key_to_range_key(shard_key);
173 self.route(collection, &range_key)
174 }
175 }
176 }
177
178 fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
179 self.ranges
180 .iter()
181 .find(|range| range.collection() == collection)
182 .map(TopologyRange::shard_key_mode)
183 }
184}
185
186impl ShardOwnershipCatalog {
187 pub fn topology_snapshot(&self) -> TopologySnapshot {
195 let ranges: Vec<TopologyRange> =
196 self.entries().map(TopologyRange::from_ownership).collect();
197 let version = ranges
198 .iter()
199 .map(TopologyRange::version)
200 .max()
201 .unwrap_or_else(CatalogVersion::initial);
202 TopologySnapshot { version, ranges }
203 }
204}
205
206#[derive(Debug, Clone, Copy, PartialEq, Eq)]
208pub enum RefreshOutcome {
209 Applied { ranges_changed: usize },
212 Ignored,
215}
216
217impl RefreshOutcome {
218 pub fn was_applied(self) -> bool {
219 matches!(self, RefreshOutcome::Applied { .. })
220 }
221}
222
223#[derive(Debug, Clone, Copy, PartialEq, Eq)]
226pub enum HintOutcome {
227 Corrected,
230 AlreadyCurrent,
233 UnknownRange,
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
246pub enum TopologyUpdate {
247 Full(TopologySnapshot),
249 Range(TopologyRange),
251}
252
253#[derive(Debug, Clone, PartialEq, Eq)]
267pub struct ClientTopology {
268 version: CatalogVersion,
269 ranges: BTreeMap<(CollectionId, RangeId), TopologyRange>,
270 needs_refresh: bool,
271}
272
273impl ClientTopology {
274 pub fn from_snapshot(snapshot: TopologySnapshot) -> Self {
276 let mut cache = Self {
277 version: snapshot.version(),
278 ranges: BTreeMap::new(),
279 needs_refresh: false,
280 };
281 for range in snapshot.ranges {
282 cache.ranges.insert(range.key(), range);
283 }
284 cache
285 }
286
287 pub fn version(&self) -> CatalogVersion {
291 self.version
292 }
293
294 pub fn needs_refresh(&self) -> bool {
299 self.needs_refresh
300 }
301
302 pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
304 self.ranges
305 .values()
306 .find(|r| r.collection() == collection && r.bounds().contains(key))
307 }
308
309 pub fn resolve(&self, collection: &CollectionId, key: &[u8]) -> Option<&NodeIdentity> {
311 self.route_shard_key(collection, key)
312 .map(TopologyRange::owner)
313 }
314
315 pub fn route_shard_key(
317 &self,
318 collection: &CollectionId,
319 shard_key: &[u8],
320 ) -> Option<&TopologyRange> {
321 match self.shard_key_mode(collection)? {
322 ShardKeyMode::Ordered => self.route(collection, shard_key),
323 ShardKeyMode::Hash => {
324 let range_key = hash_shard_key_to_range_key(shard_key);
325 self.route(collection, &range_key)
326 }
327 }
328 }
329
330 fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
331 self.ranges
332 .values()
333 .find(|range| range.collection() == collection)
334 .map(TopologyRange::shard_key_mode)
335 }
336
337 pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
339 self.ranges.get(&(collection.clone(), range_id))
340 }
341
342 pub fn apply_refresh(&mut self, snapshot: TopologySnapshot) -> RefreshOutcome {
351 if !self.ranges.is_empty() && snapshot.version() < self.version {
352 return RefreshOutcome::Ignored;
353 }
354 if self.snapshot_rolls_back_any_range(&snapshot) {
355 return RefreshOutcome::Ignored;
356 }
357 let mut changed = 0usize;
358 let mut next: BTreeMap<(CollectionId, RangeId), TopologyRange> = BTreeMap::new();
359 for range in snapshot.ranges {
360 let key = range.key();
361 if self.ranges.get(&key) != Some(&range) {
362 changed += 1;
363 }
364 next.insert(key, range);
365 }
366 if !self.ranges.is_empty() && snapshot.version <= self.version && changed == 0 {
367 return RefreshOutcome::Ignored;
368 }
369 self.ranges = next;
370 self.version = snapshot.version;
371 self.needs_refresh = false;
372 RefreshOutcome::Applied {
373 ranges_changed: changed,
374 }
375 }
376
377 fn snapshot_rolls_back_any_range(&self, snapshot: &TopologySnapshot) -> bool {
378 snapshot.ranges().iter().any(|incoming| {
379 self.ranges
380 .get(&incoming.key())
381 .is_some_and(|current| incoming.version() < current.version())
382 })
383 }
384
385 pub fn apply_update(&mut self, update: TopologyUpdate) -> RefreshOutcome {
395 match update {
396 TopologyUpdate::Full(snapshot) => self.apply_refresh(snapshot),
397 TopologyUpdate::Range(range) => {
398 let key = range.key();
399 let newer = match self.ranges.get(&key) {
400 Some(current) => range.version() > current.version(),
401 None => true,
402 };
403 if !newer {
404 return RefreshOutcome::Ignored;
405 }
406 if range.version() > self.version {
407 self.version = range.version();
408 }
409 self.ranges.insert(key, range);
410 RefreshOutcome::Applied { ranges_changed: 1 }
411 }
412 }
413 }
414
415 pub fn apply_hint(&mut self, hint: &RoutingHint) -> HintOutcome {
426 let key = (hint.collection().clone(), hint.range_id());
427 match self.ranges.get_mut(&key) {
428 Some(range) => {
429 if hint.version() <= range.version {
430 return HintOutcome::AlreadyCurrent;
431 }
432 range.owner = hint.owner().clone();
433 range.epoch = hint.epoch();
434 range.version = hint.version();
435 self.needs_refresh = true;
436 HintOutcome::Corrected
437 }
438 None => {
439 self.needs_refresh = true;
440 HintOutcome::UnknownRange
441 }
442 }
443 }
444}
445
446#[cfg(test)]
447mod tests {
448 use super::*;
449 use crate::cluster::ownership::{PlacementMetadata, RangeBound, ShardKeyMode};
450 use crate::cluster::routing::{RequestOperation, RouteDecision, RoutedRequest, RoutingPolicy};
451
452 fn collection(name: &str) -> CollectionId {
453 CollectionId::new(name).unwrap()
454 }
455
456 fn ident(cn: &str) -> NodeIdentity {
457 NodeIdentity::from_certificate_subject(cn).unwrap()
458 }
459
460 fn full_range(coll: &CollectionId, id: u64, owner: &str, replicas: &[&str]) -> RangeOwnership {
461 RangeOwnership::establish(
462 coll.clone(),
463 RangeId::new(id),
464 ShardKeyMode::Hash,
465 RangeBounds::full(),
466 ident(owner),
467 replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
468 PlacementMetadata::with_replication_factor(3),
469 )
470 }
471
472 fn split_range(
473 coll: &CollectionId,
474 id: u64,
475 lower: RangeBound,
476 upper: RangeBound,
477 owner: &str,
478 ) -> RangeOwnership {
479 RangeOwnership::establish(
480 coll.clone(),
481 RangeId::new(id),
482 ShardKeyMode::Ordered,
483 RangeBounds::new(lower, upper).unwrap(),
484 ident(owner),
485 Vec::<NodeIdentity>::new(),
486 PlacementMetadata::with_replication_factor(1),
487 )
488 }
489
490 fn single_hash_slot_bounds(key: &[u8]) -> RangeBounds {
491 let slot = super::super::slot::hash_shard_key_to_slot(key);
492 let lower = RangeBound::key(slot.range_key());
493 let upper = match slot.value().checked_add(1) {
494 Some(next) if next < super::super::slot::PRODUCTION_HASH_SLOT_COUNT => {
495 RangeBound::key(super::super::slot::HashSlot::new(next).unwrap().range_key())
496 }
497 _ => RangeBound::Max,
498 };
499 RangeBounds::new(lower, upper).unwrap()
500 }
501
502 fn hash_slot_range(
503 coll: &CollectionId,
504 id: u64,
505 shard_key: &[u8],
506 owner: &str,
507 ) -> RangeOwnership {
508 RangeOwnership::establish(
509 coll.clone(),
510 RangeId::new(id),
511 ShardKeyMode::Hash,
512 single_hash_slot_bounds(shard_key),
513 ident(owner),
514 Vec::<NodeIdentity>::new(),
515 PlacementMetadata::with_replication_factor(1),
516 )
517 }
518
519 fn catalog_with(ranges: impl IntoIterator<Item = RangeOwnership>) -> ShardOwnershipCatalog {
520 let mut catalog = ShardOwnershipCatalog::new();
521 for range in ranges {
522 catalog.apply_update(range).unwrap();
523 }
524 catalog
525 }
526
527 #[test]
530 fn snapshot_exposes_routing_metadata_for_direct_routing() {
531 let orders = collection("orders");
532 let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
533
534 let snapshot = catalog.topology_snapshot();
535 assert_eq!(snapshot.version(), CatalogVersion::initial());
536 assert_eq!(snapshot.ranges().len(), 1);
537
538 let range = snapshot
539 .route(&orders, b"any-key")
540 .expect("full range covers all keys");
541 assert_eq!(range.owner(), &ident("CN=node-a"));
542 assert_eq!(range.replicas(), &[ident("CN=node-b")]);
543 assert_eq!(range.epoch(), OwnershipEpoch::initial());
544 assert_eq!(range.range_id(), RangeId::new(1));
545 }
546
547 #[test]
550 fn snapshot_routes_keys_to_distinct_owners() {
551 let parts = collection("parts");
552 let catalog = catalog_with([
553 split_range(
554 &parts,
555 1,
556 RangeBound::Min,
557 RangeBound::key(b"m"),
558 "CN=node-a",
559 ),
560 split_range(
561 &parts,
562 2,
563 RangeBound::key(b"m"),
564 RangeBound::Max,
565 "CN=node-b",
566 ),
567 ]);
568 let snapshot = catalog.topology_snapshot();
569
570 assert_eq!(
571 snapshot.route(&parts, b"apple").unwrap().owner(),
572 &ident("CN=node-a")
573 );
574 assert_eq!(
575 snapshot.route(&parts, b"zebra").unwrap().owner(),
576 &ident("CN=node-b")
577 );
578 }
579
580 #[test]
582 fn client_resolves_owner_from_polled_snapshot() {
583 let orders = collection("orders");
584 let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
585 let client = ClientTopology::from_snapshot(catalog.topology_snapshot());
586
587 assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));
588 assert!(!client.needs_refresh());
589 }
590
591 #[test]
592 fn client_resolves_hash_collection_by_shard_key_slot() {
593 let orders = collection("orders");
594 let key = b"tenant:42";
595 let catalog = catalog_with([hash_slot_range(&orders, 1, key, "CN=node-a")]);
596 let client = ClientTopology::from_snapshot(catalog.topology_snapshot());
597
598 let routed = client
599 .route_shard_key(&orders, key)
600 .expect("hash slot range covers the logical shard key");
601 assert_eq!(routed.owner(), &ident("CN=node-a"));
602 assert_eq!(client.resolve(&orders, key).unwrap(), &ident("CN=node-a"));
603 }
604
605 #[test]
608 fn refresh_is_monotonic() {
609 let orders = collection("orders");
610 let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
611 let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
612 let v1 = client.version();
613
614 let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
616 catalog
617 .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
618 .unwrap();
619 let fresh = catalog.topology_snapshot();
620 assert!(fresh.version() > v1);
621
622 assert_eq!(
623 client.apply_refresh(fresh.clone()),
624 RefreshOutcome::Applied { ranges_changed: 1 }
625 );
626 assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
627
628 assert_eq!(client.apply_refresh(fresh), RefreshOutcome::Ignored);
630 }
631
632 #[test]
633 fn refresh_applies_same_generation_snapshot_when_another_range_changed() {
634 let parts = collection("parts");
635 let mut catalog = catalog_with([
636 split_range(
637 &parts,
638 1,
639 RangeBound::Min,
640 RangeBound::key(b"m"),
641 "CN=node-a",
642 ),
643 split_range(
644 &parts,
645 2,
646 RangeBound::key(b"m"),
647 RangeBound::Max,
648 "CN=node-b",
649 ),
650 ]);
651 let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
652
653 let r1 = catalog.range(&parts, RangeId::new(1)).unwrap().clone();
654 catalog
655 .apply_update(r1.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
656 .unwrap();
657 assert_eq!(
658 client.apply_refresh(catalog.topology_snapshot()),
659 RefreshOutcome::Applied { ranges_changed: 1 }
660 );
661 assert_eq!(
662 client.resolve(&parts, b"apple").unwrap(),
663 &ident("CN=node-c")
664 );
665
666 let r2 = catalog.range(&parts, RangeId::new(2)).unwrap().clone();
667 catalog
668 .apply_update(r2.transfer_to(ident("CN=node-d"), Vec::<NodeIdentity>::new()))
669 .unwrap();
670 let same_generation = catalog.topology_snapshot();
671 assert_eq!(same_generation.version(), client.version());
672
673 assert_eq!(
674 client.apply_refresh(same_generation),
675 RefreshOutcome::Applied { ranges_changed: 1 }
676 );
677 assert_eq!(
678 client.resolve(&parts, b"zebra").unwrap(),
679 &ident("CN=node-d")
680 );
681 }
682
683 #[test]
684 fn equal_generation_refresh_does_not_roll_back_a_newer_range() {
685 let parts = collection("parts");
686 let base = catalog_with([
687 split_range(
688 &parts,
689 1,
690 RangeBound::Min,
691 RangeBound::key(b"m"),
692 "CN=node-a",
693 ),
694 split_range(
695 &parts,
696 2,
697 RangeBound::key(b"m"),
698 RangeBound::Max,
699 "CN=node-b",
700 ),
701 ]);
702 let mut current_catalog = base.clone();
703 let mut stale_fork = base;
704 let mut client = ClientTopology::from_snapshot(current_catalog.topology_snapshot());
705
706 let r1 = current_catalog
707 .range(&parts, RangeId::new(1))
708 .unwrap()
709 .clone();
710 current_catalog
711 .apply_update(r1.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
712 .unwrap();
713 assert!(client
714 .apply_refresh(current_catalog.topology_snapshot())
715 .was_applied());
716
717 let r2 = stale_fork.range(&parts, RangeId::new(2)).unwrap().clone();
718 stale_fork
719 .apply_update(r2.transfer_to(ident("CN=node-d"), Vec::<NodeIdentity>::new()))
720 .unwrap();
721 let fork_snapshot = stale_fork.topology_snapshot();
722 assert_eq!(fork_snapshot.version(), client.version());
723
724 assert_eq!(client.apply_refresh(fork_snapshot), RefreshOutcome::Ignored);
725 assert_eq!(
726 client.resolve(&parts, b"apple").unwrap(),
727 &ident("CN=node-c")
728 );
729 assert_eq!(
730 client.resolve(&parts, b"zebra").unwrap(),
731 &ident("CN=node-b")
732 );
733 }
734
735 #[test]
738 fn redirect_hint_corrects_cache_but_is_not_authoritative() {
739 let orders = collection("orders");
740 let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
741 let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
742
743 let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
745 catalog
746 .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
747 .unwrap();
748
749 let stale_owner = client.resolve(&orders, b"k").unwrap().clone();
751 assert_eq!(stale_owner, ident("CN=node-a"));
752 let request =
753 RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
754 let hint = match catalog.plan_route(&stale_owner, &request, &RoutingPolicy::forwarding()) {
755 RouteDecision::Redirect { hint, .. } => hint,
756 other => panic!("expected redirect, got {other:?}"),
757 };
758
759 assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
761 assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
762 assert!(
763 client.needs_refresh(),
764 "a hint is advisory, not authoritative"
765 );
766
767 assert!(client
769 .apply_refresh(catalog.topology_snapshot())
770 .was_applied());
771 assert!(!client.needs_refresh());
772 let range = client.range(&orders, RangeId::new(1)).unwrap();
774 assert_eq!(range.replicas(), &[ident("CN=node-a")]);
775 }
776
777 #[test]
780 fn hint_for_unknown_range_does_not_invent_topology() {
781 let orders = collection("orders");
782 let other = collection("other");
783 let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
784 let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
785
786 let foreign = catalog_with([full_range(&other, 9, "CN=node-z", &[])]);
788 let request =
789 RoutedRequest::new(other.clone(), b"k".to_vec(), RequestOperation::Transaction);
790 let hint = foreign
791 .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
792 .hint()
793 .cloned()
794 .unwrap();
795
796 assert_eq!(client.apply_hint(&hint), HintOutcome::UnknownRange);
797 assert!(
798 client.range(&other, RangeId::new(9)).is_none(),
799 "no phantom range"
800 );
801 assert!(client.needs_refresh());
802 }
803
804 #[test]
807 fn stale_hint_is_ignored_after_authoritative_catch_up() {
808 let orders = collection("orders");
809 let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
810 let request =
811 RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
812 let early_hint = catalog
814 .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
815 .hint()
816 .cloned()
817 .unwrap();
818
819 let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
821 catalog
822 .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
823 .unwrap();
824 let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
825
826 assert_eq!(client.apply_hint(&early_hint), HintOutcome::AlreadyCurrent);
828 assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
829 assert!(!client.needs_refresh());
830 }
831
832 #[test]
834 fn push_full_snapshot_applies_like_a_poll() {
835 let orders = collection("orders");
836 let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
837 let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
838
839 let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
840 catalog
841 .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
842 .unwrap();
843 let update = TopologyUpdate::Full(catalog.topology_snapshot());
844
845 assert!(client.apply_update(update).was_applied());
846 assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
847 }
848
849 #[test]
851 fn push_range_delta_advances_one_range() {
852 let parts = collection("parts");
853 let mut catalog = catalog_with([
854 split_range(
855 &parts,
856 1,
857 RangeBound::Min,
858 RangeBound::key(b"m"),
859 "CN=node-a",
860 ),
861 split_range(
862 &parts,
863 2,
864 RangeBound::key(b"m"),
865 RangeBound::Max,
866 "CN=node-b",
867 ),
868 ]);
869 let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
870
871 let r2 = catalog.range(&parts, RangeId::new(2)).unwrap().clone();
873 catalog
874 .apply_update(r2.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
875 .unwrap();
876 let moved = catalog
877 .topology_snapshot()
878 .range(&parts, RangeId::new(2))
879 .unwrap()
880 .clone();
881
882 assert_eq!(
883 client.apply_update(TopologyUpdate::Range(moved)),
884 RefreshOutcome::Applied { ranges_changed: 1 }
885 );
886 assert_eq!(
888 client.resolve(&parts, b"apple").unwrap(),
889 &ident("CN=node-a")
890 );
891 assert_eq!(
892 client.resolve(&parts, b"zebra").unwrap(),
893 &ident("CN=node-c")
894 );
895 }
896
897 #[test]
901 fn missed_push_still_converges_via_hint_and_poll() {
902 let orders = collection("orders");
903 let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
904 let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
905
906 let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
909 catalog
910 .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
911 .unwrap();
912 let _dropped_push = TopologyUpdate::Full(catalog.topology_snapshot());
913
914 assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));
916
917 let request =
919 RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
920 let hint = catalog
921 .plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding())
922 .hint()
923 .cloned()
924 .unwrap();
925 assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
926 assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
927 assert!(client.needs_refresh());
928
929 assert!(client
931 .apply_refresh(catalog.topology_snapshot())
932 .was_applied());
933 assert!(!client.needs_refresh());
934 }
935
936 #[test]
939 fn out_of_order_push_keeps_newest() {
940 let orders = collection("orders");
941 let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
942 let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
943
944 let r1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
946 catalog
947 .apply_update(r1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
948 .unwrap();
949 let push_v2 = catalog
950 .topology_snapshot()
951 .range(&orders, RangeId::new(1))
952 .unwrap()
953 .clone();
954
955 let r2 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
957 catalog
958 .apply_update(r2.transfer_to(ident("CN=node-c"), [ident("CN=node-b")]))
959 .unwrap();
960 let push_v3 = catalog
961 .topology_snapshot()
962 .range(&orders, RangeId::new(1))
963 .unwrap()
964 .clone();
965
966 assert!(client
968 .apply_update(TopologyUpdate::Range(push_v3))
969 .was_applied());
970 assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));
971 assert_eq!(
973 client.apply_update(TopologyUpdate::Range(push_v2)),
974 RefreshOutcome::Ignored
975 );
976 assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));
977 }
978}