Skip to main content

reddb_server/cluster/
ownership.rs

1//! The global shard ownership catalog (issue #989, PRD #987, ADR 0037).
2//!
3//! The shard ownership catalog is the **source of truth for range routing and
4//! failover** in a multi-writer cluster. Per the glossary it is *"explicit,
5//! versioned RedDB catalog state that records shard/range bounds, current writer
6//! owner, replicas, and ownership epoch/version"* and — crucially — it is
7//! *"special global control-plane state replicated to all data members rather
8//! than sharded like ordinary user collections"*.
9//!
10//! That last point shapes the whole module. The catalog is the thing that tells
11//! you *where* a collection's data lives, so it cannot itself be located by the
12//! same user-data sharding it describes — that would be circular. Instead every
13//! data member holds a full [`ShardOwnershipCatalog`] replica and routes against
14//! it locally ([`ShardOwnershipCatalog::route`]). Replication is modelled as
15//! shipping versioned [`RangeOwnership`] entries that each member applies through
16//! the same [`apply_update`](ShardOwnershipCatalog::apply_update) path that the
17//! Supervisor leader writes through — and that path rejects stale versions, so a
18//! late or out-of-order replica update can never overwrite newer ownership.
19//!
20//! ## What the catalog records
21//!
22//! One [`RangeOwnership`] entry per owned shard/range carries everything routing
23//! and fencing need:
24//!
25//! * [`CollectionId`] + [`RangeId`] — *which* range of *which* collection.
26//! * [`ShardKeyMode`] — [`Hash`](ShardKeyMode::Hash) (the default, for uniform
27//!   distribution) or [`Ordered`](ShardKeyMode::Ordered) (declared when range
28//!   locality and ordered scans matter more than hotspot resistance).
29//! * [`RangeBounds`] — the half-open `[lower, upper)` partition this entry owns.
30//! * `owner` ([`NodeIdentity`]) + `replicas` — the current single writer for the
31//!   range and its read/catch-up copies.
32//! * [`OwnershipEpoch`] + [`CatalogVersion`] — the fencing epoch (bumped on owner
33//!   change so a stale old owner is fenced) and the monotonic write version
34//!   (bumped on *every* accepted update so stale writes are rejected).
35//! * [`PlacementMetadata`] — replication factor and free-form placement
36//!   attributes (region/zone/weight) the rebalancer reads.
37//!
38//! Ownership changes are produced as *transitions* — new entries built with
39//! [`RangeOwnership::transfer_to`] / [`update_replicas`](RangeOwnership::update_replicas)
40//! / [`update_placement`](RangeOwnership::update_placement) — never arbitrary row
41//! edits, matching ADR 0037's "transitions, not arbitrary row edits".
42//!
43//! Everything here is a pure data model with no I/O, so the routing, versioning,
44//! and replication story is exercised deterministically.
45
46use std::collections::BTreeMap;
47
48use super::identity::NodeIdentity;
49
50/// A user collection's stable identity, as recorded in the catalog.
51///
52/// The catalog is keyed by collection (and range within it); this is the
53/// collection's own name, not a shard-routed handle. Resolving a
54/// [`CollectionId`] to its ranges needs only the catalog itself — no user-data
55/// sharding — which is what lets the catalog be the thing that *bootstraps*
56/// routing.
57#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
58pub struct CollectionId(String);
59
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct CollectionIdError;
62
63impl std::fmt::Display for CollectionIdError {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(f, "collection id is empty")
66    }
67}
68
69impl std::error::Error for CollectionIdError {}
70
71impl CollectionId {
72    /// Build a collection id from a non-empty name.
73    pub fn new(value: impl AsRef<str>) -> Result<Self, CollectionIdError> {
74        let value = value.as_ref().trim();
75        if value.is_empty() {
76            return Err(CollectionIdError);
77        }
78        Ok(Self(value.to_string()))
79    }
80
81    pub fn as_str(&self) -> &str {
82        &self.0
83    }
84}
85
86impl std::fmt::Display for CollectionId {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        f.write_str(&self.0)
89    }
90}
91
92/// Stable identifier for one shard/range within a collection.
93///
94/// Ranges are owned at sub-collection granularity (ADR 0037), so a collection
95/// can have many of these — each is one independently-owned, independently-routed
96/// partition. The id is stable across ownership transitions: moving a range to a
97/// new owner keeps its [`RangeId`] and bumps its epoch/version, it does not mint
98/// a new range.
99#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
100pub struct RangeId(u64);
101
102impl RangeId {
103    pub fn new(value: u64) -> Self {
104        Self(value)
105    }
106
107    pub fn value(self) -> u64 {
108        self.0
109    }
110}
111
112impl std::fmt::Display for RangeId {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        write!(f, "{}", self.0)
115    }
116}
117
118/// Collection-level partitioning mode for shard/range ownership.
119///
120/// Per the glossary, *"Hash mode is the default for uniform distribution;
121/// ordered mode is declared when range locality and ordered scans matter more
122/// than automatic hotspot resistance."* The mode is fixed per collection: every
123/// range of a collection shares its mode, and an entry whose mode disagrees with
124/// its collection's declared mode is rejected
125/// ([`CatalogError::ShardKeyModeMismatch`]).
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
127pub enum ShardKeyMode {
128    /// Uniform hash distribution — the default. Bounds are over hash tokens.
129    #[default]
130    Hash,
131    /// Ordered key ranges, declared for range locality / ordered scans. Bounds
132    /// are over the ordered shard key itself.
133    Ordered,
134}
135
136/// One edge of a [`RangeBounds`].
137///
138/// Bounds are byte strings so the same type serves both shard key modes: a
139/// [`Hash`](ShardKeyMode::Hash) range bounds hash-token bytes, an
140/// [`Ordered`](ShardKeyMode::Ordered) range bounds the ordered key bytes
141/// directly. [`Min`](RangeBound::Min)/[`Max`](RangeBound::Max) are the open ends
142/// of the keyspace.
143#[derive(Debug, Clone, PartialEq, Eq)]
144pub enum RangeBound {
145    /// The open low end of the keyspace (everything is `>= Min`).
146    Min,
147    /// A concrete boundary key.
148    Key(Vec<u8>),
149    /// The open high end of the keyspace (everything is `< Max`).
150    Max,
151}
152
153impl RangeBound {
154    /// A boundary at the given key bytes.
155    pub fn key(bytes: impl Into<Vec<u8>>) -> Self {
156        RangeBound::Key(bytes.into())
157    }
158
159    /// Total order over keyspace positions: `Min < every Key < Max`, with keys
160    /// compared lexicographically. This is what makes both `contains` and
161    /// `overlaps` plain comparisons.
162    fn position(&self) -> Position<'_> {
163        match self {
164            RangeBound::Min => Position::Min,
165            RangeBound::Key(k) => Position::Key(k),
166            RangeBound::Max => Position::Max,
167        }
168    }
169}
170
171#[derive(PartialEq, Eq, PartialOrd, Ord)]
172enum Position<'a> {
173    Min,
174    Key(&'a [u8]),
175    Max,
176}
177
178/// The half-open `[lower, upper)` partition a range owns.
179///
180/// Half-open bounds tile the keyspace without gaps or double-cover: adjacent
181/// ranges share a boundary key that belongs to exactly one of them. `lower` must
182/// be strictly below `upper`, so an empty or inverted range cannot be
183/// constructed.
184#[derive(Debug, Clone, PartialEq, Eq)]
185pub struct RangeBounds {
186    lower: RangeBound,
187    upper: RangeBound,
188}
189
190#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct RangeBoundsError;
192
193impl std::fmt::Display for RangeBoundsError {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        write!(
196            f,
197            "range lower bound must be strictly below the upper bound"
198        )
199    }
200}
201
202impl std::error::Error for RangeBoundsError {}
203
204impl RangeBounds {
205    /// Bounds from an explicit `[lower, upper)` pair. Errors if the range would
206    /// be empty or inverted (`lower >= upper`).
207    pub fn new(lower: RangeBound, upper: RangeBound) -> Result<Self, RangeBoundsError> {
208        if lower.position() >= upper.position() {
209            return Err(RangeBoundsError);
210        }
211        Ok(Self { lower, upper })
212    }
213
214    /// The whole keyspace, `[Min, Max)` — a single range covering a collection.
215    pub fn full() -> Self {
216        Self {
217            lower: RangeBound::Min,
218            upper: RangeBound::Max,
219        }
220    }
221
222    pub fn lower(&self) -> &RangeBound {
223        &self.lower
224    }
225
226    pub fn upper(&self) -> &RangeBound {
227        &self.upper
228    }
229
230    /// Does `key` fall inside this half-open range? Lower bound inclusive, upper
231    /// bound exclusive — the routing predicate.
232    pub fn contains(&self, key: &[u8]) -> bool {
233        let key = Position::Key(key);
234        self.lower.position() <= key && key < self.upper.position()
235    }
236
237    /// Do these two ranges share any key? Used to keep a collection's ranges
238    /// non-overlapping so routing resolves to exactly one owner.
239    pub fn overlaps(&self, other: &RangeBounds) -> bool {
240        self.lower.position() < other.upper.position()
241            && other.lower.position() < self.upper.position()
242    }
243
244    /// Split this `[lower, upper)` range at `at` into a lower child
245    /// `[lower, at)` and an upper child `[at, upper)`. The split point must fall
246    /// **strictly inside** the range (`lower < at < upper`); a point at or
247    /// outside a bound would carve off an empty child and is rejected with
248    /// [`RangeBoundsError`]. The two children tile the original exactly — no gap,
249    /// no overlap — which is what lets a [split-and-move](super::move_range) shrink
250    /// the retained child and create the moved child without making routing
251    /// ambiguous.
252    pub fn split_at(&self, at: &[u8]) -> Result<(RangeBounds, RangeBounds), RangeBoundsError> {
253        let at_pos = Position::Key(at);
254        if at_pos <= self.lower.position() || at_pos >= self.upper.position() {
255            return Err(RangeBoundsError);
256        }
257        let lower = RangeBounds {
258            lower: self.lower.clone(),
259            upper: RangeBound::key(at.to_vec()),
260        };
261        let upper = RangeBounds {
262            lower: RangeBound::key(at.to_vec()),
263            upper: self.upper.clone(),
264        };
265        Ok((lower, upper))
266    }
267}
268
269/// Monotonic write version of a single catalog entry.
270///
271/// Every accepted update to a range bumps its version. An update that does not
272/// strictly advance the version is stale and is rejected — this is the
273/// compare-and-advance rule that makes catalog replication safe regardless of
274/// delivery order.
275#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
276pub struct CatalogVersion(u64);
277
278impl CatalogVersion {
279    /// The version a range is created at.
280    pub fn initial() -> Self {
281        Self(1)
282    }
283
284    pub fn value(self) -> u64 {
285        self.0
286    }
287
288    fn next(self) -> Self {
289        Self(self.0 + 1)
290    }
291}
292
293impl std::fmt::Display for CatalogVersion {
294    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295        write!(f, "{}", self.0)
296    }
297}
298
299/// Fencing epoch for a range's write authority.
300///
301/// Distinct from [`CatalogVersion`]: the version advances on *any* catalog edit,
302/// but the epoch advances only when **write authority moves** (a new owner). A
303/// WAL/logical record stamped with an epoch older than the catalog's current
304/// epoch is from a fenced old owner and must be rejected (ADR 0037, "fencing is
305/// enforced below routing").
306#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
307pub struct OwnershipEpoch(u64);
308
309impl OwnershipEpoch {
310    /// The epoch a range is created at.
311    pub fn initial() -> Self {
312        Self(1)
313    }
314
315    pub fn value(self) -> u64 {
316        self.0
317    }
318
319    fn next(self) -> Self {
320        Self(self.0 + 1)
321    }
322}
323
324impl std::fmt::Display for OwnershipEpoch {
325    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326        write!(f, "{}", self.0)
327    }
328}
329
330/// Placement metadata the rebalancer reads when planning transitions.
331///
332/// The MVP carries the range's replication factor plus a free-form attribute map
333/// for region/zone/operator-weight hints. It is descriptive control-plane data,
334/// not an authorization source.
335#[derive(Debug, Clone, Default, PartialEq, Eq)]
336pub struct PlacementMetadata {
337    replication_factor: usize,
338    attributes: BTreeMap<String, String>,
339}
340
341impl PlacementMetadata {
342    /// Placement with a target replication factor and no attributes.
343    pub fn with_replication_factor(replication_factor: usize) -> Self {
344        Self {
345            replication_factor,
346            attributes: BTreeMap::new(),
347        }
348    }
349
350    /// Attach a placement attribute (e.g. `region` → `us-east-1`).
351    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
352        self.attributes.insert(key.into(), value.into());
353        self
354    }
355
356    pub fn replication_factor(&self) -> usize {
357        self.replication_factor
358    }
359
360    pub fn attribute(&self, key: &str) -> Option<&str> {
361        self.attributes.get(key).map(String::as_str)
362    }
363}
364
365/// One owned shard/range: the catalog's unit of routing and fencing.
366///
367/// An entry is self-describing — it carries its collection, range id, mode,
368/// bounds, owner, replicas, epoch, version, and placement — so a data member
369/// that receives it by replication can route and fence without consulting any
370/// other state. New ownership states are produced as *transitions* off an
371/// existing entry ([`transfer_to`](Self::transfer_to),
372/// [`update_replicas`](Self::update_replicas),
373/// [`update_placement`](Self::update_placement)), each of which advances the
374/// version — and, for an owner change, the fencing epoch.
375#[derive(Debug, Clone, PartialEq, Eq)]
376pub struct RangeOwnership {
377    collection: CollectionId,
378    range_id: RangeId,
379    shard_key_mode: ShardKeyMode,
380    bounds: RangeBounds,
381    owner: NodeIdentity,
382    replicas: Vec<NodeIdentity>,
383    epoch: OwnershipEpoch,
384    version: CatalogVersion,
385    placement: PlacementMetadata,
386}
387
388impl RangeOwnership {
389    /// The initial ownership state for a freshly created range: version and
390    /// epoch both at their [`initial`](CatalogVersion::initial) values.
391    #[allow(clippy::too_many_arguments)]
392    pub fn establish(
393        collection: CollectionId,
394        range_id: RangeId,
395        shard_key_mode: ShardKeyMode,
396        bounds: RangeBounds,
397        owner: NodeIdentity,
398        replicas: impl IntoIterator<Item = NodeIdentity>,
399        placement: PlacementMetadata,
400    ) -> Self {
401        Self {
402            collection,
403            range_id,
404            shard_key_mode,
405            bounds,
406            owner,
407            replicas: replicas.into_iter().collect(),
408            epoch: OwnershipEpoch::initial(),
409            version: CatalogVersion::initial(),
410            placement,
411        }
412    }
413
414    pub fn collection(&self) -> &CollectionId {
415        &self.collection
416    }
417
418    pub fn range_id(&self) -> RangeId {
419        self.range_id
420    }
421
422    pub fn shard_key_mode(&self) -> ShardKeyMode {
423        self.shard_key_mode
424    }
425
426    pub fn bounds(&self) -> &RangeBounds {
427        &self.bounds
428    }
429
430    pub fn owner(&self) -> &NodeIdentity {
431        &self.owner
432    }
433
434    pub fn replicas(&self) -> &[NodeIdentity] {
435        &self.replicas
436    }
437
438    pub fn epoch(&self) -> OwnershipEpoch {
439        self.epoch
440    }
441
442    pub fn version(&self) -> CatalogVersion {
443        self.version
444    }
445
446    pub fn placement(&self) -> &PlacementMetadata {
447        &self.placement
448    }
449
450    /// The catalog key for this entry: `(collection, range_id)`.
451    fn key(&self) -> (CollectionId, RangeId) {
452        (self.collection.clone(), self.range_id)
453    }
454
455    /// A transition that moves write authority to `new_owner` with `new_replicas`.
456    /// Advances **both** the version (it is a catalog write) and the ownership
457    /// epoch (write authority moved, so any old owner is fenced).
458    pub fn transfer_to(
459        &self,
460        new_owner: NodeIdentity,
461        new_replicas: impl IntoIterator<Item = NodeIdentity>,
462    ) -> Self {
463        Self {
464            owner: new_owner,
465            replicas: new_replicas.into_iter().collect(),
466            epoch: self.epoch.next(),
467            version: self.version.next(),
468            ..self.clone()
469        }
470    }
471
472    /// A transition that changes only the replica set. Advances the version but
473    /// **not** the epoch: write authority did not move, so no owner is fenced.
474    pub fn update_replicas(&self, new_replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
475        Self {
476            replicas: new_replicas.into_iter().collect(),
477            version: self.version.next(),
478            ..self.clone()
479        }
480    }
481
482    /// A transition that changes only placement metadata. Advances the version
483    /// but not the epoch.
484    pub fn update_placement(&self, placement: PlacementMetadata) -> Self {
485        Self {
486            placement,
487            version: self.version.next(),
488            ..self.clone()
489        }
490    }
491
492    /// A transition that **re-bounds** the range without moving write authority —
493    /// the retained-child step of a range split, which narrows this entry to the
494    /// keys its owner keeps while a sibling entry takes the carved-off subrange.
495    /// Advances the version but **not** the epoch: the same owner keeps writing
496    /// the retained keys, so no one is fenced.
497    pub fn with_bounds(&self, bounds: RangeBounds) -> Self {
498        Self {
499            bounds,
500            version: self.version.next(),
501            ..self.clone()
502        }
503    }
504
505    /// This node's [`RangeRole`] for *this* range (issue #990).
506    ///
507    /// A data member is the single writer ([`Owner`](RangeRole::Owner)) of a
508    /// range, holds a read/catch-up copy ([`Replica`](RangeRole::Replica)), or
509    /// holds no copy at all ([`NoCopy`](RangeRole::NoCopy)). The role is
510    /// per-range, not a global node role: the same node can be owner of one
511    /// range, replica of another, and uninvolved in a third — which is why this
512    /// is the input to the ownership-aware public-write gate rather than the
513    /// instance-wide [`WriteGate`](crate::runtime::write_gate::WriteGate).
514    pub fn role_of(&self, node: &NodeIdentity) -> RangeRole {
515        if self.owner == *node {
516            RangeRole::Owner
517        } else if self.replicas.iter().any(|replica| replica == node) {
518            RangeRole::Replica
519        } else {
520            RangeRole::NoCopy
521        }
522    }
523}
524
525/// A data member's role for one specific range (issue #990, PRD #987).
526///
527/// Distinguishes the three positions a node can hold relative to a range, which
528/// the ownership-aware write gate
529/// ([`admit_public_write`](ShardOwnershipCatalog::admit_public_write)) turns
530/// into an allow/reject decision: only the current [`Owner`](Self::Owner) may
531/// take a *public* write for the range; a [`Replica`](Self::Replica) and a
532/// [`NoCopy`](Self::NoCopy) node both reject it and the caller must route to the
533/// owner. (A replica still applies the owner's changes through the privileged
534/// internal apply path — that path is gated by the range-authority fence from
535/// issue #991, not by this public gate.)
536#[derive(Debug, Clone, Copy, PartialEq, Eq)]
537pub enum RangeRole {
538    /// The current single writer for the range — the only role a public write
539    /// may land on.
540    Owner,
541    /// Holds a read/catch-up copy but is not the writer. Public writes are
542    /// rejected and routed to the owner; replicated changes still flow in via
543    /// the privileged internal apply path.
544    Replica,
545    /// Holds no copy of the range at all.
546    NoCopy,
547}
548
549impl RangeRole {
550    /// Whether this role may accept a *public* write for the range. Only the
551    /// owner may; replica and no-copy may not.
552    pub fn may_write_public(self) -> bool {
553        matches!(self, RangeRole::Owner)
554    }
555
556    fn label(self) -> &'static str {
557        match self {
558            RangeRole::Owner => "owner",
559            RangeRole::Replica => "replica",
560            RangeRole::NoCopy => "no-copy",
561        }
562    }
563}
564
565/// Whether an accepted update created a new range or advanced an existing one.
566#[derive(Debug, Clone, Copy, PartialEq, Eq)]
567pub enum UpdateOutcome {
568    /// The range did not exist and was created at this version.
569    Created,
570    /// An existing range advanced to a newer version.
571    Updated,
572}
573
574/// Why a catalog update was rejected.
575#[derive(Debug, Clone, PartialEq, Eq)]
576pub enum CatalogError {
577    /// The update's version did not strictly advance the range's current
578    /// version — a stale or out-of-order write. Carries both versions so the
579    /// caller (or a replica) can see how far behind it was.
580    StaleVersion {
581        collection: CollectionId,
582        range_id: RangeId,
583        current: CatalogVersion,
584        attempted: CatalogVersion,
585    },
586    /// The entry's shard key mode disagrees with the collection's declared mode.
587    /// A collection is hash- *or* ordered-partitioned, never both.
588    ShardKeyModeMismatch {
589        collection: CollectionId,
590        declared: ShardKeyMode,
591        attempted: ShardKeyMode,
592    },
593    /// Creating this range would overlap an existing range of the same
594    /// collection, which would make routing ambiguous.
595    OverlappingRange {
596        collection: CollectionId,
597        existing: RangeId,
598        attempted: RangeId,
599    },
600}
601
602impl std::fmt::Display for CatalogError {
603    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604        match self {
605            Self::StaleVersion {
606                collection,
607                range_id,
608                current,
609                attempted,
610            } => write!(
611                f,
612                "stale catalog update for {collection}/{range_id}: current version {current}, attempted {attempted}"
613            ),
614            Self::ShardKeyModeMismatch {
615                collection,
616                declared,
617                attempted,
618            } => write!(
619                f,
620                "collection {collection} is declared {declared:?} but range uses {attempted:?}"
621            ),
622            Self::OverlappingRange {
623                collection,
624                existing,
625                attempted,
626            } => write!(
627                f,
628                "range {attempted} overlaps existing range {existing} of collection {collection}"
629            ),
630        }
631    }
632}
633
634impl std::error::Error for CatalogError {}
635
636/// Why an ownership-aware *public* write was rejected (issue #990).
637///
638/// This is a **routing/ownership** error, deliberately distinct from the
639/// instance-wide read-only rejection raised by
640/// [`WriteGate`](crate::runtime::write_gate::WriteGate): a replica/non-holder
641/// rejecting a public write is not "this node is read-only", it is "this node is
642/// not the authority for *this range* — route to the owner". Crucially, none of
643/// these rejections fall back to the privileged internal replica-apply path; a
644/// public write that is not for this node's owned range never reaches storage.
645#[derive(Debug, Clone, PartialEq, Eq)]
646pub enum RangeWriteReject {
647    /// No range of the collection covers the routed key, so the write cannot be
648    /// placed. The caller must (re)resolve routing against a fresher catalog.
649    NoRange { collection: CollectionId },
650    /// This node holds the range but is not its owner (a [`Replica`]), or holds
651    /// no copy at all ([`NoCopy`]). Either way a public write must be routed to
652    /// `owner`, never applied locally.
653    ///
654    /// [`Replica`]: RangeRole::Replica
655    /// [`NoCopy`]: RangeRole::NoCopy
656    NotOwner {
657        collection: CollectionId,
658        range_id: RangeId,
659        role: RangeRole,
660        owner: NodeIdentity,
661    },
662    /// This node *is* the range owner, but the write was authorised under an
663    /// ownership epoch that no longer matches the catalog — a write fenced out
664    /// because ownership has since moved (its epoch advanced). Carries both
665    /// epochs so the caller can see how far the routing decision was behind.
666    StaleEpoch {
667        collection: CollectionId,
668        range_id: RangeId,
669        expected: OwnershipEpoch,
670        current: OwnershipEpoch,
671    },
672}
673
674impl std::fmt::Display for RangeWriteReject {
675    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
676        match self {
677            Self::NoRange { collection } => write!(
678                f,
679                "no range of collection {collection} covers the routed key — re-resolve routing"
680            ),
681            Self::NotOwner {
682                collection,
683                range_id,
684                role,
685                owner,
686            } => write!(
687                f,
688                "this node is {} of {collection}/{range_id}, not its owner — route the write to {owner}",
689                role.label()
690            ),
691            Self::StaleEpoch {
692                collection,
693                range_id,
694                expected,
695                current,
696            } => write!(
697                f,
698                "stale ownership epoch for {collection}/{range_id}: write authorised under epoch {expected}, current is {current}"
699            ),
700        }
701    }
702}
703
704impl std::error::Error for RangeWriteReject {}
705
706/// The global shard ownership catalog held by every data member.
707///
708/// This single type plays both roles in ADR 0037's model: it is the authoritative
709/// state the Cluster Supervisor leader writes through, and it is the replica each
710/// data member holds and routes against. Both write through
711/// [`apply_update`](Self::apply_update), so the stale-version rejection that
712/// makes leader writes versioned is the *same* rule that makes replica
713/// application order-independent. Nothing here needs user-data sharding to find
714/// an entry: ranges are addressed directly by `(collection, range_id)`, and
715/// routing ([`route`](Self::route)) is a local scan of the replica.
716#[derive(Debug, Clone, Default)]
717pub struct ShardOwnershipCatalog {
718    /// Declared shard key mode per collection. A collection is recorded here the
719    /// moment its first range is created (or via [`declare_collection`]).
720    ///
721    /// [`declare_collection`]: Self::declare_collection
722    collections: BTreeMap<CollectionId, ShardKeyMode>,
723    ranges: BTreeMap<(CollectionId, RangeId), RangeOwnership>,
724}
725
726impl ShardOwnershipCatalog {
727    /// An empty catalog — a cluster with no collections placed yet.
728    pub fn new() -> Self {
729        Self::default()
730    }
731
732    /// Declare a collection's shard key mode up front. Hash is the default, so
733    /// this is mainly how an operator opts a collection into
734    /// [`Ordered`](ShardKeyMode::Ordered) mode before any range exists. Declaring
735    /// the same mode twice is idempotent; redeclaring a different mode for a
736    /// collection that already has a mode is a [`ShardKeyModeMismatch`].
737    ///
738    /// [`ShardKeyModeMismatch`]: CatalogError::ShardKeyModeMismatch
739    pub fn declare_collection(
740        &mut self,
741        collection: CollectionId,
742        mode: ShardKeyMode,
743    ) -> Result<(), CatalogError> {
744        match self.collections.get(&collection) {
745            Some(&declared) if declared != mode => Err(CatalogError::ShardKeyModeMismatch {
746                collection,
747                declared,
748                attempted: mode,
749            }),
750            _ => {
751                self.collections.insert(collection, mode);
752                Ok(())
753            }
754        }
755    }
756
757    /// The declared shard key mode of `collection`, if it has any ranges or was
758    /// explicitly declared.
759    pub fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
760        self.collections.get(collection).copied()
761    }
762
763    /// Apply a versioned ownership update — the single write path for both leader
764    /// writes and replica application.
765    ///
766    /// Creation (the range does not yet exist) auto-declares the collection's
767    /// mode from the entry and checks the new range does not overlap a sibling.
768    /// Updating an existing range requires the entry's version to **strictly
769    /// advance** the current version; anything else is a
770    /// [`StaleVersion`](CatalogError::StaleVersion) rejection that leaves the
771    /// catalog untouched. Either way the entry's mode must match the collection's
772    /// declared mode.
773    pub fn apply_update(&mut self, entry: RangeOwnership) -> Result<UpdateOutcome, CatalogError> {
774        // Mode must agree with the collection (auto-declared on first range).
775        match self.collections.get(entry.collection()) {
776            Some(&declared) if declared != entry.shard_key_mode() => {
777                return Err(CatalogError::ShardKeyModeMismatch {
778                    collection: entry.collection().clone(),
779                    declared,
780                    attempted: entry.shard_key_mode(),
781                });
782            }
783            _ => {}
784        }
785
786        let key = entry.key();
787        match self.ranges.get(&key) {
788            Some(current) => {
789                if entry.version() <= current.version() {
790                    return Err(CatalogError::StaleVersion {
791                        collection: entry.collection().clone(),
792                        range_id: entry.range_id(),
793                        current: current.version(),
794                        attempted: entry.version(),
795                    });
796                }
797                self.collections
798                    .insert(entry.collection().clone(), entry.shard_key_mode());
799                self.ranges.insert(key, entry);
800                Ok(UpdateOutcome::Updated)
801            }
802            None => {
803                // Creating a range: it must not overlap any sibling range of the
804                // same collection, or routing would be ambiguous.
805                if let Some(existing) = self
806                    .ranges_for(entry.collection())
807                    .find(|r| r.bounds().overlaps(entry.bounds()))
808                {
809                    return Err(CatalogError::OverlappingRange {
810                        collection: entry.collection().clone(),
811                        existing: existing.range_id(),
812                        attempted: entry.range_id(),
813                    });
814                }
815                self.collections
816                    .insert(entry.collection().clone(), entry.shard_key_mode());
817                self.ranges.insert(key, entry);
818                Ok(UpdateOutcome::Created)
819            }
820        }
821    }
822
823    /// The current ownership of one range, addressed directly by identity — no
824    /// routing required, because the catalog is what routing is built on.
825    pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&RangeOwnership> {
826        self.ranges.get(&(collection.clone(), range_id))
827    }
828
829    /// Every range of `collection`, in range-id order.
830    pub fn ranges_for<'a>(
831        &'a self,
832        collection: &CollectionId,
833    ) -> impl Iterator<Item = &'a RangeOwnership> {
834        let collection = collection.clone();
835        self.ranges
836            .iter()
837            .filter(move |((c, _), _)| *c == collection)
838            .map(|(_, r)| r)
839    }
840
841    /// Route a key to the range that owns it — the catalog read every routing
842    /// decision makes. Returns the owning [`RangeOwnership`] (whose `owner`,
843    /// `epoch`, and `replicas` the caller uses to send and fence the write), or
844    /// `None` if no range covers the key yet.
845    pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&RangeOwnership> {
846        self.ranges_for(collection)
847            .find(|r| r.bounds().contains(key))
848    }
849
850    /// This node's [`RangeRole`] for a directly-addressed range (issue #990).
851    /// Returns `None` when no such range exists in the catalog — distinct from
852    /// [`NoCopy`](RangeRole::NoCopy), which means the range exists but this node
853    /// holds no copy of it.
854    pub fn role_at(
855        &self,
856        node: &NodeIdentity,
857        collection: &CollectionId,
858        range_id: RangeId,
859    ) -> Option<RangeRole> {
860        self.range(collection, range_id)
861            .map(|range| range.role_of(node))
862    }
863
864    /// Ownership-aware gate for a **public** write (issue #990, PRD #987).
865    ///
866    /// Routes `key` to its range, then admits the write only when `node` is the
867    /// range's current [`Owner`](RangeRole::Owner) **and** `expected_epoch`
868    /// matches the range's current ownership epoch. On success returns the owned
869    /// [`RangeOwnership`] (so the caller can proceed with the write against the
870    /// authoritative epoch); otherwise a [`RangeWriteReject`] explaining why.
871    ///
872    /// This is the public surface's gate — the counterpart of the instance-wide
873    /// [`WriteGate`](crate::runtime::write_gate::WriteGate) for multi-writer,
874    /// per-range ownership. The internal replica-apply path does **not** consult
875    /// it: replicated changes flow into a replica through the privileged apply
876    /// path (fenced by issue #991's range-authority watermark), so a node that
877    /// rejects a *public* write here can still legitimately apply the owner's
878    /// replicated changes for the very same range.
879    pub fn admit_public_write(
880        &self,
881        node: &NodeIdentity,
882        collection: &CollectionId,
883        key: &[u8],
884        expected_epoch: OwnershipEpoch,
885    ) -> Result<&RangeOwnership, RangeWriteReject> {
886        let range = self
887            .route(collection, key)
888            .ok_or_else(|| RangeWriteReject::NoRange {
889                collection: collection.clone(),
890            })?;
891        let role = range.role_of(node);
892        if !role.may_write_public() {
893            return Err(RangeWriteReject::NotOwner {
894                collection: collection.clone(),
895                range_id: range.range_id(),
896                role,
897                owner: range.owner().clone(),
898            });
899        }
900        if expected_epoch != range.epoch() {
901            return Err(RangeWriteReject::StaleEpoch {
902                collection: collection.clone(),
903                range_id: range.range_id(),
904                expected: expected_epoch,
905                current: range.epoch(),
906            });
907        }
908        Ok(range)
909    }
910
911    /// Total number of owned ranges across all collections.
912    pub fn range_count(&self) -> usize {
913        self.ranges.len()
914    }
915
916    /// All ranges, in `(collection, range_id)` order — the full catalog content
917    /// a joining member adopts as its starting replica
918    /// (see [`ControlPlaneSnapshot`](super::join::ControlPlaneSnapshot)).
919    pub fn entries(&self) -> impl Iterator<Item = &RangeOwnership> {
920        self.ranges.values()
921    }
922}
923
#[cfg(test)]
mod tests {
    // Unit tests for the shard ownership catalog: routing, shard-key modes,
    // versioned and stale updates, overlap rejection, replica application, and
    // the per-range role model / public write gate from issue #990.
    use super::*;

    /// Build a [`CollectionId`]; panics if `CollectionId::new` rejects `name`.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Build a [`NodeIdentity`] from a certificate subject such as `"CN=node-a"`;
    /// panics if the subject is rejected.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Key bounds `[lower, upper)` built from two concrete keys; panics if
    /// `RangeBounds::new` rejects them (e.g. empty or inverted).
    fn bounds(lower: &[u8], upper: &[u8]) -> RangeBounds {
        RangeBounds::new(RangeBound::key(lower), RangeBound::key(upper)).unwrap()
    }

    /// A [`ShardKeyMode::Hash`] range `id` of `coll` covering `bnds`, owned by
    /// `owner`, with `CN=replica-1` as its only replica and a replication factor
    /// of 3.
    fn hash_range(coll: &CollectionId, id: u64, bnds: RangeBounds, owner: &str) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            bnds,
            ident(owner),
            [ident("CN=replica-1")],
            PlacementMetadata::with_replication_factor(3),
        )
    }

    #[test]
    fn empty_catalog_creation() {
        let catalog = ShardOwnershipCatalog::new();
        assert_eq!(catalog.range_count(), 0);
        assert!(catalog.shard_key_mode(&collection("orders")).is_none());
    }

    #[test]
    fn hash_is_the_default_shard_key_mode() {
        // The first range of a collection auto-declares its mode; a range built
        // with the default mode lands the collection in Hash mode.
        assert_eq!(ShardKeyMode::default(), ShardKeyMode::Hash);

        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();
        assert_eq!(catalog.shard_key_mode(&orders), Some(ShardKeyMode::Hash));
    }

    #[test]
    fn hash_range_entry_routes_to_owner() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");

        // Two hash token ranges split at 0x80.
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
                "CN=node-a",
            ))
            .unwrap();
        catalog
            .apply_update(hash_range(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
            ))
            .unwrap();

        // Routing reads expose the owner for a key without any user-data sharding.
        assert_eq!(
            catalog.route(&orders, &[0x10]).unwrap().owner(),
            &ident("CN=node-a")
        );
        // 0x80 is the lower bound of range 2, so it belongs there, not to range 1.
        assert_eq!(
            catalog.route(&orders, &[0x80]).unwrap().owner(),
            &ident("CN=node-b")
        );
        assert_eq!(
            catalog.route(&orders, &[0xff]).unwrap().owner(),
            &ident("CN=node-b")
        );
        // The routing read also exposes replicas and fencing epoch.
        let r = catalog.route(&orders, &[0x10]).unwrap();
        assert_eq!(r.replicas(), &[ident("CN=replica-1")]);
        assert_eq!(r.epoch(), OwnershipEpoch::initial());
    }

    #[test]
    fn ordered_mode_can_be_declared_and_routed() {
        let mut catalog = ShardOwnershipCatalog::new();
        let events = collection("events");
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        assert_eq!(catalog.shard_key_mode(&events), Some(ShardKeyMode::Ordered));

        // Ordered ranges bound the ordered key itself: [a, m) and [m, z).
        catalog
            .apply_update(RangeOwnership::establish(
                events.clone(),
                RangeId::new(1),
                ShardKeyMode::Ordered,
                bounds(b"a", b"m"),
                ident("CN=node-a"),
                [],
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        catalog
            .apply_update(RangeOwnership::establish(
                events.clone(),
                RangeId::new(2),
                ShardKeyMode::Ordered,
                bounds(b"m", b"z"),
                ident("CN=node-b"),
                [],
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();

        assert_eq!(
            catalog.route(&events, b"alpha").unwrap().owner(),
            &ident("CN=node-a")
        );
        assert_eq!(
            catalog.route(&events, b"mike").unwrap().owner(),
            &ident("CN=node-b")
        );
        // A key outside every declared range routes nowhere.
        assert!(catalog.route(&events, b"zzz").is_none());
    }

    #[test]
    fn declaring_a_conflicting_mode_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let events = collection("events");
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        // Redeclaring the same mode is fine.
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        // A different mode is a mismatch.
        let err = catalog
            .declare_collection(events.clone(), ShardKeyMode::Hash)
            .unwrap_err();
        assert_eq!(
            err,
            CatalogError::ShardKeyModeMismatch {
                collection: events.clone(),
                declared: ShardKeyMode::Ordered,
                attempted: ShardKeyMode::Hash,
            }
        );
        // And a range whose mode disagrees with the declared collection is rejected.
        let err = catalog
            .apply_update(hash_range(&events, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap_err();
        assert!(matches!(err, CatalogError::ShardKeyModeMismatch { .. }));
    }

    #[test]
    fn version_bumps_on_owner_transfer_and_epoch_fences() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();

        let current = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(current.version(), CatalogVersion::initial());
        assert_eq!(current.epoch(), OwnershipEpoch::initial());

        // Owner transfer advances both version and fencing epoch.
        let moved = current.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        let outcome = catalog.apply_update(moved).unwrap();
        assert_eq!(outcome, UpdateOutcome::Updated);

        let after = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(after.owner(), &ident("CN=node-b"));
        assert_eq!(after.version().value(), 2);
        assert_eq!(after.epoch().value(), 2); // old owner is now fenced

        // A replica-set change advances the version but NOT the epoch.
        let replicas_changed = after.update_replicas([ident("CN=node-c")]);
        catalog.apply_update(replicas_changed).unwrap();
        let after2 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(after2.version().value(), 3);
        assert_eq!(after2.epoch().value(), 2); // write authority did not move
        assert_eq!(after2.replicas(), &[ident("CN=node-c")]);
    }

    #[test]
    fn stale_update_is_rejected_and_leaves_catalog_unchanged() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();

        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        // Advance to v2.
        catalog
            .apply_update(v1.transfer_to(ident("CN=node-b"), []))
            .unwrap();
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-b")
        );

        // Re-applying the original v1 entry is stale: version 1 does not
        // advance past the current version 2.
        let err = catalog.apply_update(v1.clone()).unwrap_err();
        assert_eq!(
            err,
            CatalogError::StaleVersion {
                collection: orders.clone(),
                range_id: RangeId::new(1),
                current: CatalogVersion::initial().next(),
                attempted: CatalogVersion::initial(),
            }
        );
        // The stale write did not roll ownership back.
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-b")
        );
        assert_eq!(
            catalog
                .range(&orders, RangeId::new(1))
                .unwrap()
                .version()
                .value(),
            2
        );
    }

    #[test]
    fn overlapping_range_creation_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                bounds(&[0x00], &[0x80]),
                "CN=node-a",
            ))
            .unwrap();
        // A new range overlapping [0x00, 0x80) is ambiguous for routing.
        let err = catalog
            .apply_update(hash_range(
                &orders,
                2,
                bounds(&[0x40], &[0xc0]),
                "CN=node-b",
            ))
            .unwrap_err();
        assert_eq!(
            err,
            CatalogError::OverlappingRange {
                collection: orders.clone(),
                existing: RangeId::new(1),
                attempted: RangeId::new(2),
            }
        );
        // The rejected range was not stored.
        assert_eq!(catalog.range_count(), 1);
    }

    #[test]
    fn catalog_replicates_to_data_members_with_read_visibility() {
        // Leader writes the catalog; a data member holds its own replica and
        // applies the same versioned updates — no user-data sharding involved.
        let orders = collection("orders");
        let mut leader = ShardOwnershipCatalog::new();
        let mut data_member = ShardOwnershipCatalog::new();

        // Leader creates a range; ship the entry to the data member.
        let create = hash_range(&orders, 1, RangeBounds::full(), "CN=node-a");
        leader.apply_update(create.clone()).unwrap();
        assert_eq!(
            data_member.apply_update(create).unwrap(),
            UpdateOutcome::Created
        );

        // The data member can route locally to the same owner the leader has.
        assert_eq!(
            data_member.route(&orders, b"any-key").unwrap().owner(),
            &ident("CN=node-a")
        );

        // Leader transfers ownership; replicate the v2 entry.
        let v2 = leader
            .range(&orders, RangeId::new(1))
            .unwrap()
            .transfer_to(ident("CN=node-b"), []);
        leader.apply_update(v2.clone()).unwrap();
        assert_eq!(
            data_member.apply_update(v2.clone()).unwrap(),
            UpdateOutcome::Updated
        );
        assert_eq!(
            data_member.route(&orders, b"any-key").unwrap().owner(),
            &ident("CN=node-b")
        );

        // Out-of-order / duplicate replication: re-delivering v2 after it is
        // applied is stale on the replica and rejected, so it stays consistent.
        let err = data_member.apply_update(v2).unwrap_err();
        assert!(matches!(err, CatalogError::StaleVersion { .. }));
        assert_eq!(
            data_member
                .range(&orders, RangeId::new(1))
                .unwrap()
                .version()
                .value(),
            2
        );
    }

    #[test]
    fn range_bounds_reject_empty_or_inverted() {
        // Equal bounds are empty; lower > upper is inverted; both are errors.
        assert!(RangeBounds::new(RangeBound::key([0x10]), RangeBound::key([0x10])).is_err());
        assert!(RangeBounds::new(RangeBound::key([0x20]), RangeBound::key([0x10])).is_err());
        assert!(RangeBounds::new(RangeBound::Max, RangeBound::Min).is_err());
        assert!(RangeBounds::full().contains(b"anything"));
    }

    // ---------------------------------------------------------------
    // Issue #990 — per-range role model and ownership-aware write gate.
    // ---------------------------------------------------------------

    /// A [`ShardKeyMode::Hash`] range `id` of `coll` covering `bnds`, owned by
    /// `owner`, with an explicit replica set and a replication factor of 3.
    fn range_with(
        coll: &CollectionId,
        id: u64,
        bnds: RangeBounds,
        owner: &str,
        replicas: &[&str],
    ) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            bnds,
            ident(owner),
            replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
            PlacementMetadata::with_replication_factor(3),
        )
    }

    #[test]
    fn role_of_distinguishes_owner_replica_and_no_copy() {
        let orders = collection("orders");
        let range = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);

        assert_eq!(range.role_of(&ident("CN=node-a")), RangeRole::Owner);
        assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Replica);
        assert_eq!(range.role_of(&ident("CN=node-c")), RangeRole::NoCopy);
        // Only the owner may take public writes.
        assert!(RangeRole::Owner.may_write_public());
        assert!(!RangeRole::Replica.may_write_public());
        assert!(!RangeRole::NoCopy.may_write_public());
    }

    #[test]
    fn role_is_per_range_not_a_global_node_role() {
        // node-a owns range 1 and is only a replica of range 2 — the same node
        // holds different roles for different ranges of the same collection.
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();
        catalog
            .apply_update(range_with(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
                &["CN=node-a"],
            ))
            .unwrap();

        let node_a = ident("CN=node-a");
        assert_eq!(
            catalog.role_at(&node_a, &orders, RangeId::new(1)),
            Some(RangeRole::Owner)
        );
        assert_eq!(
            catalog.role_at(&node_a, &orders, RangeId::new(2)),
            Some(RangeRole::Replica)
        );
        // A range that does not exist is None, not NoCopy.
        assert_eq!(catalog.role_at(&node_a, &orders, RangeId::new(99)), None);
        // And a collection nobody placed yet routes nowhere.
        assert_eq!(
            catalog.role_at(&node_a, &collection("ghost"), RangeId::new(1)),
            None
        );
    }

    #[test]
    fn public_write_admitted_on_owner_at_matching_epoch() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        let admitted = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .expect("owner at current epoch may write");
        assert_eq!(admitted.owner(), &ident("CN=node-a"));
        assert_eq!(admitted.range_id(), RangeId::new(1));
    }

    #[test]
    fn public_write_rejected_on_replica_with_routing_error() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        // node-b holds a copy but is a replica — a public write must be routed
        // to the owner, not applied locally.
        let err = catalog
            .admit_public_write(
                &ident("CN=node-b"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        match err {
            RangeWriteReject::NotOwner {
                role, ref owner, ..
            } => {
                assert_eq!(role, RangeRole::Replica);
                assert_eq!(owner, &ident("CN=node-a"));
            }
            other => panic!("expected NotOwner(Replica), got {other:?}"),
        }
        // The rejection names the owner so the caller can re-route.
        assert!(err.to_string().contains("route the write to"));
    }

    #[test]
    fn public_write_rejected_on_no_copy_holder() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        // node-c holds no copy of the range at all.
        let err = catalog
            .admit_public_write(
                &ident("CN=node-c"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        match err {
            RangeWriteReject::NotOwner { role, .. } => assert_eq!(role, RangeRole::NoCopy),
            other => panic!("expected NotOwner(NoCopy), got {other:?}"),
        }
    }

    #[test]
    fn public_write_rejected_on_stale_ownership_epoch() {
        // Ownership moved a→b and back to a, advancing the epoch twice. A write
        // the routing layer authorised under the original epoch must be fenced
        // even though node-a is, once again, the current owner.
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let v1 = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);
        let original_epoch = v1.epoch();
        catalog.apply_update(v1.clone()).unwrap();

        let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        catalog.apply_update(v2.clone()).unwrap();
        let v3 = v2.transfer_to(ident("CN=node-a"), [ident("CN=node-b")]);
        catalog.apply_update(v3.clone()).unwrap();

        // node-a is the current owner again, but at a newer epoch.
        assert_ne!(original_epoch, v3.epoch());
        let err = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", original_epoch)
            .unwrap_err();
        match err {
            RangeWriteReject::StaleEpoch {
                expected, current, ..
            } => {
                assert_eq!(expected, original_epoch);
                assert_eq!(current, v3.epoch());
            }
            other => panic!("expected StaleEpoch, got {other:?}"),
        }
        // The same owner at the *current* epoch is admitted.
        assert!(catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", v3.epoch())
            .is_ok());
    }

    #[test]
    fn public_write_rejected_when_no_range_covers_the_key() {
        // An empty catalog has no range for any key, so the gate reports NoRange.
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let err = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        assert!(matches!(err, RangeWriteReject::NoRange { .. }));
    }

    #[test]
    fn internal_apply_path_stays_privileged_for_a_public_write_replica() {
        // A node that rejects a *public* write because it is only a replica must
        // still admit the owner's replicated changes through the privileged
        // internal apply path — that path is gated by issue #991's range
        // authority fence, not by this public ownership gate.
        use crate::replication::cdc::{ChangeOperation, ChangeRecord, RangeAuthority};

        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                7,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        // Public gate: node-b is a replica → rejected, never reaches storage.
        assert!(matches!(
            catalog
                .admit_public_write(
                    &ident("CN=node-b"),
                    &orders,
                    b"k",
                    OwnershipEpoch::initial()
                )
                .unwrap_err(),
            RangeWriteReject::NotOwner {
                role: RangeRole::Replica,
                ..
            }
        ));

        // Internal apply: the owner's replicated change for the same range is
        // admitted by the range-authority fence on the replica. The record is
        // stamped with range 7 at the initial epoch, matching the fence below.
        let record = ChangeRecord {
            term: 1,
            lsn: 1,
            timestamp: 0,
            operation: ChangeOperation::Insert,
            collection: orders.as_str().to_string(),
            entity_id: 1,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1]),
            metadata: None,
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        }
        .with_range_authority(7, OwnershipEpoch::initial().value());
        let fence = RangeAuthority {
            range_id: 7,
            min_term: 1,
            min_ownership_epoch: OwnershipEpoch::initial().value(),
        };
        assert!(
            fence.admit(&record).is_ok(),
            "replica internal apply must remain privileged for the owner's changes"
        );
    }
}