Skip to main content

reddb_server/cluster/
topology.rs

//! Topology refresh and routing-hint client contract (issue #994, PRD #987, ADR 0037).
//!
//! Any-node routing ([`plan_route`](ShardOwnershipCatalog::plan_route), issue
//! #993) lets a request land on any data member and still do something correct.
//! This module is the *client-facing* half of that story: the contract a driver
//! uses to learn the cluster's shape, route directly to range owners, and stay
//! correct as ownership moves. Three mechanisms, in strict priority of authority:
//!
//! 1. **Polling** is the baseline and the source of authority. A driver fetches a
//!    [`TopologySnapshot`] — every range's bounds, owner, replicas, ownership
//!    epoch, and catalog version — and caches it in a [`ClientTopology`]. This is
//!    the only path that establishes *authoritative* topology, and a driver that
//!    only ever polls is always eventually correct.
//!
//! 2. **Routing hints** ([`RoutingHint`](super::routing::RoutingHint), carried on
//!    a redirect response from issue #993) are an *advisory correction*. When a
//!    write reaches a stale owner the response names the current owner+epoch; the
//!    driver applies that hint to stop hammering the stale node, but the hint is
//!    explicitly **not** authoritative — it cannot introduce ranges, it carries no
//!    replica set, and applying one raises [`needs_refresh`](ClientTopology::needs_refresh)
//!    so the driver knows to reconcile against an authoritative poll. This is
//!    ADR 0037's "stale ownership responses remain the mandatory correctness path"
//!    expressed on the client: correctness never *depends* on a hint, a hint only
//!    *accelerates* convergence.
//!
//! 3. **Push / subscription updates** ([`TopologyUpdate`]) are an optional
//!    accelerator where the transport supports them. A pushed snapshot or
//!    single-range delta flows through the *same monotonic apply path* as a poll,
//!    so a driver that misses a push is never wrong — the next poll (or the next
//!    redirect hint) carries it forward. Push is never mandatory for correctness.
//!
//! Like the rest of the cluster module this is a pure data/decision layer with no
//! I/O: [`topology_snapshot`](ShardOwnershipCatalog::topology_snapshot) projects a
//! catalog into a driver-facing payload, and [`ClientTopology`] models exactly how
//! a driver folds polls, hints, and pushes together. The transport that serialises
//! a snapshot onto the wire or pushes a delta is a separate concern on top.

38use std::collections::BTreeMap;
39
40use super::identity::NodeIdentity;
41use super::ownership::{
42    CatalogVersion, CollectionId, OwnershipEpoch, RangeBounds, RangeId, RangeOwnership,
43    ShardOwnershipCatalog,
44};
45use super::routing::RoutingHint;
46
47/// One range's routing metadata as a driver sees it.
48///
49/// Carries everything a driver needs to route a key directly to its owner and
50/// fence the write at the right epoch: the half-open [`bounds`](Self::bounds) for
51/// client-side range routing, the [`owner`](Self::owner) to send to, the
52/// [`replicas`](Self::replicas) for read fan-out, the [`epoch`](Self::epoch) to
53/// stamp a write at, and the [`version`](Self::version) used to decide whether an
54/// incoming update is newer than what is cached.
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct TopologyRange {
57    collection: CollectionId,
58    range_id: RangeId,
59    bounds: RangeBounds,
60    owner: NodeIdentity,
61    replicas: Vec<NodeIdentity>,
62    epoch: OwnershipEpoch,
63    version: CatalogVersion,
64}
65
66impl TopologyRange {
67    fn from_ownership(range: &RangeOwnership) -> Self {
68        Self {
69            collection: range.collection().clone(),
70            range_id: range.range_id(),
71            bounds: range.bounds().clone(),
72            owner: range.owner().clone(),
73            replicas: range.replicas().to_vec(),
74            epoch: range.epoch(),
75            version: range.version(),
76        }
77    }
78
79    pub fn collection(&self) -> &CollectionId {
80        &self.collection
81    }
82
83    pub fn range_id(&self) -> RangeId {
84        self.range_id
85    }
86
87    pub fn bounds(&self) -> &RangeBounds {
88        &self.bounds
89    }
90
91    pub fn owner(&self) -> &NodeIdentity {
92        &self.owner
93    }
94
95    pub fn replicas(&self) -> &[NodeIdentity] {
96        &self.replicas
97    }
98
99    /// The epoch a driver should stamp a write to this range at — the same epoch
100    /// the owner's [`admit_public_write`](ShardOwnershipCatalog::admit_public_write)
101    /// gate will check (issue #990).
102    pub fn epoch(&self) -> OwnershipEpoch {
103        self.epoch
104    }
105
106    pub fn version(&self) -> CatalogVersion {
107        self.version
108    }
109
110    fn key(&self) -> (CollectionId, RangeId) {
111        (self.collection.clone(), self.range_id)
112    }
113}
114
115/// A point-in-time, driver-facing projection of the ownership catalog — the
116/// payload a topology poll returns.
117///
118/// The [`version`](Self::version) is the snapshot's generation: the **maximum**
119/// catalog version across its ranges (or [`CatalogVersion::initial`] for an empty
120/// cluster). Because every accepted catalog edit strictly advances a range's
121/// version, this max is a monotonic high-water mark — a snapshot with a strictly
122/// greater version is strictly newer, which is exactly what a driver compares to
123/// decide whether to adopt a freshly polled or pushed snapshot.
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct TopologySnapshot {
126    version: CatalogVersion,
127    ranges: Vec<TopologyRange>,
128}
129
130impl TopologySnapshot {
131    /// The snapshot generation — the high-water catalog version across its ranges.
132    pub fn version(&self) -> CatalogVersion {
133        self.version
134    }
135
136    /// Every range in the snapshot, in `(collection, range_id)` order.
137    pub fn ranges(&self) -> &[TopologyRange] {
138        &self.ranges
139    }
140
141    /// Look up one range by identity.
142    pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
143        self.ranges
144            .iter()
145            .find(|r| r.collection() == collection && r.range_id() == range_id)
146    }
147
148    /// Route a key to the range that owns it, by the same half-open containment
149    /// predicate the server uses — so a driver can resolve owners locally without
150    /// a round trip.
151    pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
152        self.ranges
153            .iter()
154            .find(|r| r.collection() == collection && r.bounds().contains(key))
155    }
156}
157
158impl ShardOwnershipCatalog {
159    /// Project the catalog into a driver-facing [`TopologySnapshot`] — the payload
160    /// a topology poll serves (issue #994).
161    ///
162    /// The snapshot carries every range's bounds, owner, replicas, ownership
163    /// epoch, and catalog version, and stamps a generation
164    /// ([`version`](TopologySnapshot::version)) drivers use to tell a newer
165    /// snapshot from a stale one.
166    pub fn topology_snapshot(&self) -> TopologySnapshot {
167        let ranges: Vec<TopologyRange> =
168            self.entries().map(TopologyRange::from_ownership).collect();
169        let version = ranges
170            .iter()
171            .map(TopologyRange::version)
172            .max()
173            .unwrap_or_else(CatalogVersion::initial);
174        TopologySnapshot { version, ranges }
175    }
176}
177
/// Outcome of folding a polled snapshot, or a pushed snapshot/delta, into a
/// [`ClientTopology`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RefreshOutcome {
    /// The input was adopted. `ranges_changed` is the number of cached ranges
    /// it changed.
    Applied { ranges_changed: usize },
    /// The input was not newer than the cache and was dropped. This is the
    /// monotonicity guard that keeps duplicate and out-of-order delivery safe.
    Ignored,
}

impl RefreshOutcome {
    /// Returns `true` for [`Applied`](RefreshOutcome::Applied) and `false` for
    /// [`Ignored`](RefreshOutcome::Ignored).
    pub fn was_applied(self) -> bool {
        match self {
            RefreshOutcome::Applied { .. } => true,
            RefreshOutcome::Ignored => false,
        }
    }
}

/// Outcome of applying an advisory [`RoutingHint`](super::routing::RoutingHint)
/// correction to a [`ClientTopology`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HintOutcome {
    /// The hint named a newer owner/epoch for a range the cache knows. The cached
    /// owner was updated and [`needs_refresh`](ClientTopology::needs_refresh) was
    /// raised.
    Corrected,
    /// The cached range was already at or past the hint's version, so nothing
    /// changed. An authoritative apply had already overtaken the hint.
    AlreadyCurrent,
    /// The hint named a range the cache has never seen. Hints are not
    /// authoritative enough to create a range, so none was added. Instead,
    /// [`needs_refresh`](ClientTopology::needs_refresh) was raised so the driver
    /// polls for authoritative topology.
    UnknownRange,
}

212/// A topology change delivered over a push / subscription transport.
213///
214/// Both variants flow through the same monotonic apply path as a poll
215/// ([`ClientTopology::apply_update`]), so a missed or out-of-order push is never a
216/// correctness problem — it is only a missed *acceleration*.
217#[derive(Debug, Clone, PartialEq, Eq)]
218pub enum TopologyUpdate {
219    /// A full snapshot push — identical in effect to a poll.
220    Full(TopologySnapshot),
221    /// A single range advanced; carries just that range's new metadata.
222    Range(TopologyRange),
223}
224
225/// A driver's cached view of cluster topology, and the contract for keeping it
226/// correct (issue #994).
227///
228/// Holds the ranges the driver believes in, the authoritative
229/// [`version`](Self::version) of the last *polled or pushed* snapshot, and a
230/// [`needs_refresh`](Self::needs_refresh) flag that is raised whenever the driver
231/// is running on an advisory hint correction rather than authoritative topology.
232///
233/// The three inputs compose by authority: [`apply_refresh`](Self::apply_refresh)
234/// and [`apply_update`](Self::apply_update) are authoritative and monotonic;
235/// [`apply_hint`](Self::apply_hint) is advisory and only ever corrects a *known*
236/// range's owner/epoch. Authority always wins — an authoritative apply that is
237/// newer overwrites a prior hint correction and clears `needs_refresh`.
238#[derive(Debug, Clone, PartialEq, Eq)]
239pub struct ClientTopology {
240    version: CatalogVersion,
241    ranges: BTreeMap<(CollectionId, RangeId), TopologyRange>,
242    needs_refresh: bool,
243}
244
245impl ClientTopology {
246    /// Seed a cache from an initial topology poll.
247    pub fn from_snapshot(snapshot: TopologySnapshot) -> Self {
248        let mut cache = Self {
249            version: snapshot.version(),
250            ranges: BTreeMap::new(),
251            needs_refresh: false,
252        };
253        for range in snapshot.ranges {
254            cache.ranges.insert(range.key(), range);
255        }
256        cache
257    }
258
259    /// The authoritative generation of this cache — the version of the most recent
260    /// snapshot or range delta adopted. Advisory hint corrections do **not** move
261    /// it, so it always reflects authoritative topology.
262    pub fn version(&self) -> CatalogVersion {
263        self.version
264    }
265
266    /// Whether the cache is running on an advisory hint correction and should poll
267    /// for authoritative topology. Raised by [`apply_hint`](Self::apply_hint);
268    /// cleared by an authoritative [`apply_refresh`](Self::apply_refresh) (or a
269    /// full-snapshot [`apply_update`](Self::apply_update)).
270    pub fn needs_refresh(&self) -> bool {
271        self.needs_refresh
272    }
273
274    /// The cached range owning `key`, by the same half-open routing predicate the
275    /// server uses.
276    pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&TopologyRange> {
277        self.ranges
278            .values()
279            .find(|r| r.collection() == collection && r.bounds().contains(key))
280    }
281
282    /// The owner a driver should send a request for `key` to — the routing answer.
283    pub fn resolve(&self, collection: &CollectionId, key: &[u8]) -> Option<&NodeIdentity> {
284        self.route(collection, key).map(TopologyRange::owner)
285    }
286
287    /// One cached range by identity.
288    pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&TopologyRange> {
289        self.ranges.get(&(collection.clone(), range_id))
290    }
291
292    /// Adopt a freshly polled snapshot — the authoritative refresh path.
293    ///
294    /// Monotonic: the snapshot is adopted only if its generation strictly advances
295    /// the cache's authoritative [`version`](Self::version) (or the cache is
296    /// empty). A newer snapshot replaces the cached ranges wholesale and clears
297    /// [`needs_refresh`](Self::needs_refresh); an equal-or-older one is
298    /// [`Ignored`](RefreshOutcome::Ignored), so a late or duplicated poll can never
299    /// roll the cache backwards.
300    pub fn apply_refresh(&mut self, snapshot: TopologySnapshot) -> RefreshOutcome {
301        if !self.ranges.is_empty() && snapshot.version() <= self.version {
302            return RefreshOutcome::Ignored;
303        }
304        let mut changed = 0usize;
305        let mut next: BTreeMap<(CollectionId, RangeId), TopologyRange> = BTreeMap::new();
306        for range in snapshot.ranges {
307            let key = range.key();
308            if self.ranges.get(&key) != Some(&range) {
309                changed += 1;
310            }
311            next.insert(key, range);
312        }
313        self.ranges = next;
314        self.version = snapshot.version;
315        self.needs_refresh = false;
316        RefreshOutcome::Applied {
317            ranges_changed: changed,
318        }
319    }
320
321    /// Fold a pushed topology update in — the optional push/subscription path.
322    ///
323    /// A [`Full`](TopologyUpdate::Full) push is exactly an
324    /// [`apply_refresh`](Self::apply_refresh). A [`Range`](TopologyUpdate::Range)
325    /// delta advances a single range when its version is newer than the cached
326    /// one (and bumps the authoritative version to match), or is
327    /// [`Ignored`](RefreshOutcome::Ignored) when it is not newer. Because both go
328    /// through the same monotonic guard, a missed push is never a correctness
329    /// problem — a later poll or delta carries the change forward.
330    pub fn apply_update(&mut self, update: TopologyUpdate) -> RefreshOutcome {
331        match update {
332            TopologyUpdate::Full(snapshot) => self.apply_refresh(snapshot),
333            TopologyUpdate::Range(range) => {
334                let key = range.key();
335                let newer = match self.ranges.get(&key) {
336                    Some(current) => range.version() > current.version(),
337                    None => true,
338                };
339                if !newer {
340                    return RefreshOutcome::Ignored;
341                }
342                if range.version() > self.version {
343                    self.version = range.version();
344                }
345                self.ranges.insert(key, range);
346                RefreshOutcome::Applied { ranges_changed: 1 }
347            }
348        }
349    }
350
351    /// Apply an advisory routing-hint correction from a redirect response — the
352    /// stale-ownership correctness path (issue #993, ADR 0037).
353    ///
354    /// A hint is **not** authoritative: it can only correct the owner/epoch of a
355    /// range the cache already knows, and only when it is strictly newer than the
356    /// cached range. On a correction the cached owner/epoch/version advance (the
357    /// known bounds are kept; the replica set is left as-is because a hint carries
358    /// none) and [`needs_refresh`](Self::needs_refresh) is raised so the driver
359    /// reconciles against an authoritative poll. A hint for an unknown range
360    /// creates nothing — it only raises `needs_refresh`.
361    pub fn apply_hint(&mut self, hint: &RoutingHint) -> HintOutcome {
362        let key = (hint.collection().clone(), hint.range_id());
363        match self.ranges.get_mut(&key) {
364            Some(range) => {
365                if hint.version() <= range.version {
366                    return HintOutcome::AlreadyCurrent;
367                }
368                range.owner = hint.owner().clone();
369                range.epoch = hint.epoch();
370                range.version = hint.version();
371                self.needs_refresh = true;
372                HintOutcome::Corrected
373            }
374            None => {
375                self.needs_refresh = true;
376                HintOutcome::UnknownRange
377            }
378        }
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{PlacementMetadata, RangeBound, ShardKeyMode};
    use crate::cluster::routing::{RequestOperation, RouteDecision, RoutedRequest, RoutingPolicy};

    // ---- fixtures ---------------------------------------------------------

    /// A collection id the tests know to be valid.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// A node identity parsed from a certificate subject the tests know to be valid.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Identities for a list of certificate subjects, in order.
    fn idents(subjects: &[&str]) -> Vec<NodeIdentity> {
        subjects.iter().map(|subject| ident(subject)).collect()
    }

    /// A hash-sharded range spanning the whole key space (replication factor 3).
    fn full_range(coll: &CollectionId, id: u64, owner: &str, replicas: &[&str]) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            RangeBounds::full(),
            ident(owner),
            idents(replicas),
            PlacementMetadata::with_replication_factor(3),
        )
    }

    /// An ordered range over `[lower, upper)` with no replicas (replication factor 1).
    fn split_range(
        coll: &CollectionId,
        id: u64,
        lower: RangeBound,
        upper: RangeBound,
        owner: &str,
    ) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Ordered,
            RangeBounds::new(lower, upper).unwrap(),
            ident(owner),
            idents(&[]),
            PlacementMetadata::with_replication_factor(1),
        )
    }

    /// A catalog with every range in `ranges` committed, in order.
    fn catalog_with(ranges: impl IntoIterator<Item = RangeOwnership>) -> ShardOwnershipCatalog {
        let mut catalog = ShardOwnershipCatalog::new();
        ranges.into_iter().for_each(|range| {
            catalog.apply_update(range).unwrap();
        });
        catalog
    }

    /// `parts` split at `m`: range 1 `[Min, m)` on node-a, range 2 `[m, Max)` on node-b.
    fn two_way_split(parts: &CollectionId) -> ShardOwnershipCatalog {
        catalog_with([
            split_range(parts, 1, RangeBound::Min, RangeBound::key(b"m"), "CN=node-a"),
            split_range(parts, 2, RangeBound::key(b"m"), RangeBound::Max, "CN=node-b"),
        ])
    }

    /// Commit, server-side, a transfer of range `id` to `new_owner` with `replicas`.
    fn transfer(
        catalog: &mut ShardOwnershipCatalog,
        coll: &CollectionId,
        id: u64,
        new_owner: &str,
        replicas: &[&str],
    ) {
        let current = catalog.range(coll, RangeId::new(id)).unwrap().clone();
        catalog
            .apply_update(current.transfer_to(ident(new_owner), idents(replicas)))
            .unwrap();
    }

    /// The catalog's current metadata for one range, as a push delta would carry it.
    fn range_delta(catalog: &ShardOwnershipCatalog, coll: &CollectionId, id: u64) -> TopologyRange {
        catalog
            .topology_snapshot()
            .range(coll, RangeId::new(id))
            .unwrap()
            .clone()
    }

    /// A transactional request for key `k` in `coll`.
    fn txn_for_k(coll: &CollectionId) -> RoutedRequest {
        RoutedRequest::new(coll.clone(), b"k".to_vec(), RequestOperation::Transaction)
    }

    /// The routing hint `catalog` produces when node `receiver` is handed `request`.
    fn hint_at(
        catalog: &ShardOwnershipCatalog,
        receiver: &str,
        request: &RoutedRequest,
    ) -> RoutingHint {
        catalog
            .plan_route(&ident(receiver), request, &RoutingPolicy::forwarding())
            .hint()
            .cloned()
            .unwrap()
    }

    // ---- tests ------------------------------------------------------------

    // AC #1: a topology payload carries enough per-range routing metadata (owner,
    // replicas, epoch, version, bounds) for a driver to route directly.
    #[test]
    fn snapshot_exposes_routing_metadata_for_direct_routing() {
        let orders = collection("orders");
        let snapshot =
            catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]).topology_snapshot();

        assert_eq!(snapshot.version(), CatalogVersion::initial());
        assert_eq!(snapshot.ranges().len(), 1);

        let routed = snapshot
            .route(&orders, b"any-key")
            .expect("full range covers all keys");
        assert_eq!(routed.range_id(), RangeId::new(1));
        assert_eq!(routed.owner(), &ident("CN=node-a"));
        assert_eq!(routed.replicas(), &[ident("CN=node-b")]);
        assert_eq!(routed.epoch(), OwnershipEpoch::initial());
    }

    // AC #1: an ordered, multi-range collection routes keys to distinct owners
    // entirely client-side from the snapshot.
    #[test]
    fn snapshot_routes_keys_to_distinct_owners() {
        let parts = collection("parts");
        let snapshot = two_way_split(&parts).topology_snapshot();

        for (key, owner) in [(&b"apple"[..], "CN=node-a"), (&b"zebra"[..], "CN=node-b")] {
            assert_eq!(snapshot.route(&parts, key).unwrap().owner(), &ident(owner));
        }
    }

    // AC #3: a driver polls a snapshot and resolves owners from its cache.
    #[test]
    fn client_resolves_owner_from_polled_snapshot() {
        let orders = collection("orders");
        let client = ClientTopology::from_snapshot(
            catalog_with([full_range(&orders, 1, "CN=node-a", &[])]).topology_snapshot(),
        );

        assert!(!client.needs_refresh());
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));
    }

    // AC #3 + polling baseline: refresh is monotonic — a newer poll is adopted, a
    // stale or duplicate poll is ignored and cannot roll the cache backwards.
    #[test]
    fn refresh_is_monotonic() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());
        let seeded = client.version();

        // Ownership transfers a -> b; poll the new snapshot.
        transfer(&mut catalog, &orders, 1, "CN=node-b", &["CN=node-a"]);
        let fresh = catalog.topology_snapshot();
        assert!(fresh.version() > seeded);

        assert_eq!(
            client.apply_refresh(fresh.clone()),
            RefreshOutcome::Applied { ranges_changed: 1 }
        );
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));

        // Re-applying the same snapshot is a no-op.
        assert_eq!(client.apply_refresh(fresh), RefreshOutcome::Ignored);
    }

    // AC #2 + AC #3: a stale-ownership redirect hint corrects the cache without
    // being authoritative — it advances the owner but raises needs_refresh.
    #[test]
    fn redirect_hint_corrects_cache_but_is_not_authoritative() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        // Ownership moves a -> b on the server; the driver has not polled yet.
        transfer(&mut catalog, &orders, 1, "CN=node-b", &["CN=node-a"]);

        // The driver routes to its stale owner (node-a); the server redirects.
        let stale_owner = client.resolve(&orders, b"k").unwrap().clone();
        assert_eq!(stale_owner, ident("CN=node-a"));
        let request = txn_for_k(&orders);
        let hint = match catalog.plan_route(&stale_owner, &request, &RoutingPolicy::forwarding()) {
            RouteDecision::Redirect { hint, .. } => hint,
            other => panic!("expected redirect, got {other:?}"),
        };

        // Applying the hint corrects routing but flags the cache as advisory.
        assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert!(
            client.needs_refresh(),
            "a hint is advisory, not authoritative"
        );

        // An authoritative poll reconciles and clears the advisory flag.
        let polled = client.apply_refresh(catalog.topology_snapshot());
        assert!(polled.was_applied());
        assert!(!client.needs_refresh());

        // Authority restored the replica set a hint never carried.
        let restored = client.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(restored.replicas(), &[ident("CN=node-a")]);
    }

    // AC #2: a hint cannot introduce a range — hints are not the source of
    // ownership authority, so an unknown-range hint only forces a refresh.
    #[test]
    fn hint_for_unknown_range_does_not_invent_topology() {
        let orders = collection("orders");
        let other = collection("other");
        let mut client = ClientTopology::from_snapshot(
            catalog_with([full_range(&orders, 1, "CN=node-a", &[])]).topology_snapshot(),
        );

        // Forge a hint for a collection/range the cache has never seen.
        let foreign = catalog_with([full_range(&other, 9, "CN=node-z", &[])]);
        let hint = hint_at(&foreign, "CN=node-b", &txn_for_k(&other));

        assert_eq!(client.apply_hint(&hint), HintOutcome::UnknownRange);
        assert!(
            client.range(&other, RangeId::new(9)).is_none(),
            "no phantom range"
        );
        assert!(client.needs_refresh());
    }

    // AC #2: an authoritative apply that already overtook the hint makes the hint
    // a no-op — authority wins.
    #[test]
    fn stale_hint_is_ignored_after_authoritative_catch_up() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);

        // Capture an early hint while a still owns the range.
        let early_hint = hint_at(&catalog, "CN=node-b", &txn_for_k(&orders));

        // Server advances; driver polls the fresh snapshot authoritatively.
        transfer(&mut catalog, &orders, 1, "CN=node-b", &["CN=node-a"]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        // The stale early hint (owner a, epoch 1) must not roll routing back.
        assert_eq!(client.apply_hint(&early_hint), HintOutcome::AlreadyCurrent);
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert!(!client.needs_refresh());
    }

    // AC #4: a pushed full snapshot is adopted exactly like a poll.
    #[test]
    fn push_full_snapshot_applies_like_a_poll() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &[])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        transfer(&mut catalog, &orders, 1, "CN=node-b", &["CN=node-a"]);
        let pushed = TopologyUpdate::Full(catalog.topology_snapshot());

        assert!(client.apply_update(pushed).was_applied());
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
    }

    // AC #4: a pushed single-range delta advances just that range.
    #[test]
    fn push_range_delta_advances_one_range() {
        let parts = collection("parts");
        let mut catalog = two_way_split(&parts);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        // Only range 2 moves b -> c.
        transfer(&mut catalog, &parts, 2, "CN=node-c", &[]);
        let moved = range_delta(&catalog, &parts, 2);

        assert_eq!(
            client.apply_update(TopologyUpdate::Range(moved)),
            RefreshOutcome::Applied { ranges_changed: 1 }
        );
        // Range 1 untouched, range 2 advanced.
        assert_eq!(client.resolve(&parts, b"apple").unwrap(), &ident("CN=node-a"));
        assert_eq!(client.resolve(&parts, b"zebra").unwrap(), &ident("CN=node-c"));
    }

    // AC #5: behavior when push updates are missed — push is not mandatory for
    // correctness. A dropped push leaves the cache stale, but a redirect hint and
    // a later poll converge it anyway.
    #[test]
    fn missed_push_still_converges_via_hint_and_poll() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        // Server moves ownership a -> b. The push for this change is DROPPED:
        // we deliberately never call apply_update with it.
        transfer(&mut catalog, &orders, 1, "CN=node-b", &["CN=node-a"]);
        let _dropped_push = TopologyUpdate::Full(catalog.topology_snapshot());

        // The cache is stale and still points at node-a.
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-a"));

        // Correctness path: the stale request is redirected; the hint corrects.
        let hint = hint_at(&catalog, "CN=node-a", &txn_for_k(&orders));
        assert_eq!(client.apply_hint(&hint), HintOutcome::Corrected);
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-b"));
        assert!(client.needs_refresh());

        // Baseline poll reconciles the cache fully despite the missed push.
        let polled = client.apply_refresh(catalog.topology_snapshot());
        assert!(polled.was_applied());
        assert!(!client.needs_refresh());
    }

    // AC #4: out-of-order pushes are safe — a newer delta applied before an older
    // one wins, and the older one is then ignored.
    #[test]
    fn out_of_order_push_keeps_newest() {
        let orders = collection("orders");
        let mut catalog = catalog_with([full_range(&orders, 1, "CN=node-a", &["CN=node-b"])]);
        let mut client = ClientTopology::from_snapshot(catalog.topology_snapshot());

        // v2: a -> b.
        transfer(&mut catalog, &orders, 1, "CN=node-b", &["CN=node-a"]);
        let push_v2 = range_delta(&catalog, &orders, 1);

        // v3: b -> c.
        transfer(&mut catalog, &orders, 1, "CN=node-c", &["CN=node-b"]);
        let push_v3 = range_delta(&catalog, &orders, 1);

        // The newer push (v3) arrives first and is applied.
        assert!(client
            .apply_update(TopologyUpdate::Range(push_v3))
            .was_applied());
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));

        // The older push (v2) arrives late and is ignored — no rollback.
        assert_eq!(
            client.apply_update(TopologyUpdate::Range(push_v2)),
            RefreshOutcome::Ignored
        );
        assert_eq!(client.resolve(&orders, b"k").unwrap(), &ident("CN=node-c"));
    }
}