Skip to main content

reddb_server/cluster/
ownership.rs

1//! The global shard ownership catalog (issue #989, PRD #987, ADR 0037).
2//!
3//! The shard ownership catalog is the **source of truth for range routing and
4//! failover** in a multi-writer cluster. Per the glossary it is *"explicit,
5//! versioned RedDB catalog state that records shard/range bounds, current writer
6//! owner, replicas, and ownership epoch/version"* and — crucially — it is
7//! *"special global control-plane state replicated to all data members rather
8//! than sharded like ordinary user collections"*.
9//!
10//! That last point shapes the whole module. The catalog is the thing that tells
11//! you *where* a collection's data lives, so it cannot itself be located by the
12//! same user-data sharding it describes — that would be circular. Instead every
13//! data member holds a full [`ShardOwnershipCatalog`] replica and routes against
14//! it locally ([`ShardOwnershipCatalog::route`]). Replication is modelled as
15//! shipping versioned [`RangeOwnership`] entries that each member applies through
16//! the same [`apply_update`](ShardOwnershipCatalog::apply_update) path that the
17//! Supervisor leader writes through — and that path rejects stale versions, so a
18//! late or out-of-order replica update can never overwrite newer ownership.
19//!
20//! ## What the catalog records
21//!
22//! One [`RangeOwnership`] entry per owned shard/range carries everything routing
23//! and fencing need:
24//!
25//! * [`CollectionId`] + [`RangeId`] — *which* range of *which* collection.
26//! * [`ShardKeyMode`] — [`Hash`](ShardKeyMode::Hash) (the default, for uniform
27//!   distribution) or [`Ordered`](ShardKeyMode::Ordered) (declared when range
28//!   locality and ordered scans matter more than hotspot resistance).
29//! * [`RangeBounds`] — the half-open `[lower, upper)` partition this entry owns.
30//! * `owner` ([`NodeIdentity`]) + `replicas` — the current single writer for the
31//!   range and its read/catch-up copies.
32//! * [`OwnershipEpoch`] + [`CatalogVersion`] — the fencing epoch (bumped on owner
33//!   change so a stale old owner is fenced) and the monotonic write version
34//!   (bumped on *every* accepted update so stale writes are rejected).
35//! * [`PlacementMetadata`] — replication factor and free-form placement
36//!   attributes (region/zone/weight) the rebalancer reads.
37//!
38//! Ownership changes are produced as *transitions* — new entries built with
39//! [`RangeOwnership::transfer_to`] / [`update_replicas`](RangeOwnership::update_replicas)
40//! / [`update_placement`](RangeOwnership::update_placement) — never arbitrary row
41//! edits, matching ADR 0037's "transitions, not arbitrary row edits".
42//!
43//! Everything here is a pure data model with no I/O, so the routing, versioning,
44//! and replication story is exercised deterministically.
45
46use std::collections::BTreeMap;
47
48use super::identity::NodeIdentity;
49use super::slot::hash_shard_key_to_range_key;
50
/// A user collection's stable identity, as recorded in the catalog.
///
/// This is the collection's own name (trimmed of surrounding whitespace), not a
/// shard-routed handle: the catalog is keyed by collection and then by range, so
/// resolving a [`CollectionId`] to its ranges needs nothing beyond the catalog
/// itself. That independence from user-data sharding is what lets the catalog
/// bootstrap routing.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct CollectionId(String);

/// Returned by [`CollectionId::new`] when the name is empty after trimming.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionIdError;

impl std::fmt::Display for CollectionIdError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("collection id is empty")
    }
}

impl std::error::Error for CollectionIdError {}

impl CollectionId {
    /// Build a collection id from `value`, trimming leading/trailing whitespace.
    ///
    /// # Errors
    ///
    /// [`CollectionIdError`] if nothing remains after trimming.
    pub fn new(value: impl AsRef<str>) -> Result<Self, CollectionIdError> {
        match value.as_ref().trim() {
            "" => Err(CollectionIdError),
            name => Ok(Self(name.to_owned())),
        }
    }

    /// The (trimmed) collection name.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl std::fmt::Display for CollectionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
92
/// Stable identifier of one shard/range inside a collection.
///
/// Ownership is tracked below the collection level (ADR 0037), so one
/// collection may have many range ids, each naming a partition that is owned
/// and routed on its own. The id survives ownership transitions: moving a range
/// keeps its [`RangeId`] and advances its epoch/version instead of minting a
/// fresh range.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RangeId(u64);

impl RangeId {
    /// Wrap a raw range number.
    pub fn new(value: u64) -> Self {
        RangeId(value)
    }

    /// The raw range number.
    pub fn value(self) -> u64 {
        let RangeId(raw) = self;
        raw
    }
}

impl std::fmt::Display for RangeId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}
117}
118
/// How a collection's keys are partitioned into ranges.
///
/// Quoting the glossary: *"Hash mode is the default for uniform distribution;
/// ordered mode is declared when range locality and ordered scans matter more
/// than automatic hotspot resistance."* A collection has exactly one mode, shared
/// by all of its ranges; the catalog rejects an entry whose mode disagrees with
/// the collection's declared mode ([`CatalogError::ShardKeyModeMismatch`]).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ShardKeyMode {
    /// Hash partitioning, the default: range bounds are over hash tokens.
    #[default]
    Hash,
    /// Ordered partitioning, opted into for range locality / ordered scans:
    /// range bounds are over the ordered shard key itself.
    Ordered,
}
136
/// One edge of a [`RangeBounds`].
///
/// Bounds are byte strings so the same type serves both shard key modes: a
/// [`Hash`](ShardKeyMode::Hash) range bounds hash-token bytes, an
/// [`Ordered`](ShardKeyMode::Ordered) range bounds the ordered key bytes
/// directly. [`Min`](RangeBound::Min)/[`Max`](RangeBound::Max) are the open ends
/// of the keyspace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeBound {
    /// The open low end of the keyspace (everything is `>= Min`).
    Min,
    /// A concrete boundary key.
    Key(Vec<u8>),
    /// The open high end of the keyspace (everything is `< Max`).
    Max,
}

impl RangeBound {
    /// A boundary at the given key bytes.
    pub fn key(bytes: impl Into<Vec<u8>>) -> Self {
        RangeBound::Key(bytes.into())
    }

    /// Total order over keyspace positions: `Min < every Key < Max`, with keys
    /// compared lexicographically. This is what makes `contains`, `overlaps`
    /// and `split_at` plain comparisons.
    fn position(&self) -> Position<'_> {
        match self {
            RangeBound::Min => Position::Min,
            RangeBound::Key(k) => Position::Key(k),
            RangeBound::Max => Position::Max,
        }
    }

    /// Whether this bound is the empty key `b""`.
    ///
    /// `b""` is the smallest possible key. So although `Min < Key(b"")` in
    /// position order, no key sorts strictly below it. As an exclusive upper
    /// edge it therefore admits nothing.
    fn is_empty_key(&self) -> bool {
        matches!(self, RangeBound::Key(k) if k.is_empty())
    }
}

/// Borrowed view of a [`RangeBound`] whose derived `Ord` is the keyspace order:
/// variant declaration order gives `Min < Key(_) < Max`, and keys compare as
/// byte slices (lexicographically).
#[derive(PartialEq, Eq, PartialOrd, Ord)]
enum Position<'a> {
    Min,
    Key(&'a [u8]),
    Max,
}

/// The half-open `[lower, upper)` partition a range owns.
///
/// Half-open bounds tile the keyspace without gaps or double-cover: adjacent
/// ranges share a boundary key that belongs to exactly one of them. Every
/// constructed range holds at least one key. `lower` must be strictly below
/// `upper`, and `upper` may not be the empty key (nothing sorts below `b""`).
/// So an empty or inverted range cannot be constructed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBounds {
    lower: RangeBound,
    upper: RangeBound,
}

/// Returned when a requested range (or split) would be empty or inverted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBoundsError;

impl std::fmt::Display for RangeBoundsError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "range lower bound must be strictly below the upper bound"
        )
    }
}

impl std::error::Error for RangeBoundsError {}

impl RangeBounds {
    /// Bounds from an explicit `[lower, upper)` pair.
    ///
    /// # Errors
    ///
    /// [`RangeBoundsError`] if the range would hold no key. That happens in two
    /// cases:
    /// * inverted or degenerate bounds (`lower >= upper`);
    /// * an upper bound at the empty key (`upper == Key(b"")`), since no key
    ///   sorts strictly below `b""`.
    pub fn new(lower: RangeBound, upper: RangeBound) -> Result<Self, RangeBoundsError> {
        if lower.position() >= upper.position() || upper.is_empty_key() {
            return Err(RangeBoundsError);
        }
        Ok(Self { lower, upper })
    }

    /// The whole keyspace, `[Min, Max)` — a single range covering a collection.
    pub fn full() -> Self {
        Self {
            lower: RangeBound::Min,
            upper: RangeBound::Max,
        }
    }

    /// The inclusive lower edge.
    pub fn lower(&self) -> &RangeBound {
        &self.lower
    }

    /// The exclusive upper edge.
    pub fn upper(&self) -> &RangeBound {
        &self.upper
    }

    /// Does `key` fall inside this half-open range? Lower bound inclusive, upper
    /// bound exclusive — the routing predicate.
    pub fn contains(&self, key: &[u8]) -> bool {
        let key = Position::Key(key);
        self.lower.position() <= key && key < self.upper.position()
    }

    /// Do these two ranges share any key? Used to keep a collection's ranges
    /// non-overlapping so routing resolves to exactly one owner.
    pub fn overlaps(&self, other: &RangeBounds) -> bool {
        self.lower.position() < other.upper.position()
            && other.lower.position() < self.upper.position()
    }

    /// Split this `[lower, upper)` range at `at` into a lower child
    /// `[lower, at)` and an upper child `[at, upper)`.
    ///
    /// The split point must fall **strictly inside** the range
    /// (`lower < at < upper`), and it must not be the empty key. A point at or
    /// outside a bound would carve off an empty child. The same is true of
    /// `b""`, which no key sorts below. The two children tile the original
    /// exactly, with no gap and no overlap. That is what lets a
    /// [split-and-move](super::move_range) shrink the retained child and create
    /// the moved child without making routing ambiguous.
    ///
    /// # Errors
    ///
    /// [`RangeBoundsError`] if either child would be empty.
    pub fn split_at(&self, at: &[u8]) -> Result<(RangeBounds, RangeBounds), RangeBoundsError> {
        let at_pos = Position::Key(at);
        // An empty split key sits at the lowest possible key position. It would
        // leave the lower child `[lower, b"")` with no keys, even when `lower`
        // is `Min`.
        if at.is_empty() || at_pos <= self.lower.position() || at_pos >= self.upper.position() {
            return Err(RangeBoundsError);
        }
        let lower = RangeBounds {
            lower: self.lower.clone(),
            upper: RangeBound::key(at.to_vec()),
        };
        let upper = RangeBounds {
            lower: RangeBound::key(at.to_vec()),
            upper: self.upper.clone(),
        };
        Ok((lower, upper))
    }
}
269
/// Per-entry write version of the catalog; it only ever moves forward.
///
/// Each accepted update to a range carries a version one past the previous one.
/// An update that fails to move strictly past the current version is stale and
/// is refused. This compare-and-advance rule keeps catalog replication correct
/// no matter what order updates arrive in.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct CatalogVersion(u64);

impl CatalogVersion {
    /// Version assigned to a newly created range.
    pub fn initial() -> Self {
        CatalogVersion(1)
    }

    /// The raw version number.
    pub fn value(self) -> u64 {
        let CatalogVersion(raw) = self;
        raw
    }

    /// The version immediately after this one.
    fn next(self) -> Self {
        CatalogVersion(self.value() + 1)
    }
}

impl std::fmt::Display for CatalogVersion {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}
299
/// Fencing epoch guarding a range's write authority.
///
/// Not the same counter as [`CatalogVersion`]. The version moves on *every*
/// catalog edit, whereas the epoch moves only when **write authority changes
/// hands** (a new owner). A WAL/logical record stamped with an epoch older than
/// the catalog's current one comes from a fenced former owner and must be
/// rejected (ADR 0037, "fencing is enforced below routing").
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct OwnershipEpoch(u64);

impl OwnershipEpoch {
    /// Epoch assigned to a newly created range.
    pub fn initial() -> Self {
        OwnershipEpoch(1)
    }

    /// The raw epoch number.
    pub fn value(self) -> u64 {
        let OwnershipEpoch(raw) = self;
        raw
    }

    /// The epoch immediately after this one.
    fn next(self) -> Self {
        OwnershipEpoch(self.value() + 1)
    }
}

impl std::fmt::Display for OwnershipEpoch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}
330
/// Placement hints the rebalancer consults when planning transitions.
///
/// For the MVP this is a replication factor plus an open-ended attribute map
/// (region, zone, operator weight, …). It describes placement; it does not
/// grant any authority.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct PlacementMetadata {
    /// Target replication factor for the range.
    replication_factor: usize,
    /// Free-form placement hints; setting an existing key replaces its value.
    attributes: BTreeMap<String, String>,
}

impl PlacementMetadata {
    /// Placement with the given replication factor and no attributes.
    pub fn with_replication_factor(replication_factor: usize) -> Self {
        Self {
            replication_factor,
            ..Self::default()
        }
    }

    /// Builder-style setter for one attribute (e.g. `region` → `us-east-1`).
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (key, value) = (key.into(), value.into());
        self.attributes.insert(key, value);
        self
    }

    /// The target replication factor.
    pub fn replication_factor(&self) -> usize {
        self.replication_factor
    }

    /// The value recorded for `key`, if any.
    pub fn attribute(&self, key: &str) -> Option<&str> {
        self.attributes.get(key).map(|value| value.as_str())
    }
}
365
366/// One owned shard/range: the catalog's unit of routing and fencing.
367///
368/// An entry is self-describing — it carries its collection, range id, mode,
369/// bounds, owner, replicas, epoch, version, and placement — so a data member
370/// that receives it by replication can route and fence without consulting any
371/// other state. New ownership states are produced as *transitions* off an
372/// existing entry ([`transfer_to`](Self::transfer_to),
373/// [`update_replicas`](Self::update_replicas),
374/// [`update_placement`](Self::update_placement)), each of which advances the
375/// version — and, for an owner change, the fencing epoch.
376#[derive(Debug, Clone, PartialEq, Eq)]
377pub struct RangeOwnership {
378    collection: CollectionId,
379    range_id: RangeId,
380    shard_key_mode: ShardKeyMode,
381    bounds: RangeBounds,
382    owner: NodeIdentity,
383    replicas: Vec<NodeIdentity>,
384    epoch: OwnershipEpoch,
385    version: CatalogVersion,
386    placement: PlacementMetadata,
387}
388
389impl RangeOwnership {
390    /// The initial ownership state for a freshly created range: version and
391    /// epoch both at their [`initial`](CatalogVersion::initial) values.
392    #[allow(clippy::too_many_arguments)]
393    pub fn establish(
394        collection: CollectionId,
395        range_id: RangeId,
396        shard_key_mode: ShardKeyMode,
397        bounds: RangeBounds,
398        owner: NodeIdentity,
399        replicas: impl IntoIterator<Item = NodeIdentity>,
400        placement: PlacementMetadata,
401    ) -> Self {
402        Self {
403            collection,
404            range_id,
405            shard_key_mode,
406            bounds,
407            owner,
408            replicas: replicas.into_iter().collect(),
409            epoch: OwnershipEpoch::initial(),
410            version: CatalogVersion::initial(),
411            placement,
412        }
413    }
414
415    pub fn collection(&self) -> &CollectionId {
416        &self.collection
417    }
418
419    pub fn range_id(&self) -> RangeId {
420        self.range_id
421    }
422
423    pub fn shard_key_mode(&self) -> ShardKeyMode {
424        self.shard_key_mode
425    }
426
427    pub fn bounds(&self) -> &RangeBounds {
428        &self.bounds
429    }
430
431    pub fn owner(&self) -> &NodeIdentity {
432        &self.owner
433    }
434
435    pub fn replicas(&self) -> &[NodeIdentity] {
436        &self.replicas
437    }
438
439    pub fn epoch(&self) -> OwnershipEpoch {
440        self.epoch
441    }
442
443    pub fn version(&self) -> CatalogVersion {
444        self.version
445    }
446
447    pub fn placement(&self) -> &PlacementMetadata {
448        &self.placement
449    }
450
451    /// The catalog key for this entry: `(collection, range_id)`.
452    fn key(&self) -> (CollectionId, RangeId) {
453        (self.collection.clone(), self.range_id)
454    }
455
456    /// A transition that moves write authority to `new_owner` with `new_replicas`.
457    /// Advances **both** the version (it is a catalog write) and the ownership
458    /// epoch (write authority moved, so any old owner is fenced).
459    pub fn transfer_to(
460        &self,
461        new_owner: NodeIdentity,
462        new_replicas: impl IntoIterator<Item = NodeIdentity>,
463    ) -> Self {
464        Self {
465            owner: new_owner,
466            replicas: new_replicas.into_iter().collect(),
467            epoch: self.epoch.next(),
468            version: self.version.next(),
469            ..self.clone()
470        }
471    }
472
473    /// A transition that changes only the replica set. Advances the version but
474    /// **not** the epoch: write authority did not move, so no owner is fenced.
475    pub fn update_replicas(&self, new_replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
476        Self {
477            replicas: new_replicas.into_iter().collect(),
478            version: self.version.next(),
479            ..self.clone()
480        }
481    }
482
483    /// A transition that changes only placement metadata. Advances the version
484    /// but not the epoch.
485    pub fn update_placement(&self, placement: PlacementMetadata) -> Self {
486        Self {
487            placement,
488            version: self.version.next(),
489            ..self.clone()
490        }
491    }
492
493    /// A transition that **re-bounds** the range without moving write authority —
494    /// the retained-child step of a range split, which narrows this entry to the
495    /// keys its owner keeps while a sibling entry takes the carved-off subrange.
496    /// Advances the version but **not** the epoch: the same owner keeps writing
497    /// the retained keys, so no one is fenced.
498    pub fn with_bounds(&self, bounds: RangeBounds) -> Self {
499        Self {
500            bounds,
501            version: self.version.next(),
502            ..self.clone()
503        }
504    }
505
506    /// This node's [`RangeRole`] for *this* range (issue #990).
507    ///
508    /// A data member is the single writer ([`Owner`](RangeRole::Owner)) of a
509    /// range, holds a read/catch-up copy ([`Replica`](RangeRole::Replica)), or
510    /// holds no copy at all ([`NoCopy`](RangeRole::NoCopy)). The role is
511    /// per-range, not a global node role: the same node can be owner of one
512    /// range, replica of another, and uninvolved in a third — which is why this
513    /// is the input to the ownership-aware public-write gate rather than the
514    /// instance-wide [`WriteGate`](crate::runtime::write_gate::WriteGate).
515    pub fn role_of(&self, node: &NodeIdentity) -> RangeRole {
516        if self.owner == *node {
517            RangeRole::Owner
518        } else if self.replicas.iter().any(|replica| replica == node) {
519            RangeRole::Replica
520        } else {
521            RangeRole::NoCopy
522        }
523    }
524}
525
/// A data member's position relative to one specific range (issue #990, PRD #987).
///
/// The ownership-aware write gate
/// ([`admit_public_write`](ShardOwnershipCatalog::admit_public_write)) maps
/// this to allow/reject. Only the current [`Owner`](Self::Owner) accepts a
/// *public* write for the range. A [`Replica`](Self::Replica) or
/// [`NoCopy`](Self::NoCopy) node refuses it, and the caller must route the
/// write to the owner.
///
/// Replicas still apply the owner's changes through the privileged internal
/// apply path. That path is guarded by the range-authority fence of issue #991,
/// not by this public gate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RangeRole {
    /// The range's current single writer — the only role a public write may
    /// land on.
    Owner,
    /// Keeps a read/catch-up copy without being the writer. Public writes are
    /// refused and routed to the owner; replicated changes still arrive through
    /// the privileged internal apply path.
    Replica,
    /// Keeps no copy of the range whatsoever.
    NoCopy,
}

impl RangeRole {
    /// Whether this role may take a *public* write for the range — true for
    /// the owner only.
    pub fn may_write_public(self) -> bool {
        self == Self::Owner
    }

    /// Short lowercase name of the role, used in rejection messages.
    fn label(self) -> &'static str {
        match self {
            Self::Owner => "owner",
            Self::Replica => "replica",
            Self::NoCopy => "no-copy",
        }
    }
}
565
/// What an accepted catalog update did: created a range or advanced one.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpdateOutcome {
    /// No entry existed for the range; it now exists at the update's version.
    Created,
    /// An existing entry moved forward to a newer version.
    Updated,
}
574
575/// Why a catalog update was rejected.
576#[derive(Debug, Clone, PartialEq, Eq)]
577pub enum CatalogError {
578    /// The update's version did not strictly advance the range's current
579    /// version — a stale or out-of-order write. Carries both versions so the
580    /// caller (or a replica) can see how far behind it was.
581    StaleVersion {
582        collection: CollectionId,
583        range_id: RangeId,
584        current: CatalogVersion,
585        attempted: CatalogVersion,
586    },
587    /// The entry's shard key mode disagrees with the collection's declared mode.
588    /// A collection is hash- *or* ordered-partitioned, never both.
589    ShardKeyModeMismatch {
590        collection: CollectionId,
591        declared: ShardKeyMode,
592        attempted: ShardKeyMode,
593    },
594    /// Creating this range would overlap an existing range of the same
595    /// collection, which would make routing ambiguous.
596    OverlappingRange {
597        collection: CollectionId,
598        existing: RangeId,
599        attempted: RangeId,
600    },
601}
602
603impl std::fmt::Display for CatalogError {
604    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
605        match self {
606            Self::StaleVersion {
607                collection,
608                range_id,
609                current,
610                attempted,
611            } => write!(
612                f,
613                "stale catalog update for {collection}/{range_id}: current version {current}, attempted {attempted}"
614            ),
615            Self::ShardKeyModeMismatch {
616                collection,
617                declared,
618                attempted,
619            } => write!(
620                f,
621                "collection {collection} is declared {declared:?} but range uses {attempted:?}"
622            ),
623            Self::OverlappingRange {
624                collection,
625                existing,
626                attempted,
627            } => write!(
628                f,
629                "range {attempted} overlaps existing range {existing} of collection {collection}"
630            ),
631        }
632    }
633}
634
635impl std::error::Error for CatalogError {}
636
637/// Why an ownership-aware *public* write was rejected (issue #990).
638///
639/// This is a **routing/ownership** error, deliberately distinct from the
640/// instance-wide read-only rejection raised by
641/// [`WriteGate`](crate::runtime::write_gate::WriteGate): a replica/non-holder
642/// rejecting a public write is not "this node is read-only", it is "this node is
643/// not the authority for *this range* — route to the owner". Crucially, none of
644/// these rejections fall back to the privileged internal replica-apply path; a
645/// public write that is not for this node's owned range never reaches storage.
646#[derive(Debug, Clone, PartialEq, Eq)]
647pub enum RangeWriteReject {
648    /// No range of the collection covers the routed key, so the write cannot be
649    /// placed. The caller must (re)resolve routing against a fresher catalog.
650    NoRange { collection: CollectionId },
651    /// This node holds the range but is not its owner (a [`Replica`]), or holds
652    /// no copy at all ([`NoCopy`]). Either way a public write must be routed to
653    /// `owner`, never applied locally.
654    ///
655    /// [`Replica`]: RangeRole::Replica
656    /// [`NoCopy`]: RangeRole::NoCopy
657    NotOwner {
658        collection: CollectionId,
659        range_id: RangeId,
660        role: RangeRole,
661        owner: NodeIdentity,
662    },
663    /// This node *is* the range owner, but the write was authorised under an
664    /// ownership epoch that no longer matches the catalog — a write fenced out
665    /// because ownership has since moved (its epoch advanced). Carries both
666    /// epochs so the caller can see how far the routing decision was behind.
667    StaleEpoch {
668        collection: CollectionId,
669        range_id: RangeId,
670        expected: OwnershipEpoch,
671        current: OwnershipEpoch,
672    },
673}
674
675impl std::fmt::Display for RangeWriteReject {
676    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677        match self {
678            Self::NoRange { collection } => write!(
679                f,
680                "no range of collection {collection} covers the routed key — re-resolve routing"
681            ),
682            Self::NotOwner {
683                collection,
684                range_id,
685                role,
686                owner,
687            } => write!(
688                f,
689                "this node is {} of {collection}/{range_id}, not its owner — route the write to {owner}",
690                role.label()
691            ),
692            Self::StaleEpoch {
693                collection,
694                range_id,
695                expected,
696                current,
697            } => write!(
698                f,
699                "stale ownership epoch for {collection}/{range_id}: write authorised under epoch {expected}, current is {current}"
700            ),
701        }
702    }
703}
704
705impl std::error::Error for RangeWriteReject {}
706
707/// The global shard ownership catalog held by every data member.
708///
709/// This single type plays both roles in ADR 0037's model: it is the authoritative
710/// state the Cluster Supervisor leader writes through, and it is the replica each
711/// data member holds and routes against. Both write through
712/// [`apply_update`](Self::apply_update), so the stale-version rejection that
713/// makes leader writes versioned is the *same* rule that makes replica
714/// application order-independent. Nothing here needs user-data sharding to find
715/// an entry: ranges are addressed directly by `(collection, range_id)`, and
716/// routing ([`route`](Self::route)) is a local scan of the replica.
717#[derive(Debug, Clone, Default)]
718pub struct ShardOwnershipCatalog {
719    /// Declared shard key mode per collection. A collection is recorded here the
720    /// moment its first range is created (or via [`declare_collection`]).
721    ///
722    /// [`declare_collection`]: Self::declare_collection
723    collections: BTreeMap<CollectionId, ShardKeyMode>,
724    ranges: BTreeMap<(CollectionId, RangeId), RangeOwnership>,
725}
726
727impl ShardOwnershipCatalog {
728    /// An empty catalog — a cluster with no collections placed yet.
729    pub fn new() -> Self {
730        Self::default()
731    }
732
733    /// Declare a collection's shard key mode up front. Hash is the default, so
734    /// this is mainly how an operator opts a collection into
735    /// [`Ordered`](ShardKeyMode::Ordered) mode before any range exists. Declaring
736    /// the same mode twice is idempotent; redeclaring a different mode for a
737    /// collection that already has a mode is a [`ShardKeyModeMismatch`].
738    ///
739    /// [`ShardKeyModeMismatch`]: CatalogError::ShardKeyModeMismatch
740    pub fn declare_collection(
741        &mut self,
742        collection: CollectionId,
743        mode: ShardKeyMode,
744    ) -> Result<(), CatalogError> {
745        match self.collections.get(&collection) {
746            Some(&declared) if declared != mode => Err(CatalogError::ShardKeyModeMismatch {
747                collection,
748                declared,
749                attempted: mode,
750            }),
751            _ => {
752                self.collections.insert(collection, mode);
753                Ok(())
754            }
755        }
756    }
757
758    /// The declared shard key mode of `collection`, if it has any ranges or was
759    /// explicitly declared.
760    pub fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
761        self.collections.get(collection).copied()
762    }
763
764    /// Apply a versioned ownership update — the single write path for both leader
765    /// writes and replica application.
766    ///
767    /// Creation (the range does not yet exist) auto-declares the collection's
768    /// mode from the entry and checks the new range does not overlap a sibling.
769    /// Updating an existing range requires the entry's version to **strictly
770    /// advance** the current version; anything else is a
771    /// [`StaleVersion`](CatalogError::StaleVersion) rejection that leaves the
772    /// catalog untouched. Either way the entry's mode must match the collection's
773    /// declared mode.
774    pub fn apply_update(&mut self, entry: RangeOwnership) -> Result<UpdateOutcome, CatalogError> {
775        // Mode must agree with the collection (auto-declared on first range).
776        match self.collections.get(entry.collection()) {
777            Some(&declared) if declared != entry.shard_key_mode() => {
778                return Err(CatalogError::ShardKeyModeMismatch {
779                    collection: entry.collection().clone(),
780                    declared,
781                    attempted: entry.shard_key_mode(),
782                });
783            }
784            _ => {}
785        }
786
787        let key = entry.key();
788        match self.ranges.get(&key) {
789            Some(current) => {
790                if entry.version() <= current.version() {
791                    return Err(CatalogError::StaleVersion {
792                        collection: entry.collection().clone(),
793                        range_id: entry.range_id(),
794                        current: current.version(),
795                        attempted: entry.version(),
796                    });
797                }
798                if let Some(existing) = self.overlapping_sibling(&entry) {
799                    return Err(CatalogError::OverlappingRange {
800                        collection: entry.collection().clone(),
801                        existing,
802                        attempted: entry.range_id(),
803                    });
804                }
805                self.collections
806                    .insert(entry.collection().clone(), entry.shard_key_mode());
807                self.ranges.insert(key, entry);
808                Ok(UpdateOutcome::Updated)
809            }
810            None => {
811                // Creating a range: it must not overlap any sibling range of the
812                // same collection, or routing would be ambiguous.
813                if let Some(existing) = self.overlapping_sibling(&entry) {
814                    return Err(CatalogError::OverlappingRange {
815                        collection: entry.collection().clone(),
816                        existing,
817                        attempted: entry.range_id(),
818                    });
819                }
820                self.collections
821                    .insert(entry.collection().clone(), entry.shard_key_mode());
822                self.ranges.insert(key, entry);
823                Ok(UpdateOutcome::Created)
824            }
825        }
826    }
827
828    fn overlapping_sibling(&self, entry: &RangeOwnership) -> Option<RangeId> {
829        self.ranges_for(entry.collection())
830            .find(|range| {
831                range.range_id() != entry.range_id() && range.bounds().overlaps(entry.bounds())
832            })
833            .map(RangeOwnership::range_id)
834    }
835
836    /// The current ownership of one range, addressed directly by identity — no
837    /// routing required, because the catalog is what routing is built on.
838    pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&RangeOwnership> {
839        self.ranges.get(&(collection.clone(), range_id))
840    }
841
842    /// Every range of `collection`, in range-id order.
843    pub fn ranges_for<'a>(
844        &'a self,
845        collection: &CollectionId,
846    ) -> impl Iterator<Item = &'a RangeOwnership> {
847        let collection = collection.clone();
848        self.ranges
849            .iter()
850            .filter(move |((c, _), _)| *c == collection)
851            .map(|(_, r)| r)
852    }
853
854    /// Route a normalized range key to the range that owns it — the catalog read
855    /// every routing decision makes. Returns the owning [`RangeOwnership`] (whose
856    /// `owner`, `epoch`, and `replicas` the caller uses to send and fence the
857    /// write), or `None` if no range covers the key yet.
858    pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&RangeOwnership> {
859        self.ranges_for(collection)
860            .find(|r| r.bounds().contains(key))
861    }
862
863    /// Route a logical shard key according to the collection's declared mode.
864    ///
865    /// Ordered collections route the shard key bytes directly. Hash collections
866    /// first map the shard key into a stable hash slot and route that slot's
867    /// range key through the same `collection -> range` catalog.
868    pub fn route_shard_key(
869        &self,
870        collection: &CollectionId,
871        shard_key: &[u8],
872    ) -> Option<&RangeOwnership> {
873        match self.shard_key_mode(collection)? {
874            ShardKeyMode::Ordered => self.route(collection, shard_key),
875            ShardKeyMode::Hash => {
876                let range_key = hash_shard_key_to_range_key(shard_key);
877                self.route(collection, &range_key)
878            }
879        }
880    }
881
882    /// This node's [`RangeRole`] for a directly-addressed range (issue #990).
883    /// Returns `None` when no such range exists in the catalog — distinct from
884    /// [`NoCopy`](RangeRole::NoCopy), which means the range exists but this node
885    /// holds no copy of it.
886    pub fn role_at(
887        &self,
888        node: &NodeIdentity,
889        collection: &CollectionId,
890        range_id: RangeId,
891    ) -> Option<RangeRole> {
892        self.range(collection, range_id)
893            .map(|range| range.role_of(node))
894    }
895
896    /// Ownership-aware gate for a **public** write (issue #990, PRD #987).
897    ///
898    /// Routes `key` to its range, then admits the write only when `node` is the
899    /// range's current [`Owner`](RangeRole::Owner) **and** `expected_epoch`
900    /// matches the range's current ownership epoch. On success returns the owned
901    /// [`RangeOwnership`] (so the caller can proceed with the write against the
902    /// authoritative epoch); otherwise a [`RangeWriteReject`] explaining why.
903    ///
904    /// This is the public surface's gate — the counterpart of the instance-wide
905    /// [`WriteGate`](crate::runtime::write_gate::WriteGate) for multi-writer,
906    /// per-range ownership. The internal replica-apply path does **not** consult
907    /// it: replicated changes flow into a replica through the privileged apply
908    /// path (fenced by issue #991's range-authority watermark), so a node that
909    /// rejects a *public* write here can still legitimately apply the owner's
910    /// replicated changes for the very same range.
911    pub fn admit_public_write(
912        &self,
913        node: &NodeIdentity,
914        collection: &CollectionId,
915        key: &[u8],
916        expected_epoch: OwnershipEpoch,
917    ) -> Result<&RangeOwnership, RangeWriteReject> {
918        let range =
919            self.route_shard_key(collection, key)
920                .ok_or_else(|| RangeWriteReject::NoRange {
921                    collection: collection.clone(),
922                })?;
923        let role = range.role_of(node);
924        if !role.may_write_public() {
925            return Err(RangeWriteReject::NotOwner {
926                collection: collection.clone(),
927                range_id: range.range_id(),
928                role,
929                owner: range.owner().clone(),
930            });
931        }
932        if expected_epoch != range.epoch() {
933            return Err(RangeWriteReject::StaleEpoch {
934                collection: collection.clone(),
935                range_id: range.range_id(),
936                expected: expected_epoch,
937                current: range.epoch(),
938            });
939        }
940        Ok(range)
941    }
942
943    /// Total number of owned ranges across all collections.
944    pub fn range_count(&self) -> usize {
945        self.ranges.len()
946    }
947
948    /// All ranges, in `(collection, range_id)` order — the full catalog content
949    /// a joining member adopts as its starting replica
950    /// (see [`ControlPlaneSnapshot`](super::join::ControlPlaneSnapshot)).
951    pub fn entries(&self) -> impl Iterator<Item = &RangeOwnership> {
952        self.ranges.values()
953    }
954}
955
956#[cfg(test)]
957mod tests {
958    use super::*;
959
960    fn collection(name: &str) -> CollectionId {
961        CollectionId::new(name).unwrap()
962    }
963
964    fn ident(cn: &str) -> NodeIdentity {
965        NodeIdentity::from_certificate_subject(cn).unwrap()
966    }
967
968    fn bounds(lower: &[u8], upper: &[u8]) -> RangeBounds {
969        RangeBounds::new(RangeBound::key(lower), RangeBound::key(upper)).unwrap()
970    }
971
972    /// A hash range over `[lower, Max)` owned by `owner`.
973    fn hash_range(coll: &CollectionId, id: u64, bnds: RangeBounds, owner: &str) -> RangeOwnership {
974        RangeOwnership::establish(
975            coll.clone(),
976            RangeId::new(id),
977            ShardKeyMode::Hash,
978            bnds,
979            ident(owner),
980            [ident("CN=replica-1")],
981            PlacementMetadata::with_replication_factor(3),
982        )
983    }
984
985    fn single_hash_slot_bounds(key: &[u8]) -> RangeBounds {
986        let slot = super::super::slot::hash_shard_key_to_slot(key);
987        let lower = RangeBound::key(slot.range_key());
988        let upper = match slot.value().checked_add(1) {
989            Some(next) if next < super::super::slot::PRODUCTION_HASH_SLOT_COUNT => {
990                RangeBound::key(super::super::slot::HashSlot::new(next).unwrap().range_key())
991            }
992            _ => RangeBound::Max,
993        };
994        RangeBounds::new(lower, upper).unwrap()
995    }
996
997    #[test]
998    fn empty_catalog_creation() {
999        let catalog = ShardOwnershipCatalog::new();
1000        assert_eq!(catalog.range_count(), 0);
1001        assert!(catalog.shard_key_mode(&collection("orders")).is_none());
1002    }
1003
    #[test]
    fn hash_is_the_default_shard_key_mode() {
        // The first range of a collection auto-declares its mode. `hash_range`
        // builds its entry with `ShardKeyMode::Hash`, which the first assertion
        // pins as the default, so the collection lands in Hash mode.
        assert_eq!(ShardKeyMode::default(), ShardKeyMode::Hash);

        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        // No `declare_collection` call: the mode comes solely from this first range.
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();
        assert_eq!(catalog.shard_key_mode(&orders), Some(ShardKeyMode::Hash));
    }
1017
    #[test]
    fn hash_range_entry_routes_to_owner() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");

        // Two hash token ranges split at 0x80.
        // Range 1 covers [Min, 0x80) and range 2 covers [0x80, Max).
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
                "CN=node-a",
            ))
            .unwrap();
        catalog
            .apply_update(hash_range(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
            ))
            .unwrap();

        // Routing reads expose the owner for a key without any user-data sharding.
        // `route` takes the raw range key, so these bytes are compared directly
        // against the bounds. 0x80 itself lands in range 2 (the lower bound is
        // inclusive).
        assert_eq!(
            catalog.route(&orders, &[0x10]).unwrap().owner(),
            &ident("CN=node-a")
        );
        assert_eq!(
            catalog.route(&orders, &[0x80]).unwrap().owner(),
            &ident("CN=node-b")
        );
        assert_eq!(
            catalog.route(&orders, &[0xff]).unwrap().owner(),
            &ident("CN=node-b")
        );
        // The routing read also exposes replicas and fencing epoch.
        let r = catalog.route(&orders, &[0x10]).unwrap();
        assert_eq!(r.replicas(), &[ident("CN=replica-1")]);
        assert_eq!(r.epoch(), OwnershipEpoch::initial());
    }
1059
    #[test]
    fn hash_mode_routes_logical_shard_key_through_hash_slot() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let key = b"tenant:42";
        // The catalog's only range covers exactly the hash slot `key` maps to.
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                single_hash_slot_bounds(key),
                "CN=node-a",
            ))
            .unwrap();

        // `route_shard_key` (not `route`) maps the logical key into its hash
        // slot's range key before looking up the covering range.
        let routed = catalog
            .route_shard_key(&orders, key)
            .expect("hash slot range covers the logical shard key");
        assert_eq!(routed.owner(), &ident("CN=node-a"));
    }
1079
    #[test]
    fn ordered_mode_can_be_declared_and_routed() {
        let mut catalog = ShardOwnershipCatalog::new();
        let events = collection("events");
        // Declare the mode explicitly, before any range exists.
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        assert_eq!(catalog.shard_key_mode(&events), Some(ShardKeyMode::Ordered));

        // Ordered ranges bound the ordered key itself: [a, m) and [m, z).
        // Both ranges are built with no replicas (`[]`).
        catalog
            .apply_update(RangeOwnership::establish(
                events.clone(),
                RangeId::new(1),
                ShardKeyMode::Ordered,
                bounds(b"a", b"m"),
                ident("CN=node-a"),
                [],
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        catalog
            .apply_update(RangeOwnership::establish(
                events.clone(),
                RangeId::new(2),
                ShardKeyMode::Ordered,
                bounds(b"m", b"z"),
                ident("CN=node-b"),
                [],
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();

        assert_eq!(
            catalog.route(&events, b"alpha").unwrap().owner(),
            &ident("CN=node-a")
        );
        assert_eq!(
            catalog.route(&events, b"mike").unwrap().owner(),
            &ident("CN=node-b")
        );
        // A key outside every declared range routes nowhere.
        // "zzz" sorts after the exclusive upper bound "z" of range 2.
        assert!(catalog.route(&events, b"zzz").is_none());
    }
1124
    #[test]
    fn declaring_a_conflicting_mode_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let events = collection("events");
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        // Redeclaring the same mode is fine.
        catalog
            .declare_collection(events.clone(), ShardKeyMode::Ordered)
            .unwrap();
        // A different mode is a mismatch.
        let err = catalog
            .declare_collection(events.clone(), ShardKeyMode::Hash)
            .unwrap_err();
        assert_eq!(
            err,
            CatalogError::ShardKeyModeMismatch {
                collection: events.clone(),
                declared: ShardKeyMode::Ordered,
                attempted: ShardKeyMode::Hash,
            }
        );
        // And a range whose mode disagrees with the declared collection is rejected.
        // (`hash_range` always builds a Hash-mode entry.)
        let err = catalog
            .apply_update(hash_range(&events, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap_err();
        assert!(matches!(err, CatalogError::ShardKeyModeMismatch { .. }));
    }
1154
    #[test]
    fn version_bumps_on_owner_transfer_and_epoch_fences() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();

        // A freshly established range starts at the initial version and epoch.
        let current = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(current.version(), CatalogVersion::initial());
        assert_eq!(current.epoch(), OwnershipEpoch::initial());

        // Owner transfer advances both version and fencing epoch.
        // `transfer_to` yields a new entry; it only takes effect once applied.
        let moved = current.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        let outcome = catalog.apply_update(moved).unwrap();
        assert_eq!(outcome, UpdateOutcome::Updated);

        let after = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(after.owner(), &ident("CN=node-b"));
        assert_eq!(after.version().value(), 2);
        assert_eq!(after.epoch().value(), 2); // old owner is now fenced

        // A replica-set change advances the version but NOT the epoch.
        let replicas_changed = after.update_replicas([ident("CN=node-c")]);
        catalog.apply_update(replicas_changed).unwrap();
        let after2 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(after2.version().value(), 3);
        assert_eq!(after2.epoch().value(), 2); // write authority did not move
        assert_eq!(after2.replicas(), &[ident("CN=node-c")]);
    }
1185
    #[test]
    fn stale_update_is_rejected_and_leaves_catalog_unchanged() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
            .unwrap();

        // Keep an owned copy of the v1 entry so it can be replayed later.
        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        // Advance to v2.
        catalog
            .apply_update(v1.transfer_to(ident("CN=node-b"), []))
            .unwrap();
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-b")
        );

        // Re-applying the original v1 entry is stale: version 1 does not advance
        // past the current version 2.
        let err = catalog.apply_update(v1.clone()).unwrap_err();
        assert_eq!(
            err,
            CatalogError::StaleVersion {
                collection: orders.clone(),
                range_id: RangeId::new(1),
                current: CatalogVersion::initial().next(),
                attempted: CatalogVersion::initial(),
            }
        );
        // The stale write did not roll ownership back.
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
            &ident("CN=node-b")
        );
        assert_eq!(
            catalog
                .range(&orders, RangeId::new(1))
                .unwrap()
                .version()
                .value(),
            2
        );
    }
1230
    #[test]
    fn overlapping_range_creation_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                bounds(&[0x00], &[0x80]),
                "CN=node-a",
            ))
            .unwrap();
        // A new range overlapping [0x00, 0x80) is ambiguous for routing.
        // [0x40, 0xc0) shares [0x40, 0x80) with range 1.
        let err = catalog
            .apply_update(hash_range(
                &orders,
                2,
                bounds(&[0x40], &[0xc0]),
                "CN=node-b",
            ))
            .unwrap_err();
        assert_eq!(
            err,
            CatalogError::OverlappingRange {
                collection: orders.clone(),
                existing: RangeId::new(1),
                attempted: RangeId::new(2),
            }
        );
        // The rejected range was not inserted.
        assert_eq!(catalog.range_count(), 1);
    }
1262
    #[test]
    fn overlapping_range_update_is_rejected() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        // Two adjacent, non-overlapping ranges: [0x00, 0x80) and [0x80, Max).
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                bounds(&[0x00], &[0x80]),
                "CN=node-a",
            ))
            .unwrap();
        catalog
            .apply_update(hash_range(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
            ))
            .unwrap();

        // Reshaping an *existing* range into its neighbour's territory is also
        // rejected, not just creating a new overlapping range.
        let widened = catalog
            .range(&orders, RangeId::new(1))
            .unwrap()
            .with_bounds(bounds(&[0x00], &[0xc0]));
        let err = catalog.apply_update(widened).unwrap_err();
        assert_eq!(
            err,
            CatalogError::OverlappingRange {
                collection: orders.clone(),
                existing: RangeId::new(2),
                attempted: RangeId::new(1),
            }
        );
        // Range 1 keeps its original bounds.
        assert_eq!(
            catalog.range(&orders, RangeId::new(1)).unwrap().bounds(),
            &bounds(&[0x00], &[0x80])
        );
    }
1302
    #[test]
    fn catalog_replicates_to_data_members_with_read_visibility() {
        // Leader writes the catalog; a data member holds its own replica and
        // applies the same versioned updates — no user-data sharding involved.
        // Both sides are independent `ShardOwnershipCatalog` values; "shipping"
        // an entry is simply passing a clone of it to the member's `apply_update`.
        let orders = collection("orders");
        let mut leader = ShardOwnershipCatalog::new();
        let mut data_member = ShardOwnershipCatalog::new();

        // Leader creates a range; ship the entry to the data member.
        let create = hash_range(&orders, 1, RangeBounds::full(), "CN=node-a");
        leader.apply_update(create.clone()).unwrap();
        assert_eq!(
            data_member.apply_update(create).unwrap(),
            UpdateOutcome::Created
        );

        // The data member can route locally to the same owner the leader has.
        assert_eq!(
            data_member.route(&orders, b"any-key").unwrap().owner(),
            &ident("CN=node-a")
        );

        // Leader transfers ownership; replicate the v2 entry.
        let v2 = leader
            .range(&orders, RangeId::new(1))
            .unwrap()
            .transfer_to(ident("CN=node-b"), []);
        leader.apply_update(v2.clone()).unwrap();
        assert_eq!(
            data_member.apply_update(v2.clone()).unwrap(),
            UpdateOutcome::Updated
        );
        assert_eq!(
            data_member.route(&orders, b"any-key").unwrap().owner(),
            &ident("CN=node-b")
        );

        // Out-of-order / duplicate replication: re-delivering v2 after it is
        // applied is stale on the replica and rejected, so it stays consistent.
        let err = data_member.apply_update(v2).unwrap_err();
        assert!(matches!(err, CatalogError::StaleVersion { .. }));
        assert_eq!(
            data_member
                .range(&orders, RangeId::new(1))
                .unwrap()
                .version()
                .value(),
            2
        );
    }
1353
1354    #[test]
1355    fn range_bounds_reject_empty_or_inverted() {
1356        assert!(RangeBounds::new(RangeBound::key([0x10]), RangeBound::key([0x10])).is_err());
1357        assert!(RangeBounds::new(RangeBound::key([0x20]), RangeBound::key([0x10])).is_err());
1358        assert!(RangeBounds::new(RangeBound::Max, RangeBound::Min).is_err());
1359        assert!(RangeBounds::full().contains(b"anything"));
1360    }
1361
1362    // ---------------------------------------------------------------
1363    // Issue #990 — per-range role model and ownership-aware write gate.
1364    // ---------------------------------------------------------------
1365
1366    /// A range owned by `owner` with an explicit replica set.
1367    fn range_with(
1368        coll: &CollectionId,
1369        id: u64,
1370        bnds: RangeBounds,
1371        owner: &str,
1372        replicas: &[&str],
1373    ) -> RangeOwnership {
1374        RangeOwnership::establish(
1375            coll.clone(),
1376            RangeId::new(id),
1377            ShardKeyMode::Hash,
1378            bnds,
1379            ident(owner),
1380            replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
1381            PlacementMetadata::with_replication_factor(3),
1382        )
1383    }
1384
1385    #[test]
1386    fn role_of_distinguishes_owner_replica_and_no_copy() {
1387        let orders = collection("orders");
1388        let range = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);
1389
1390        assert_eq!(range.role_of(&ident("CN=node-a")), RangeRole::Owner);
1391        assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Replica);
1392        assert_eq!(range.role_of(&ident("CN=node-c")), RangeRole::NoCopy);
1393        assert!(RangeRole::Owner.may_write_public());
1394        assert!(!RangeRole::Replica.may_write_public());
1395        assert!(!RangeRole::NoCopy.may_write_public());
1396    }
1397
    #[test]
    fn role_is_per_range_not_a_global_node_role() {
        // node-a owns range 1 and is only a replica of range 2 — the same node
        // holds different roles for different ranges of the same collection.
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();
        catalog
            .apply_update(range_with(
                &orders,
                2,
                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
                "CN=node-b",
                &["CN=node-a"],
            ))
            .unwrap();

        let node_a = ident("CN=node-a");
        assert_eq!(
            catalog.role_at(&node_a, &orders, RangeId::new(1)),
            Some(RangeRole::Owner)
        );
        assert_eq!(
            catalog.role_at(&node_a, &orders, RangeId::new(2)),
            Some(RangeRole::Replica)
        );
        // A range that does not exist is None, not NoCopy.
        assert_eq!(catalog.role_at(&node_a, &orders, RangeId::new(99)), None);
        // And a collection nobody has placed yet has no ranges, so the lookup
        // is None as well.
        assert_eq!(
            catalog.role_at(&node_a, &collection("ghost"), RangeId::new(1)),
            None
        );
    }
1440
    #[test]
    fn public_write_admitted_on_owner_at_matching_epoch() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        // A single full-span range, so any shard key routes to it.
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        // The owner, presenting the range's current (initial) epoch, is admitted.
        let admitted = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .expect("owner at current epoch may write");
        assert_eq!(admitted.owner(), &ident("CN=node-a"));
        assert_eq!(admitted.range_id(), RangeId::new(1));
    }
1466
    #[test]
    fn public_write_uses_hash_slot_routing_for_hash_collections() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let key = b"tenant:42";
        // The catalog's only range covers exactly the hash slot `key` maps to.
        catalog
            .apply_update(hash_range(
                &orders,
                1,
                single_hash_slot_bounds(key),
                "CN=node-a",
            ))
            .unwrap();

        // The gate routes the logical shard key through its hash slot, so it
        // resolves to the single-slot range above.
        let admitted = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, key, OwnershipEpoch::initial())
            .expect("hash owner admits write routed by shard-key slot");
        assert_eq!(admitted.range_id(), RangeId::new(1));
    }
1486
    #[test]
    fn public_write_rejected_on_replica_with_routing_error() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        // node-b holds a copy but is a replica — a public write must be routed
        // to the owner, not applied locally.
        let err = catalog
            .admit_public_write(
                &ident("CN=node-b"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        // Match on a borrow of the owner so `err` stays usable for `to_string` below.
        match err {
            RangeWriteReject::NotOwner {
                role, ref owner, ..
            } => {
                assert_eq!(role, RangeRole::Replica);
                assert_eq!(owner, &ident("CN=node-a"));
            }
            other => panic!("expected NotOwner(Replica), got {other:?}"),
        }
        // The rejection names the owner so the caller can re-route.
        assert!(err.to_string().contains("route the write to"));
    }
1523
    #[test]
    fn public_write_rejected_on_no_copy_holder() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        catalog
            .apply_update(range_with(
                &orders,
                1,
                RangeBounds::full(),
                "CN=node-a",
                &["CN=node-b"],
            ))
            .unwrap();

        // node-c holds no copy of the range at all.
        // It is neither the owner nor a listed replica, so it is rejected as NotOwner
        // with role NoCopy.
        let err = catalog
            .admit_public_write(
                &ident("CN=node-c"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        match err {
            RangeWriteReject::NotOwner { role, .. } => assert_eq!(role, RangeRole::NoCopy),
            other => panic!("expected NotOwner(NoCopy), got {other:?}"),
        }
    }
1552
    #[test]
    fn public_write_rejected_on_stale_ownership_epoch() {
        // Ownership moved a→b and back to a, advancing the epoch twice. A write
        // the routing layer authorised under the original epoch must be fenced
        // even though node-a is, once again, the current owner.
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let v1 = range_with(&orders, 1, RangeBounds::full(), "CN=node-a", &["CN=node-b"]);
        let original_epoch = v1.epoch();
        catalog.apply_update(v1.clone()).unwrap();

        // Each transfer builds on the previous entry, so version and epoch keep
        // advancing and every update is accepted.
        let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        catalog.apply_update(v2.clone()).unwrap();
        let v3 = v2.transfer_to(ident("CN=node-a"), [ident("CN=node-b")]);
        catalog.apply_update(v3.clone()).unwrap();

        // node-a is the current owner again, but at a newer epoch.
        assert_ne!(original_epoch, v3.epoch());
        let err = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", original_epoch)
            .unwrap_err();
        match err {
            RangeWriteReject::StaleEpoch {
                expected, current, ..
            } => {
                assert_eq!(expected, original_epoch);
                assert_eq!(current, v3.epoch());
            }
            other => panic!("expected StaleEpoch, got {other:?}"),
        }
        // The same owner at the *current* epoch is admitted.
        assert!(catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", v3.epoch())
            .is_ok());
    }
1588
1589    #[test]
1590    fn public_write_rejected_when_no_range_covers_the_key() {
1591        let catalog = ShardOwnershipCatalog::new();
1592        let orders = collection("orders");
1593        let err = catalog
1594            .admit_public_write(
1595                &ident("CN=node-a"),
1596                &orders,
1597                b"k",
1598                OwnershipEpoch::initial(),
1599            )
1600            .unwrap_err();
1601        assert!(matches!(err, RangeWriteReject::NoRange { .. }));
1602    }
1603
1604    #[test]
1605    fn internal_apply_path_stays_privileged_for_a_public_write_replica() {
1606        // A node that rejects a *public* write because it is only a replica must
1607        // still admit the owner's replicated changes through the privileged
1608        // internal apply path — that path is gated by issue #991's range
1609        // authority fence, not by this public ownership gate.
1610        use crate::replication::cdc::{ChangeOperation, ChangeRecord, RangeAuthority};
1611
1612        let mut catalog = ShardOwnershipCatalog::new();
1613        let orders = collection("orders");
1614        catalog
1615            .apply_update(range_with(
1616                &orders,
1617                7,
1618                RangeBounds::full(),
1619                "CN=node-a",
1620                &["CN=node-b"],
1621            ))
1622            .unwrap();
1623
1624        // Public gate: node-b is a replica → rejected, never reaches storage.
1625        assert!(matches!(
1626            catalog
1627                .admit_public_write(
1628                    &ident("CN=node-b"),
1629                    &orders,
1630                    b"k",
1631                    OwnershipEpoch::initial()
1632                )
1633                .unwrap_err(),
1634            RangeWriteReject::NotOwner {
1635                role: RangeRole::Replica,
1636                ..
1637            }
1638        ));
1639
1640        // Internal apply: the owner's replicated change for the same range is
1641        // admitted by the range-authority fence on the replica.
1642        let record = ChangeRecord {
1643            term: 1,
1644            lsn: 1,
1645            timestamp: 0,
1646            operation: ChangeOperation::Insert,
1647            collection: orders.as_str().to_string(),
1648            entity_id: 1,
1649            entity_kind: "row".to_string(),
1650            entity_bytes: Some(vec![1]),
1651            metadata: None,
1652            refresh_records: None,
1653            range_id: None,
1654            ownership_epoch: None,
1655        }
1656        .with_range_authority(7, OwnershipEpoch::initial().value());
1657        let fence = RangeAuthority {
1658            range_id: 7,
1659            min_term: 1,
1660            min_ownership_epoch: OwnershipEpoch::initial().value(),
1661        };
1662        assert!(
1663            fence.admit(&record).is_ok(),
1664            "replica internal apply must remain privileged for the owner's changes"
1665        );
1666    }
1667}