Skip to main content

reddb_server/cluster/
topology.rs

1//! Topology refresh and routing-hint client contract (issue #994, PRD #987, ADR 0037).
2//!
3//! Any-node routing ([`plan_route`](ShardOwnershipCatalog::plan_route), issue
4//! #993) lets a request land on any data member and still do something correct.
5//! This module is the *client-facing* half of that story: the contract a driver
6//! uses to learn the cluster's shape, route directly to range owners, and stay
7//! correct as ownership moves. Three mechanisms, in strict priority of authority:
8//!
9//! 1. **Polling** is the baseline and the source of authority. A driver fetches a
10//!    [`TopologySnapshot`] — every range's bounds, owner, replicas, ownership
11//!    epoch, and catalog version — and caches it in a [`ClientTopology`]. This is
12//!    the only path that establishes *authoritative* topology, and a driver that
13//!    only ever polls is always eventually correct.
14//!
15//! 2. **Routing hints** ([`RoutingHint`](super::routing::RoutingHint), carried on
16//!    a redirect response from issue #993) are an *advisory correction*. When a
17//!    write reaches a stale owner the response names the current owner+epoch; the
18//!    driver applies that hint to stop hammering the stale node, but the hint is
19//!    explicitly **not** authoritative — it cannot introduce ranges, it carries no
20//!    replica set, and applying one raises [`needs_refresh`](ClientTopology::needs_refresh)
21//!    so the driver knows to reconcile against an authoritative poll. This is
22//!    ADR 0037's "stale ownership responses remain the mandatory correctness path"
23//!    expressed on the client: correctness never *depends* on a hint, a hint only
24//!    *accelerates* convergence.
25//!
26//! 3. **Push / subscription updates** ([`TopologyUpdate`]) are an optional
27//!    accelerator where the transport supports them. A pushed snapshot or
28//!    single-range delta flows through the *same monotonic apply path* as a poll,
29//!    so a driver that misses a push is never wrong — the next poll (or the next
30//!    redirect hint) carries it forward. Push is never mandatory for correctness.
31//!
32//! Like the rest of the cluster module this is a pure data/decision layer with no
33//! I/O: [`topology_snapshot`](ShardOwnershipCatalog::topology_snapshot) projects a
34//! catalog into a driver-facing payload, and [`ClientTopology`] models exactly how
35//! a driver folds polls, hints, and pushes together. The transport that serialises
36//! a snapshot onto the wire or pushes a delta is a separate concern on top.
37
38use std::collections::BTreeMap;
39
40use super::identity::NodeIdentity;
41use super::ownership::{
42    CatalogVersion, CollectionId, OwnershipEpoch, RangeBounds, RangeId, RangeOwnership,
43    ShardKeyMode, ShardOwnershipCatalog,
44};
45use super::routing::RoutingHint;
46use super::slot::hash_shard_key_to_range_key;
47
48/// One range's routing metadata as a driver sees it.
49///
50/// Carries everything a driver needs to route a key directly to its owner and
51/// fence the write at the right epoch: the half-open [`bounds`](Self::bounds) for
52/// client-side range routing, the [`owner`](Self::owner) to send to, the
53/// [`replicas`](Self::replicas) for read fan-out, the [`epoch`](Self::epoch) to
54/// stamp a write at, and the [`version`](Self::version) used to decide whether an
55/// incoming update is newer than what is cached.
56#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct TopologyRange {
58    collection: CollectionId,
59    range_id: RangeId,
60    shard_key_mode: ShardKeyMode,
61    bounds: RangeBounds,
62    owner: NodeIdentity,
63    replicas: Vec<NodeIdentity>,
64    epoch: OwnershipEpoch,
65    version: CatalogVersion,
66}
67
68impl TopologyRange {
69    fn from_ownership(range: &RangeOwnership) -> Self {
70        Self {
71            collection: range.collection().clone(),
72            range_id: range.range_id(),
73            shard_key_mode: range.shard_key_mode(),
74            bounds: range.bounds().clone(),
75            owner: range.owner().clone(),
76            replicas: range.replicas().to_vec(),
77            epoch: range.epoch(),
78            version: range.version(),
79        }
80    }
81
82    pub fn collection(&self) -> &CollectionId {
83        &self.collection
84    }
85
86    pub fn range_id(&self) -> RangeId {
87        self.range_id
88    }
89
90    pub fn shard_key_mode(&self) -> ShardKeyMode {
91        self.shard_key_mode
92    }
93
94    pub fn bounds(&self) -> &RangeBounds {
95        &self.bounds
96    }
97
98    pub fn owner(&self) -> &NodeIdentity {
99        &self.owner
100    }
101
102    pub fn replicas(&self) -> &[NodeIdentity] {
103        &self.replicas
104    }
105
106    /// The epoch a driver should stamp a write to this range at — the same epoch
107    /// the owner's [`admit_public_write`](ShardOwnershipCatalog::admit_public_write)
108    /// gate will check (issue #990).
109    pub fn epoch(&self) -> OwnershipEpoch {
110        self.epoch
111    }
112
113    pub fn version(&self) -> CatalogVersion {
114        self.version
115    }
116
117    fn key(&self) -> (CollectionId, RangeId) {
118        (self.collection.clone(), self.range_id)
119    }
120}
121
122/// A point-in-time, driver-facing projection of the ownership catalog — the
123/// payload a topology poll returns.
124///
125/// The [`version`](Self::version) is the snapshot's high-water mark: the
126/// **maximum** catalog version across its ranges (or [`CatalogVersion::initial`]
127/// for an empty cluster). It is monotonic, but not a complete generation number:
128/// two different ranges can independently advance to the same version. Drivers
129/// therefore use it as a cheap stale-snapshot guard and still compare per-range
130/// content for same-version full refreshes.
131#[derive(Debug, Clone, PartialEq, Eq)]
132pub struct TopologySnapshot {
133    version: CatalogVersion,
134    ranges: Vec<TopologyRange>,
135}
136
137impl TopologySnapshot {
138    /// The snapshot generation — the high-water catalog version across its ranges.
139    pub fn version(&self) -> CatalogVersion {
140        self.version
141    }
142
143    /// Every range in the snapshot, in `(collection, range_id)` order.
144    pub fn ranges(&self) -> &[TopologyRange] {
145        &self.ranges
146    }
147
148    /// Look up one range by identity.
149    pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
150        self.ranges
151            .iter()
152            .find(|r| r.collection() == collection && r.range_id() == range_id)
153    }
154
155    /// Route a normalized range key to the range that owns it, by the same
156    /// half-open containment predicate the server uses.
157    pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
158        self.ranges
159            .iter()
160            .find(|r| r.collection() == collection && r.bounds().contains(key))
161    }
162
163    /// Route a logical shard key through the collection's shard-key mode.
164    pub fn route_shard_key(
165        &self,
166        collection: &CollectionId,
167        shard_key: &[u8],
168    ) -> Option<&TopologyRange> {
169        match self.shard_key_mode(collection)? {
170            ShardKeyMode::Ordered => self.route(collection, shard_key),
171            ShardKeyMode::Hash => {
172                let range_key = hash_shard_key_to_range_key(shard_key);
173                self.route(collection, &range_key)
174            }
175        }
176    }
177
178    fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
179        self.ranges
180            .iter()
181            .find(|range| range.collection() == collection)
182            .map(TopologyRange::shard_key_mode)
183    }
184}
185
186impl ShardOwnershipCatalog {
187    /// Project the catalog into a driver-facing [`TopologySnapshot`] — the payload
188    /// a topology poll serves (issue #994).
189    ///
190    /// The snapshot carries every range's bounds, owner, replicas, ownership
191    /// epoch, and catalog version, and stamps a generation
192    /// ([`version`](TopologySnapshot::version)) drivers use to tell a newer
193    /// snapshot from a stale one.
194    pub fn topology_snapshot(&self) -> TopologySnapshot {
195        let ranges: Vec<TopologyRange> =
196            self.entries().map(TopologyRange::from_ownership).collect();
197        let version = ranges
198            .iter()
199            .map(TopologyRange::version)
200            .max()
201            .unwrap_or_else(CatalogVersion::initial);
202        TopologySnapshot { version, ranges }
203    }
204}
205
/// The outcome of folding a polled or pushed snapshot or delta into a
/// [`ClientTopology`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RefreshOutcome {
    /// The incoming data was adopted into the cache. `ranges_changed` counts the
    /// incoming ranges that were added or advanced.
    Applied { ranges_changed: usize },
    /// The incoming data was not newer than what is cached, so it was dropped.
    /// This is the monotonicity guard that makes duplicate and out-of-order
    /// delivery safe.
    Ignored,
}

impl RefreshOutcome {
    /// Returns `true` for any [`Applied`](Self::Applied) outcome and `false` for
    /// [`Ignored`](Self::Ignored).
    pub fn was_applied(self) -> bool {
        match self {
            Self::Applied { .. } => true,
            Self::Ignored => false,
        }
    }
}
222
/// The result of applying an advisory [`RoutingHint`](super::routing::RoutingHint)
/// correction to a [`ClientTopology`].
///
/// Only [`Corrected`](Self::Corrected) changes a cached range. The other two
/// variants leave every cached range untouched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HintOutcome {
    /// The hint named a newer owner/epoch for a known range. The cached owner,
    /// epoch, and version were corrected, and
    /// [`needs_refresh`](ClientTopology::needs_refresh) was raised.
    Corrected,
    /// The cache already held this range at or beyond the hint's version, so
    /// nothing changed. Typically an authoritative apply, or an earlier hint,
    /// had already overtaken this one. This outcome does not raise
    /// [`needs_refresh`](ClientTopology::needs_refresh).
    AlreadyCurrent,
    /// The hint named a range the cache does not know. A hint is never
    /// authoritative enough to *introduce* a range, so no range was created.
    /// [`needs_refresh`](ClientTopology::needs_refresh) was raised so the driver
    /// polls for the authoritative topology.
    UnknownRange,
}
239
/// A topology change delivered over a push / subscription transport.
///
/// Both variants flow through the same monotonic apply path as a poll
/// ([`ClientTopology::apply_update`]). A missed or out-of-order push is
/// therefore never a correctness problem; it is only a missed *acceleration*.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopologyUpdate {
    /// A full snapshot push, identical in effect to a poll. `apply_update`
    /// forwards it straight to [`ClientTopology::apply_refresh`].
    Full(TopologySnapshot),
    /// A single range advanced; carries just that range's new metadata. Unlike a
    /// full snapshot, applying it never clears
    /// [`needs_refresh`](ClientTopology::needs_refresh).
    Range(TopologyRange),
}
252
/// A driver's cached view of cluster topology, and the contract for keeping it
/// correct (issue #994).
///
/// It holds three things:
/// - the ranges the driver believes in;
/// - the authoritative [`version`](Self::version) of the last *polled or pushed*
///   snapshot;
/// - a [`needs_refresh`](Self::needs_refresh) flag, raised whenever the driver is
///   running on an advisory hint correction rather than on authoritative
///   topology.
///
/// The three inputs compose by authority. [`apply_refresh`](Self::apply_refresh)
/// and [`apply_update`](Self::apply_update) are authoritative and monotonic.
/// [`apply_hint`](Self::apply_hint) is advisory and only ever corrects a *known*
/// range's owner/epoch. Authority always wins: an adopted authoritative full
/// snapshot overwrites any prior hint corrections and clears `needs_refresh`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientTopology {
    /// Authoritative generation. Set or raised by snapshot and delta applies;
    /// never moved by a hint.
    version: CatalogVersion,
    /// Cached ranges keyed by `(collection, range_id)`. A hint may overwrite a
    /// known entry's owner/epoch/version but never inserts a new entry.
    ranges: BTreeMap<(CollectionId, RangeId), TopologyRange>,
    /// Raised by `apply_hint`; lowered when `apply_refresh` adopts a snapshot.
    needs_refresh: bool,
}
272
273impl ClientTopology {
274    /// Seed a cache from an initial topology poll.
275    pub fn from_snapshot(snapshot: TopologySnapshot) -> Self {
276        let mut cache = Self {
277            version: snapshot.version(),
278            ranges: BTreeMap::new(),
279            needs_refresh: false,
280        };
281        for range in snapshot.ranges {
282            cache.ranges.insert(range.key(), range);
283        }
284        cache
285    }
286
287    /// The authoritative generation of this cache — the version of the most recent
288    /// snapshot or range delta adopted. Advisory hint corrections do **not** move
289    /// it, so it always reflects authoritative topology.
290    pub fn version(&self) -> CatalogVersion {
291        self.version
292    }
293
294    /// Whether the cache is running on an advisory hint correction and should poll
295    /// for authoritative topology. Raised by [`apply_hint`](Self::apply_hint);
296    /// cleared by an authoritative [`apply_refresh`](Self::apply_refresh) (or a
297    /// full-snapshot [`apply_update`](Self::apply_update)).
298    pub fn needs_refresh(&self) -> bool {
299        self.needs_refresh
300    }
301
302    /// The cached range owning a normalized range key.
303    pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
304        self.ranges
305            .values()
306            .find(|r| r.collection() == collection && r.bounds().contains(key))
307    }
308
309    /// The owner a driver should send a request for `key` to — the routing answer.
310    pub fn resolve(&self, collection: &CollectionId, key: &[u8]) -> Option<&NodeIdentity> {
311        self.route_shard_key(collection, key)
312            .map(TopologyRange::owner)
313    }
314
315    /// The cached range owning a logical shard key.
316    pub fn route_shard_key(
317        &self,
318        collection: &CollectionId,
319        shard_key: &[u8],
320    ) -> Option<&TopologyRange> {
321        match self.shard_key_mode(collection)? {
322            ShardKeyMode::Ordered => self.route(collection, shard_key),
323            ShardKeyMode::Hash => {
324                let range_key = hash_shard_key_to_range_key(shard_key);
325                self.route(collection, &range_key)
326            }
327        }
328    }
329
330    fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
331        self.ranges
332            .values()
333            .find(|range| range.collection() == collection)
334            .map(TopologyRange::shard_key_mode)
335    }
336
337    /// One cached range by identity.
338    pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
339        self.ranges.get(&(collection.clone(), range_id))
340    }
341
342    /// Adopt a freshly polled snapshot — the authoritative refresh path.
343    ///
344    /// Monotonic: the snapshot is adopted if its generation advances the cache's
345    /// authoritative [`version`](Self::version), or if it carries same-generation
346    /// range content that does not roll any cached range backwards. An adopted
347    /// snapshot replaces the cached ranges wholesale and clears
348    /// [`needs_refresh`](Self::needs_refresh); an older or duplicate one is
349    /// [`Ignored`](RefreshOutcome::Ignored).
350    pub fn apply_refresh(&mut self, snapshot: TopologySnapshot) -> RefreshOutcome {
351        if !self.ranges.is_empty() && snapshot.version() < self.version {
352            return RefreshOutcome::Ignored;
353        }
354        if self.snapshot_rolls_back_any_range(&snapshot) {
355            return RefreshOutcome::Ignored;
356        }
357        let mut changed = 0usize;
358        let mut next: BTreeMap<(CollectionId, RangeId), TopologyRange> = BTreeMap::new();
359        for range in snapshot.ranges {
360            let key = range.key();
361            if self.ranges.get(&key) != Some(&range) {
362                changed += 1;
363            }
364            next.insert(key, range);
365        }
366        if !self.ranges.is_empty() && snapshot.version <= self.version && changed == 0 {
367            return RefreshOutcome::Ignored;
368        }
369        self.ranges = next;
370        self.version = snapshot.version;
371        self.needs_refresh = false;
372        RefreshOutcome::Applied {
373            ranges_changed: changed,
374        }
375    }
376
377    fn snapshot_rolls_back_any_range(&self, snapshot: &TopologySnapshot) -> bool {
378        snapshot.ranges().iter().any(|incoming| {
379            self.ranges
380                .get(&incoming.key())
381                .is_some_and(|current| incoming.version() < current.version())
382        })
383    }
384
385    /// Fold a pushed topology update in — the optional push/subscription path.
386    ///
387    /// A [`Full`](TopologyUpdate::Full) push is exactly an
388    /// [`apply_refresh`](Self::apply_refresh). A [`Range`](TopologyUpdate::Range)
389    /// delta advances a single range when its version is newer than the cached
390    /// one (and bumps the authoritative version to match), or is
391    /// [`Ignored`](RefreshOutcome::Ignored) when it is not newer. Because both go
392    /// through the same monotonic guard, a missed push is never a correctness
393    /// problem — a later poll or delta carries the change forward.
394    pub fn apply_update(&mut self, update: TopologyUpdate) -> RefreshOutcome {
395        match update {
396            TopologyUpdate::Full(snapshot) => self.apply_refresh(snapshot),
397            TopologyUpdate::Range(range) => {
398                let key = range.key();
399                let newer = match self.ranges.get(&key) {
400                    Some(current) => range.version() > current.version(),
401                    None => true,
402                };
403                if !newer {
404                    return RefreshOutcome::Ignored;
405                }
406                if range.version() > self.version {
407                    self.version = range.version();
408                }
409                self.ranges.insert(key, range);
410                RefreshOutcome::Applied { ranges_changed: 1 }
411            }
412        }
413    }
414
415    /// Apply an advisory routing-hint correction from a redirect response — the
416    /// stale-ownership correctness path (issue #993, ADR 0037).
417    ///
418    /// A hint is **not** authoritative: it can only correct the owner/epoch of a
419    /// range the cache already knows, and only when it is strictly newer than the
420    /// cached range. On a correction the cached owner/epoch/version advance (the
421    /// known bounds are kept; the replica set is left as-is because a hint carries
422    /// none) and [`needs_refresh`](Self::needs_refresh) is raised so the driver
423    /// reconciles against an authoritative poll. A hint for an unknown range
424    /// creates nothing — it only raises `needs_refresh`.
425    pub fn apply_hint(&mut self, hint: &RoutingHint) -> HintOutcome {
426        let key = (hint.collection().clone(), hint.range_id());
427        match self.ranges.get_mut(&key) {
428            Some(range) => {
429                if hint.version() <= range.version {
430                    return HintOutcome::AlreadyCurrent;
431                }
432                range.owner = hint.owner().clone();
433                range.epoch = hint.epoch();
434                range.version = hint.version();
435                self.needs_refresh = true;
436                HintOutcome::Corrected
437            }
438            None => {
439                self.needs_refresh = true;
440                HintOutcome::UnknownRange
441            }
442        }
443    }
444}
445
446#[cfg(test)]
447mod tests {
448    use super::*;
449    use crate::cluster::ownership::{PlacementMetadata, RangeBound, ShardKeyMode};
450    use crate::cluster::routing::{RequestOperation, RouteDecision, RoutedRequest, RoutingPolicy};
451
452    fn collection(name: &str) -> CollectionId {
453        CollectionId::new(name).unwrap()
454    }
455
456    fn ident(cn: &str) -> NodeIdentity {
457        NodeIdentity::from_certificate_subject(cn).unwrap()
458    }
459
460    fn full_range(coll: &CollectionId, id: u64, owner: &str, replicas: &[&str]) -> RangeOwnership {
461        RangeOwnership::establish(
462            coll.clone(),
463            RangeId::new(id),
464            ShardKeyMode::Hash,
465            RangeBounds::full(),
466            ident(owner),
467            replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
468            PlacementMetadata::with_replication_factor(3),
469        )
470    }
471
472    fn split_range(
473        coll: &CollectionId,
474        id: u64,
475        lower: RangeBound,
476        upper: RangeBound,
477        owner: &str,
478    ) -> RangeOwnership {
479        RangeOwnership::establish(
480            coll.clone(),
481            RangeId::new(id),
482            ShardKeyMode::Ordered,
483            RangeBounds::new(lower, upper).unwrap(),
484            ident(owner),
485            Vec::<NodeIdentity>::new(),
486            PlacementMetadata::with_replication_factor(1),
487        )
488    }
489
490    fn single_hash_slot_bounds(key: &[u8]) -> RangeBounds {
491        let slot = super::super::slot::hash_shard_key_to_slot(key);
492        let lower = RangeBound::key(slot.range_key());
493        let upper = match slot.value().checked_add(1) {
494            Some(next) if next < super::super::slot::PRODUCTION_HASH_SLOT_COUNT => {
495                RangeBound::key(super::super::slot::HashSlot::new(next).unwrap().range_key())
496            }
497            _ => RangeBound::Max,
498        };
499        RangeBounds::new(lower, upper).unwrap()
500    }
501
502    fn hash_slot_range(
503        coll: &CollectionId,
504        id: u64,
505        shard_key: &[u8],
506        owner: &str,
507    ) -> RangeOwnership {
508        RangeOwnership::establish(
509            coll.clone(),
510            RangeId::new(id),
511            ShardKeyMode::Hash,
512            single_hash_slot_bounds(shard_key),
513            ident(owner),
514            Vec::<NodeIdentity>::new(),
515            PlacementMetadata::with_replication_factor(1),
516        )
517    }
518
519    fn catalog_with(ranges: impl IntoIterator<Item = RangeOwnership>) -> ShardOwnershipCatalog {
520        let mut catalog = ShardOwnershipCatalog::new();
521        for range in ranges {
522            catalog.apply_update(range).unwrap();
523        }
524        catalog
525    }
526
527    // AC #1: a topology payload carries enough per-range routing metadata (owner,
528    // replicas, epoch, version, bounds) for a driver to route directly.
529    #[test]
530    fn snapshot_exposes_routing_metadata_for_direct_routing() {
531        let orders = collection("orders");
532        let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
533
534        let snapshot = catalog.topology_snapshot();
535        assert_eq!(snapshot.version(), CatalogVersion::initial());
536        assert_eq!(snapshot.ranges().len(), 1);
537
538        let range = snapshot
539            .route(&orders, b"any-key")
540            .expect("full range covers all keys");
541        assert_eq!(range.owner(), &ident("CN=node-a"));
542        assert_eq!(range.replicas(), &[ident("CN=node-b")]);
543        assert_eq!(range.epoch(), OwnershipEpoch::initial());
544        assert_eq!(range.range_id(), RangeId::new(1));
545    }
546
547    // AC #1: an ordered, multi-range collection routes keys to distinct owners
548    // entirely client-side from the snapshot.
549    #[test]
550    fn snapshot_routes_keys_to_distinct_owners() {
551        let parts = collection("parts");
552        let catalog = catalog_with([
553            split_range(
554                &parts,
555                1,
556                RangeBound::Min,
557                RangeBound::key(b"m"),
558                "CN=node-a",
559            ),
560            split_range(
561                &parts,
562                2,
563                RangeBound::key(b"m"),
564                RangeBound::Max,
565                "CN=node-b",
566            ),
567        ]);
568        let snapshot = catalog.topology_snapshot();
569
570        assert_eq!(
571            snapshot.route(&parts, b"apple").unwrap().owner(),
572            &ident("CN=node-a")
573        );
574        assert_eq!(
575            snapshot.route(&parts, b"zebra").unwrap().owner(),
576            &ident("CN=node-b")
577        );
578    }
579
580    // AC #3: a driver polls a snapshot and resolves owners from its cache.
581    #[test]
582    fn client_resolves_owner_from_polled_snapshot() {
583        let orders = collection("orders");
584        let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
585        let client = ClientTopology::from_snapshot(catalog.topology_snapshot());
586
587        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));
588        assert!(!client.needs_refresh());
589    }
590
591    #[test]
592    fn client_resolves_hash_collection_by_shard_key_slot() {
593        let orders = collection("orders");
594        let key = b"tenant:42";
595        let catalog = catalog_with([hash_slot_range(&orders, 1, key, "CN=node-a")]);
596        let client = ClientTopology::from_snapshot(catalog.topology_snapshot());
597
598        let routed = client
599            .route_shard_key(&orders, key)
600            .expect("hash slot range covers the logical shard key");
601        assert_eq!(routed.owner(), &ident("CN=node-a"));
602        assert_eq!(client.resolve(&orders, key).unwrap(), &ident("CN=node-a"));
603    }
604
605    // AC #3 + polling baseline: refresh is monotonic — a newer poll is adopted, a
606    // stale or duplicate poll is ignored and cannot roll the cache backwards.
607    #[test]
608    fn refresh_is_monotonic() {
609        let orders = collection("orders");
610        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
611        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
612        let v1 = client.version();
613
614        // Ownership transfers a -> b; poll the new snapshot.
615        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
616        catalog
617            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
618            .unwrap();
619        let fresh = catalog.topology_snapshot();
620        assert!(fresh.version() > v1);
621
622        assert_eq!(
623            client.apply_refresh(fresh.clone()),
624            RefreshOutcome::Applied { ranges_changed: 1 }
625        );
626        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
627
628        // Re-applying the same snapshot is a no-op.
629        assert_eq!(client.apply_refresh(fresh), RefreshOutcome::Ignored);
630    }
631
632    #[test]
633    fn refresh_applies_same_generation_snapshot_when_another_range_changed() {
634        let parts = collection("parts");
635        let mut catalog = catalog_with([
636            split_range(
637                &parts,
638                1,
639                RangeBound::Min,
640                RangeBound::key(b"m"),
641                "CN=node-a",
642            ),
643            split_range(
644                &parts,
645                2,
646                RangeBound::key(b"m"),
647                RangeBound::Max,
648                "CN=node-b",
649            ),
650        ]);
651        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
652
653        let r1 = catalog.range(&parts, RangeId::new(1)).unwrap().clone();
654        catalog
655            .apply_update(r1.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
656            .unwrap();
657        assert_eq!(
658            client.apply_refresh(catalog.topology_snapshot()),
659            RefreshOutcome::Applied { ranges_changed: 1 }
660        );
661        assert_eq!(
662            client.resolve(&parts, b"apple").unwrap(),
663            &ident("CN=node-c")
664        );
665
666        let r2 = catalog.range(&parts, RangeId::new(2)).unwrap().clone();
667        catalog
668            .apply_update(r2.transfer_to(ident("CN=node-d"), Vec::<NodeIdentity>::new()))
669            .unwrap();
670        let same_generation = catalog.topology_snapshot();
671        assert_eq!(same_generation.version(), client.version());
672
673        assert_eq!(
674            client.apply_refresh(same_generation),
675            RefreshOutcome::Applied { ranges_changed: 1 }
676        );
677        assert_eq!(
678            client.resolve(&parts, b"zebra").unwrap(),
679            &ident("CN=node-d")
680        );
681    }
682
683    #[test]
684    fn equal_generation_refresh_does_not_roll_back_a_newer_range() {
685        let parts = collection("parts");
686        let base = catalog_with([
687            split_range(
688                &parts,
689                1,
690                RangeBound::Min,
691                RangeBound::key(b"m"),
692                "CN=node-a",
693            ),
694            split_range(
695                &parts,
696                2,
697                RangeBound::key(b"m"),
698                RangeBound::Max,
699                "CN=node-b",
700            ),
701        ]);
702        let mut current_catalog = base.clone();
703        let mut stale_fork = base;
704        let mut client = ClientTopology::from_snapshot(current_catalog.topology_snapshot());
705
706        let r1 = current_catalog
707            .range(&parts, RangeId::new(1))
708            .unwrap()
709            .clone();
710        current_catalog
711            .apply_update(r1.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
712            .unwrap();
713        assert!(client
714            .apply_refresh(current_catalog.topology_snapshot())
715            .was_applied());
716
717        let r2 = stale_fork.range(&parts, RangeId::new(2)).unwrap().clone();
718        stale_fork
719            .apply_update(r2.transfer_to(ident("CN=node-d"), Vec::<NodeIdentity>::new()))
720            .unwrap();
721        let fork_snapshot = stale_fork.topology_snapshot();
722        assert_eq!(fork_snapshot.version(), client.version());
723
724        assert_eq!(client.apply_refresh(fork_snapshot), RefreshOutcome::Ignored);
725        assert_eq!(
726            client.resolve(&parts, b"apple").unwrap(),
727            &ident("CN=node-c")
728        );
729        assert_eq!(
730            client.resolve(&parts, b"zebra").unwrap(),
731            &ident("CN=node-b")
732        );
733    }
734
735    // AC #2 + AC #3: a stale-ownership redirect hint corrects the cache without
736    // being authoritative — it advances the owner but raises needs_refresh.
737    #[test]
738    fn redirect_hint_corrects_cache_but_is_not_authoritative() {
739        let orders = collection("orders");
740        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
741        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
742
743        // Ownership moves a -> b on the server; the driver has not polled yet.
744        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
745        catalog
746            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
747            .unwrap();
748
749        // The driver routes to its stale owner (node-a); the server redirects.
750        let stale_owner = client.resolve(&orders, b"k").unwrap().clone();
751        assert_eq!(stale_owner, ident("CN=node-a"));
752        let request =
753            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
754        let hint = match catalog.plan_route(&stale_owner, &request, &RoutingPolicy::forwarding()) {
755            RouteDecision::Redirect { hint, .. } => hint,
756            other => panic!("expected redirect, got {other:?}"),
757        };
758
759        // Applying the hint corrects routing but flags the cache as advisory.
760        assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
761        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
762        assert!(
763            client.needs_refresh(),
764            "a hint is advisory, not authoritative"
765        );
766
767        // An authoritative poll reconciles and clears the advisory flag.
768        assert!(client
769            .apply_refresh(catalog.topology_snapshot())
770            .was_applied());
771        assert!(!client.needs_refresh());
772        // Authority restored the replica set a hint never carried.
773        let range = client.range(&orders, RangeId::new(1)).unwrap();
774        assert_eq!(range.replicas(), &[ident("CN=node-a")]);
775    }
776
777    // AC #2: a hint cannot introduce a range — hints are not the source of
778    // ownership authority, so an unknown-range hint only forces a refresh.
779    #[test]
780    fn hint_for_unknown_range_does_not_invent_topology() {
781        let orders = collection("orders");
782        let other = collection("other");
783        let catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
784        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
785
786        // Forge a hint for a collection/range the cache has never seen.
787        let foreign = catalog_with([full_range(&other, 9, "CN=node-z", &[])]);
788        let request =
789            RoutedRequest::new(other.clone(), b"k".to_vec(), RequestOperation::Transaction);
790        let hint = foreign
791            .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
792            .hint()
793            .cloned()
794            .unwrap();
795
796        assert_eq!(client.apply_hint(&hint), HintOutcome::UnknownRange);
797        assert!(
798            client.range(&other, RangeId::new(9)).is_none(),
799            "no phantom range"
800        );
801        assert!(client.needs_refresh());
802    }
803
804    // AC #2: an authoritative apply that already overtook the hint makes the hint
805    // a no-op — authority wins.
806    #[test]
807    fn stale_hint_is_ignored_after_authoritative_catch_up() {
808        let orders = collection("orders");
809        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
810        let request =
811            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
812        // Capture an early hint while a still owns the range.
813        let early_hint = catalog
814            .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
815            .hint()
816            .cloned()
817            .unwrap();
818
819        // Server advances; driver polls the fresh snapshot authoritatively.
820        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
821        catalog
822            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
823            .unwrap();
824        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
825
826        // The stale early hint (owner a, epoch 1) must not roll routing back.
827        assert_eq!(client.apply_hint(&early_hint), HintOutcome::AlreadyCurrent);
828        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
829        assert!(!client.needs_refresh());
830    }
831
832    // AC #4: a pushed full snapshot is adopted exactly like a poll.
833    #[test]
834    fn push_full_snapshot_applies_like_a_poll() {
835        let orders = collection("orders");
836        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
837        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
838
839        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
840        catalog
841            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
842            .unwrap();
843        let update = TopologyUpdate::Full(catalog.topology_snapshot());
844
845        assert!(client.apply_update(update).was_applied());
846        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
847    }
848
849    // AC #4: a pushed single-range delta advances just that range.
850    #[test]
851    fn push_range_delta_advances_one_range() {
852        let parts = collection("parts");
853        let mut catalog = catalog_with([
854            split_range(
855                &parts,
856                1,
857                RangeBound::Min,
858                RangeBound::key(b"m"),
859                "CN=node-a",
860            ),
861            split_range(
862                &parts,
863                2,
864                RangeBound::key(b"m"),
865                RangeBound::Max,
866                "CN=node-b",
867            ),
868        ]);
869        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
870
871        // Only range 2 moves b -> c.
872        let r2 = catalog.range(&parts, RangeId::new(2)).unwrap().clone();
873        catalog
874            .apply_update(r2.transfer_to(ident("CN=node-c"), Vec::<NodeIdentity>::new()))
875            .unwrap();
876        let moved = catalog
877            .topology_snapshot()
878            .range(&parts, RangeId::new(2))
879            .unwrap()
880            .clone();
881
882        assert_eq!(
883            client.apply_update(TopologyUpdate::Range(moved)),
884            RefreshOutcome::Applied { ranges_changed: 1 }
885        );
886        // Range 1 untouched, range 2 advanced.
887        assert_eq!(
888            client.resolve(&parts, b"apple").unwrap(),
889            &ident("CN=node-a")
890        );
891        assert_eq!(
892            client.resolve(&parts, b"zebra").unwrap(),
893            &ident("CN=node-c")
894        );
895    }
896
897    // AC #5: behavior when push updates are missed — push is not mandatory for
898    // correctness. A dropped push leaves the cache stale, but a redirect hint and
899    // a later poll converge it anyway.
900    #[test]
901    fn missed_push_still_converges_via_hint_and_poll() {
902        let orders = collection("orders");
903        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
904        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
905
906        // Server moves ownership a -> b. The push for this change is DROPPED:
907        // we deliberately never call apply_update with it.
908        let r = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
909        catalog
910            .apply_update(r.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
911            .unwrap();
912        let _dropped_push = TopologyUpdate::Full(catalog.topology_snapshot());
913
914        // The cache is stale and still points at node-a.
915        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));
916
917        // Correctness path: the stale request is redirected; the hint corrects.
918        let request =
919            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
920        let hint = catalog
921            .plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding())
922            .hint()
923            .cloned()
924            .unwrap();
925        assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
926        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
927        assert!(client.needs_refresh());
928
929        // Baseline poll reconciles the cache fully despite the missed push.
930        assert!(client
931            .apply_refresh(catalog.topology_snapshot())
932            .was_applied());
933        assert!(!client.needs_refresh());
934    }
935
936    // AC #4: out-of-order pushes are safe — a newer delta applied before an older
937    // one wins, and the older one is then ignored.
938    #[test]
939    fn out_of_order_push_keeps_newest() {
940        let orders = collection("orders");
941        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
942        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
943
944        // v2: a -> b.
945        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
946        catalog
947            .apply_update(r1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
948            .unwrap();
949        let push_v2 = catalog
950            .topology_snapshot()
951            .range(&orders, RangeId::new(1))
952            .unwrap()
953            .clone();
954
955        // v3: b -> c.
956        let r2 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
957        catalog
958            .apply_update(r2.transfer_to(ident("CN=node-c"), [ident("CN=node-b")]))
959            .unwrap();
960        let push_v3 = catalog
961            .topology_snapshot()
962            .range(&orders, RangeId::new(1))
963            .unwrap()
964            .clone();
965
966        // The newer push (v3) arrives first and is applied.
967        assert!(client
968            .apply_update(TopologyUpdate::Range(push_v3))
969            .was_applied());
970        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));
971        // The older push (v2) arrives late and is ignored — no rollback.
972        assert_eq!(
973            client.apply_update(TopologyUpdate::Range(push_v2)),
974            RefreshOutcome::Ignored
975        );
976        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));
977    }
978}