Skip to main content

reddb_server/cluster/
routing.rs

1//! Any-node routing and stale-ownership responses (issue #993, PRD #987, ADR 0037).
2//!
3//! ADR 0037 makes any-node routing **mandatory**: a client may send a request to
4//! *any* data member, and that member must do something correct even when it is
5//! not the owner of the range the request targets. PRD #987 spells out the two
6//! correct things a non-owner may do:
7//!
8//! * **Forward** a *simple, safe* operation internally to the range owner, so the
9//!   client never has to learn the topology to make progress; or
10//! * **Redirect** — return enough routing information (current owner, ownership
11//!   epoch, catalog version) for the client/router to refresh its topology and
12//!   retry against the owner itself.
13//!
14//! The split between the two is deliberate and is the heart of this module.
15//! Hidden internal forwarding is only ever safe for operations whose semantics do
16//! not depend on *where* they run: a single-key point read or write. Anything
17//! whose correctness is bound to a session or a long-lived stream —
18//! transactions, streaming/cursor operations, oversized payloads, or operations
19//! the caller has explicitly marked unsafe — must **not** be silently relayed.
20//! For those the only safe answer is an honest redirect so the client opens its
21//! transaction / stream / large transfer directly against the owner. This mirrors
22//! ADR 0037's "routing must consult ownership metadata with an epoch/version and
23//! handle stale routing responses" — and crucially it never weakens the
24//! fencing-below-routing guarantee, because a forwarded write still lands on the
25//! owner's [`admit_public_write`](ShardOwnershipCatalog::admit_public_write) gate
26//! (issue #990) at the owner's *current* epoch.
27//!
28//! Like the rest of the cluster module this is a pure decision layer with no I/O:
29//! [`plan_route`](ShardOwnershipCatalog::plan_route) maps a
30//! ([`RoutedRequest`], local [`NodeIdentity`], [`RoutingPolicy`]) triple to a
31//! [`RouteDecision`], so the any-node routing contract is exercised
32//! deterministically. The transport that actually forwards bytes or writes a
33//! redirect onto the wire is a separate concern layered on top of this.
34
35use super::identity::NodeIdentity;
36use super::ownership::{
37    CatalogVersion, CollectionId, OwnershipEpoch, RangeId, RangeOwnership, RangeRole,
38    ShardOwnershipCatalog,
39};
40
41/// Default ceiling on the payload a non-owner will relay internally.
42///
43/// Forwarding copies the whole payload across an extra internal hop, so a large
44/// transfer is cheaper and clearer to redirect: the client sends it once,
45/// directly to the owner. The MVP budget is 1 MiB; operators can widen or narrow
46/// it per [`RoutingPolicy`].
47pub const DEFAULT_MAX_FORWARD_PAYLOAD: usize = 1024 * 1024;
48
49/// What a request asks the cluster to do, reduced to just what routing needs to
50/// decide forward-vs-redirect.
51///
52/// Only [`SafePointOp`](Self::SafePointOp) is eligible for hidden internal
53/// forwarding. The other classes are exactly PRD #987's "must not be silently
54/// forwarded" set: their correctness is tied to running on the owner directly, so
55/// a non-owner must redirect rather than relay them.
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum RequestOperation {
58    /// A single-key point read or write — the one class safe to forward to the
59    /// owner, because its result does not depend on which node relays it.
60    SafePointOp,
61    /// A multi-statement transaction. Atomicity and session state must be
62    /// established on the owner directly; never hidden-forwarded.
63    Transaction,
64    /// A streaming / cursor operation (scan, subscribe, change feed). The stream
65    /// must originate on the owner; never hidden-forwarded.
66    Streaming,
67    /// An operation the caller has explicitly flagged as unsafe to forward.
68    ExplicitlyUnsafe,
69}
70
71impl RequestOperation {
72    /// `Ok` if this class is *in principle* forwardable (only
73    /// [`SafePointOp`](Self::SafePointOp)); otherwise the [`RedirectReason`] that
74    /// explains why it must be redirected instead.
75    fn forwardable(self) -> Result<(), RedirectReason> {
76        match self {
77            RequestOperation::SafePointOp => Ok(()),
78            RequestOperation::Transaction => Err(RedirectReason::Transaction),
79            RequestOperation::Streaming => Err(RedirectReason::Streaming),
80            RequestOperation::ExplicitlyUnsafe => Err(RedirectReason::ExplicitlyUnsafe),
81        }
82    }
83}
84
85/// A request as it arrives at some data member, abstracted to what routing reads:
86/// which collection/key it targets, what kind of operation it is, and how large
87/// its payload is.
88#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct RoutedRequest {
90    collection: CollectionId,
91    key: Vec<u8>,
92    operation: RequestOperation,
93    payload_len: usize,
94}
95
96impl RoutedRequest {
97    /// A request with no meaningful payload (e.g. a point read or delete).
98    pub fn new(
99        collection: CollectionId,
100        key: impl Into<Vec<u8>>,
101        operation: RequestOperation,
102    ) -> Self {
103        Self {
104            collection,
105            key: key.into(),
106            operation,
107            payload_len: 0,
108        }
109    }
110
111    /// Declare the request's payload size, so the forward-size budget can apply.
112    pub fn with_payload_len(mut self, payload_len: usize) -> Self {
113        self.payload_len = payload_len;
114        self
115    }
116
117    pub fn collection(&self) -> &CollectionId {
118        &self.collection
119    }
120
121    pub fn key(&self) -> &[u8] {
122        &self.key
123    }
124
125    pub fn operation(&self) -> RequestOperation {
126        self.operation
127    }
128
129    pub fn payload_len(&self) -> usize {
130        self.payload_len
131    }
132}
133
134/// How a data member handles requests it does not own.
135///
136/// "Any-node routing is mandatory" (ADR 0037), but *forwarding* is a policy
137/// choice: a deployment (or a single node) may prefer to push routing work onto
138/// topology-aware clients and redirect everything instead of relaying. When
139/// forwarding is disabled, even a safe point op gets a redirect — this is PRD
140/// #987's "when forwarding is not selected" path.
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
142pub struct RoutingPolicy {
143    forwarding_enabled: bool,
144    max_forward_payload: usize,
145}
146
147impl RoutingPolicy {
148    /// Forward safe point ops to the owner; redirect everything else. Uses the
149    /// [`DEFAULT_MAX_FORWARD_PAYLOAD`] budget.
150    pub fn forwarding() -> Self {
151        Self {
152            forwarding_enabled: true,
153            max_forward_payload: DEFAULT_MAX_FORWARD_PAYLOAD,
154        }
155    }
156
157    /// Never forward — always redirect a non-owner request with a routing hint.
158    /// The client/router refreshes topology and retries against the owner.
159    pub fn redirect_only() -> Self {
160        Self {
161            forwarding_enabled: false,
162            max_forward_payload: 0,
163        }
164    }
165
166    /// Override the maximum payload eligible for internal forwarding.
167    pub fn with_max_forward_payload(mut self, max_forward_payload: usize) -> Self {
168        self.max_forward_payload = max_forward_payload;
169        self
170    }
171
172    pub fn forwarding_enabled(&self) -> bool {
173        self.forwarding_enabled
174    }
175
176    pub fn max_forward_payload(&self) -> usize {
177        self.max_forward_payload
178    }
179}
180
181impl Default for RoutingPolicy {
182    fn default() -> Self {
183        Self::forwarding()
184    }
185}
186
187/// The routing information a non-owner hands back so the caller can reach the
188/// owner — the payload of both a forward (where to relay) and a redirect (where
189/// to retry).
190///
191/// It carries the owner's [`NodeIdentity`] plus the [`OwnershipEpoch`] and
192/// [`CatalogVersion`] the decision was made at. The epoch/version let a
193/// topology-aware client tell *stale* hints from fresh ones and avoid retry loops
194/// against ownership that has since moved again (ADR 0037: routing "must consult
195/// ownership metadata with an epoch/version").
196#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct RoutingHint {
198    collection: CollectionId,
199    range_id: RangeId,
200    owner: NodeIdentity,
201    epoch: OwnershipEpoch,
202    version: CatalogVersion,
203}
204
205impl RoutingHint {
206    fn from_range(collection: &CollectionId, range: &RangeOwnership) -> Self {
207        Self {
208            collection: collection.clone(),
209            range_id: range.range_id(),
210            owner: range.owner().clone(),
211            epoch: range.epoch(),
212            version: range.version(),
213        }
214    }
215
216    pub fn collection(&self) -> &CollectionId {
217        &self.collection
218    }
219
220    pub fn range_id(&self) -> RangeId {
221        self.range_id
222    }
223
224    pub fn owner(&self) -> &NodeIdentity {
225        &self.owner
226    }
227
228    pub fn epoch(&self) -> OwnershipEpoch {
229        self.epoch
230    }
231
232    pub fn version(&self) -> CatalogVersion {
233        self.version
234    }
235}
236
237impl std::fmt::Display for RoutingHint {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        write!(
240            f,
241            "{}/{} owned by {} at epoch {} (catalog version {})",
242            self.collection, self.range_id, self.owner, self.epoch, self.version
243        )
244    }
245}
246
247/// Why a non-owner redirected a request instead of forwarding it.
248///
249/// Every redirect happens because the local node is not the owner; the reason
250/// explains why the safe-forward path was *not* taken for this particular
251/// request, so an operator (or a client deciding how to retry) can tell a routine
252/// "open your transaction on the owner" from a "this node won't relay" policy.
253#[derive(Debug, Clone, Copy, PartialEq, Eq)]
254pub enum RedirectReason {
255    /// This node's [`RoutingPolicy`] does not forward; the client must route to
256    /// the owner itself. (PRD #987 "when forwarding is not selected".)
257    ForwardingDisabled,
258    /// A multi-statement transaction — must be opened on the owner directly.
259    Transaction,
260    /// A streaming / cursor operation — must originate on the owner.
261    Streaming,
262    /// The payload exceeds the forward budget; send it once, directly to the
263    /// owner, rather than copying it across an extra internal hop.
264    LargePayload { len: usize, limit: usize },
265    /// The caller explicitly marked the operation unsafe to forward.
266    ExplicitlyUnsafe,
267}
268
269impl std::fmt::Display for RedirectReason {
270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271        match self {
272            Self::ForwardingDisabled => write!(f, "forwarding not selected on this node"),
273            Self::Transaction => write!(f, "transactions must be opened on the owner"),
274            Self::Streaming => write!(f, "streaming operations must originate on the owner"),
275            Self::LargePayload { len, limit } => write!(
276                f,
277                "payload {len} bytes exceeds the {limit}-byte forward budget; send directly to the owner"
278            ),
279            Self::ExplicitlyUnsafe => {
280                write!(f, "operation explicitly marked unsafe to forward")
281            }
282        }
283    }
284}
285
286/// What a data member decides to do with a request under any-node routing.
287#[derive(Debug, Clone, PartialEq, Eq)]
288pub enum RouteDecision {
289    /// The local node owns the range — execute locally. Carries the range and the
290    /// current ownership epoch the write should be stamped/fenced with (the same
291    /// epoch [`admit_public_write`](ShardOwnershipCatalog::admit_public_write)
292    /// will check).
293    Local {
294        range_id: RangeId,
295        epoch: OwnershipEpoch,
296    },
297    /// A safe point op the local node may forward internally to the owner named
298    /// in the hint. The forwarded write still passes the owner's public-write
299    /// gate at the owner's current epoch, so forwarding never bypasses fencing.
300    Forward { hint: RoutingHint },
301    /// The request is not eligible for hidden forwarding (unsafe class, oversized
302    /// payload, or forwarding disabled). Return the hint so the client refreshes
303    /// topology and retries against the owner. This is the stale/misrouted
304    /// response of acceptance criterion #2.
305    Redirect {
306        hint: RoutingHint,
307        reason: RedirectReason,
308    },
309    /// No range of the collection covers the key. The catalog the request
310    /// resolved against is empty or stale for this collection; the client must
311    /// refresh its catalog and retry — there is no owner to name yet.
312    Unroutable { collection: CollectionId },
313}
314
315impl RouteDecision {
316    /// The routing hint, if this decision carries one (forward or redirect).
317    pub fn hint(&self) -> Option<&RoutingHint> {
318        match self {
319            RouteDecision::Forward { hint } | RouteDecision::Redirect { hint, .. } => Some(hint),
320            RouteDecision::Local { .. } | RouteDecision::Unroutable { .. } => None,
321        }
322    }
323
324    /// Whether the local node should execute the request itself.
325    pub fn is_local(&self) -> bool {
326        matches!(self, RouteDecision::Local { .. })
327    }
328}
329
330impl ShardOwnershipCatalog {
331    /// Plan how `local` should handle `request` under `policy` — the any-node
332    /// routing decision (issue #993).
333    ///
334    /// Resolves the target range from the catalog, then:
335    ///
336    /// * owner of the range → [`Local`](RouteDecision::Local);
337    /// * non-owner, forwarding enabled, safe point op within budget →
338    ///   [`Forward`](RouteDecision::Forward) to the owner;
339    /// * non-owner but the op is unsafe to forward / oversized / forwarding
340    ///   disabled → [`Redirect`](RouteDecision::Redirect) with the owner+epoch
341    ///   hint;
342    /// * no range covers the key → [`Unroutable`](RouteDecision::Unroutable).
343    ///
344    /// The decision is pure: it reads the catalog and returns intent. Fencing is
345    /// still enforced below routing — a forwarded or locally-executed write lands
346    /// on [`admit_public_write`](Self::admit_public_write) at the owner's current
347    /// epoch, so a stale routing decision cannot smuggle a write past ownership.
348    pub fn plan_route(
349        &self,
350        local: &NodeIdentity,
351        request: &RoutedRequest,
352        policy: &RoutingPolicy,
353    ) -> RouteDecision {
354        let range = match self.route_shard_key(request.collection(), request.key()) {
355            Some(range) => range,
356            None => {
357                return RouteDecision::Unroutable {
358                    collection: request.collection().clone(),
359                }
360            }
361        };
362
363        if range.role_of(local) == RangeRole::Owner {
364            return RouteDecision::Local {
365                range_id: range.range_id(),
366                epoch: range.epoch(),
367            };
368        }
369
370        let hint = RoutingHint::from_range(request.collection(), range);
371
372        // Non-owner. Forward only if policy allows AND the op is a safe point op
373        // within the forward-size budget; otherwise hand back a routing hint.
374        if !policy.forwarding_enabled() {
375            return RouteDecision::Redirect {
376                hint,
377                reason: RedirectReason::ForwardingDisabled,
378            };
379        }
380        if let Err(reason) = request.operation().forwardable() {
381            return RouteDecision::Redirect { hint, reason };
382        }
383        if request.payload_len() > policy.max_forward_payload() {
384            return RouteDecision::Redirect {
385                hint,
386                reason: RedirectReason::LargePayload {
387                    len: request.payload_len(),
388                    limit: policy.max_forward_payload(),
389                },
390            };
391        }
392        RouteDecision::Forward { hint }
393    }
394}
395
396#[cfg(test)]
397mod tests {
398    use super::*;
399    use crate::cluster::ownership::{PlacementMetadata, RangeBound, RangeBounds, ShardKeyMode};
400
401    fn collection(name: &str) -> CollectionId {
402        CollectionId::new(name).unwrap()
403    }
404
405    fn ident(cn: &str) -> NodeIdentity {
406        NodeIdentity::from_certificate_subject(cn).unwrap()
407    }
408
409    /// A full-keyspace range of `coll` owned by `owner` with `replicas`.
410    fn range_with(coll: &CollectionId, id: u64, owner: &str, replicas: &[&str]) -> RangeOwnership {
411        RangeOwnership::establish(
412            coll.clone(),
413            RangeId::new(id),
414            ShardKeyMode::Hash,
415            RangeBounds::full(),
416            ident(owner),
417            replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
418            PlacementMetadata::with_replication_factor(3),
419        )
420    }
421
422    fn catalog_with(range: RangeOwnership) -> ShardOwnershipCatalog {
423        let mut catalog = ShardOwnershipCatalog::new();
424        catalog.apply_update(range).unwrap();
425        catalog
426    }
427
428    // AC #1 + direct-owner request: the owner resolves the range and executes
429    // locally.
430    #[test]
431    fn owner_executes_locally() {
432        let orders = collection("orders");
433        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
434        let request =
435            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);
436
437        let decision =
438            catalog.plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding());
439        assert_eq!(
440            decision,
441            RouteDecision::Local {
442                range_id: RangeId::new(1),
443                epoch: OwnershipEpoch::initial(),
444            }
445        );
446        assert!(decision.is_local());
447        assert!(decision.hint().is_none());
448    }
449
450    // AC #1: any node can resolve target range ownership from the catalog, even
451    // one that holds no copy of the range.
452    #[test]
453    fn any_node_resolves_owner_from_catalog() {
454        let orders = collection("orders");
455        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
456        let request =
457            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);
458
459        // node-c holds no copy at all, yet can still name the owner.
460        let decision =
461            catalog.plan_route(&ident("CN=node-c"), &request, &RoutingPolicy::forwarding());
462        let hint = decision.hint().expect("non-owner carries a hint");
463        assert_eq!(hint.owner(), &ident("CN=node-a"));
464        assert_eq!(hint.range_id(), RangeId::new(1));
465        assert_eq!(hint.epoch(), OwnershipEpoch::initial());
466    }
467
468    // AC #3: a safe single-key op from a non-owner is forwarded to the owner.
469    #[test]
470    fn safe_point_op_is_forwarded_from_non_owner() {
471        let orders = collection("orders");
472        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
473        let request =
474            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);
475
476        // node-b is a replica → forward to the owner node-a.
477        let decision =
478            catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding());
479        match decision {
480            RouteDecision::Forward { hint } => {
481                assert_eq!(hint.owner(), &ident("CN=node-a"));
482                assert_eq!(hint.epoch(), OwnershipEpoch::initial());
483            }
484            other => panic!("expected Forward, got {other:?}"),
485        }
486    }
487
488    // AC #3: a forwarded write does not bypass fencing — it still passes the
489    // owner's public-write gate (#990) at the hint's epoch.
490    #[test]
491    fn forwarded_write_still_passes_owner_public_gate() {
492        let orders = collection("orders");
493        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
494        let request =
495            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);
496
497        let hint =
498            match catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding()) {
499                RouteDecision::Forward { hint } => hint,
500                other => panic!("expected Forward, got {other:?}"),
501            };
502        // The owner admits the relayed write at the epoch the forwarder carried.
503        let admitted = catalog
504            .admit_public_write(&ident("CN=node-a"), &orders, b"k", hint.epoch())
505            .expect("owner admits the forwarded write at the current epoch");
506        assert_eq!(admitted.owner(), &ident("CN=node-a"));
507    }
508
509    // AC #4: transactions are redirected, never hidden-forwarded.
510    #[test]
511    fn transaction_from_non_owner_is_redirected() {
512        let orders = collection("orders");
513        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
514        let request =
515            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
516
517        let decision =
518            catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding());
519        match decision {
520            RouteDecision::Redirect { hint, reason } => {
521                assert_eq!(reason, RedirectReason::Transaction);
522                assert_eq!(hint.owner(), &ident("CN=node-a"));
523            }
524            other => panic!("expected Redirect(Transaction), got {other:?}"),
525        }
526    }
527
528    // AC #4: streaming operations are redirected.
529    #[test]
530    fn streaming_from_non_owner_is_redirected() {
531        let orders = collection("orders");
532        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
533        let request =
534            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Streaming);
535
536        match catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding()) {
537            RouteDecision::Redirect { reason, .. } => assert_eq!(reason, RedirectReason::Streaming),
538            other => panic!("expected Redirect(Streaming), got {other:?}"),
539        }
540    }
541
542    // AC #4: explicitly unsafe operations are redirected.
543    #[test]
544    fn explicitly_unsafe_op_is_redirected() {
545        let orders = collection("orders");
546        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
547        let request = RoutedRequest::new(
548            orders.clone(),
549            b"k".to_vec(),
550            RequestOperation::ExplicitlyUnsafe,
551        );
552
553        match catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding()) {
554            RouteDecision::Redirect { reason, .. } => {
555                assert_eq!(reason, RedirectReason::ExplicitlyUnsafe)
556            }
557            other => panic!("expected Redirect(ExplicitlyUnsafe), got {other:?}"),
558        }
559    }
560
561    // AC #4: an over-budget payload is redirected even though its op class is
562    // safe — send it once, directly to the owner.
563    #[test]
564    fn large_payload_is_redirected_not_forwarded() {
565        let orders = collection("orders");
566        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
567        let policy = RoutingPolicy::forwarding().with_max_forward_payload(64);
568        let request =
569            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp)
570                .with_payload_len(65);
571
572        match catalog.plan_route(&ident("CN=node-b"), &request, &policy) {
573            RouteDecision::Redirect { reason, .. } => {
574                assert_eq!(reason, RedirectReason::LargePayload { len: 65, limit: 64 })
575            }
576            other => panic!("expected Redirect(LargePayload), got {other:?}"),
577        }
578        // A payload exactly at the budget is still forwardable.
579        let at_budget =
580            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp)
581                .with_payload_len(64);
582        assert!(matches!(
583            catalog.plan_route(&ident("CN=node-b"), &at_budget, &policy),
584            RouteDecision::Forward { .. }
585        ));
586    }
587
588    // AC #2 "when forwarding is not selected": redirect-only policy redirects even
589    // a safe point op.
590    #[test]
591    fn redirect_only_policy_redirects_safe_op() {
592        let orders = collection("orders");
593        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
594        let request =
595            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);
596
597        match catalog.plan_route(
598            &ident("CN=node-b"),
599            &request,
600            &RoutingPolicy::redirect_only(),
601        ) {
602            RouteDecision::Redirect { hint, reason } => {
603                assert_eq!(reason, RedirectReason::ForwardingDisabled);
604                assert_eq!(hint.owner(), &ident("CN=node-a"));
605                assert_eq!(hint.epoch(), OwnershipEpoch::initial());
606            }
607            other => panic!("expected Redirect(ForwardingDisabled), got {other:?}"),
608        }
609    }
610
611    // A key with no covering range is unroutable — refresh the catalog.
612    #[test]
613    fn key_with_no_range_is_unroutable() {
614        let catalog = ShardOwnershipCatalog::new();
615        let orders = collection("orders");
616        let request =
617            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);
618
619        let decision =
620            catalog.plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding());
621        assert_eq!(decision, RouteDecision::Unroutable { collection: orders });
622        assert!(decision.hint().is_none());
623    }
624
625    // AC #5: stale ownership retry behavior. A client routes against an old
626    // catalog snapshot, sends to the former owner, gets a redirect carrying the
627    // new owner+epoch, refreshes, and retries successfully against the new owner.
628    #[test]
629    fn stale_ownership_redirects_then_retry_succeeds() {
630        let orders = collection("orders");
631
632        // v1: node-a owns the range; node-b is a replica.
633        let mut catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
634
635        // Ownership transfers a → b (epoch + version bump). node-a becomes a
636        // replica of the range it used to own.
637        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
638        let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
639        catalog.apply_update(v2).unwrap();
640
641        // A client with a stale snapshot still believes node-a is the owner and
642        // sends a transaction there. node-a is now a replica → it redirects with
643        // the *current* owner and the advanced epoch.
644        let request =
645            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
646        let redirect =
647            catalog.plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding());
648        let hint = match redirect {
649            RouteDecision::Redirect { hint, reason } => {
650                assert_eq!(reason, RedirectReason::Transaction);
651                hint
652            }
653            other => panic!("expected Redirect, got {other:?}"),
654        };
655        assert_eq!(hint.owner(), &ident("CN=node-b"));
656        assert_eq!(hint.epoch().value(), 2);
657        assert!(hint.epoch() > OwnershipEpoch::initial());
658
659        // The client refreshes from the hint and retries against the new owner.
660        let retry = catalog.plan_route(hint.owner(), &request, &RoutingPolicy::forwarding());
661        assert_eq!(
662            retry,
663            RouteDecision::Local {
664                range_id: RangeId::new(1),
665                epoch: hint.epoch(),
666            }
667        );
668    }
669
670    // A stale safe-op forward also tracks ownership: after a transfer, a safe op
671    // arriving at the old owner forwards to the *new* owner, not the old one.
672    #[test]
673    fn safe_op_forward_targets_current_owner_after_transfer() {
674        let orders = collection("orders");
675        let mut catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
676        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
677        catalog
678            .apply_update(v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
679            .unwrap();
680
681        let request =
682            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);
683        match catalog.plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding()) {
684            RouteDecision::Forward { hint } => {
685                assert_eq!(hint.owner(), &ident("CN=node-b"));
686                assert_eq!(hint.epoch().value(), 2);
687            }
688            other => panic!("expected Forward to new owner, got {other:?}"),
689        }
690    }
691
692    #[test]
693    fn routing_hint_display_names_owner_and_epoch() {
694        let orders = collection("orders");
695        let catalog = catalog_with(range_with(&orders, 4, "CN=node-a", &["CN=node-b"]));
696        let request =
697            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
698        let hint = catalog
699            .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
700            .hint()
701            .cloned()
702            .expect("redirect carries a hint");
703        let rendered = hint.to_string();
704        assert!(rendered.contains("CN=node-a"));
705        assert!(rendered.contains("epoch 1"));
706    }
707}