Skip to main content

reddb_server/cluster/
ownership.rs

1//! The global shard ownership catalog (issue #989, PRD #987, ADR 0037).
2//!
3//! The shard ownership catalog is the **source of truth for range routing and
4//! failover** in a multi-writer cluster. Per the glossary it is *"explicit,
5//! versioned RedDB catalog state that records shard/range bounds, current writer
6//! owner, replicas, and ownership epoch/version"* and — crucially — it is
7//! *"special global control-plane state replicated to all data members rather
8//! than sharded like ordinary user collections"*.
9//!
10//! That last point shapes the whole module. The catalog is the thing that tells
11//! you *where* a collection's data lives, so it cannot itself be located by the
12//! same user-data sharding it describes — that would be circular. Instead every
13//! data member holds a full [`ShardOwnershipCatalog`] replica and routes against
14//! it locally ([`ShardOwnershipCatalog::route`]). Replication is modelled as
15//! shipping versioned [`RangeOwnership`] entries that each member applies through
16//! the same [`apply_update`](ShardOwnershipCatalog::apply_update) path that the
17//! Supervisor leader writes through — and that path rejects stale versions, so a
18//! late or out-of-order replica update can never overwrite newer ownership.
19//!
20//! ## What the catalog records
21//!
22//! One [`RangeOwnership`] entry per owned shard/range carries everything routing
23//! and fencing need:
24//!
25//! * [`CollectionId`] + [`RangeId`] — *which* range of *which* collection.
26//! * [`ShardKeyMode`] — [`Hash`](ShardKeyMode::Hash) (the default, for uniform
27//!   distribution) or [`Ordered`](ShardKeyMode::Ordered) (declared when range
28//!   locality and ordered scans matter more than hotspot resistance).
29//! * [`RangeBounds`] — the half-open `[lower, upper)` partition this entry owns.
30//! * `owner` ([`NodeIdentity`]) + `replicas` — the current single writer for the
31//!   range and its read/catch-up copies.
32//! * [`OwnershipEpoch`] + [`CatalogVersion`] — the fencing epoch (bumped on owner
33//!   change so a stale old owner is fenced) and the monotonic write version
34//!   (bumped on *every* accepted update so stale writes are rejected).
35//! * [`PlacementMetadata`] — replication factor and free-form placement
36//!   attributes (region/zone/weight) the rebalancer reads.
37//!
38//! Ownership changes are produced as *transitions* — new entries built with
39//! [`RangeOwnership::transfer_to`] / [`update_replicas`](RangeOwnership::update_replicas)
40//! / [`update_placement`](RangeOwnership::update_placement) — never arbitrary row
41//! edits, matching ADR 0037's "transitions, not arbitrary row edits".
42//!
43//! Everything here is a pure data model with no I/O, so the routing, versioning,
44//! and replication story is exercised deterministically.
45
46use std::collections::BTreeMap;
47
48use super::identity::NodeIdentity;
49
/// Stable identity of a user collection as the catalog records it.
///
/// This wraps the collection's own name rather than a shard-routed handle.
/// Mapping a [`CollectionId`] to its ranges consults nothing but the catalog —
/// no user-data sharding is involved — and that is why the catalog can be the
/// piece that *bootstraps* routing.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CollectionId(String);

/// Returned by [`CollectionId::new`] when the supplied name is empty once
/// surrounding whitespace has been trimmed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionIdError;

impl std::fmt::Display for CollectionIdError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("collection id is empty")
    }
}

impl std::error::Error for CollectionIdError {}

impl CollectionId {
    /// Build a collection id from a name.
    ///
    /// Leading and trailing whitespace is trimmed before the name is stored.
    ///
    /// # Errors
    ///
    /// Returns [`CollectionIdError`] if nothing is left after trimming.
    pub fn new(value: impl AsRef<str>) -> Result<Self, CollectionIdError> {
        match value.as_ref().trim() {
            "" => Err(CollectionIdError),
            name => Ok(Self(name.to_owned())),
        }
    }

    /// The stored (already trimmed) collection name.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for CollectionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
91
/// Stable identifier of a single shard/range inside a collection.
///
/// Ownership is tracked at sub-collection granularity (ADR 0037), so one
/// collection may hold many of these, each an independently-owned and
/// independently-routed partition. The id survives ownership transitions:
/// handing a range to a new owner keeps its [`RangeId`] and bumps the
/// epoch/version instead of minting a new range.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RangeId(u64);

impl RangeId {
    /// Wrap a raw numeric range id.
    pub fn new(value: u64) -> Self {
        RangeId(value)
    }

    /// The raw numeric id.
    pub fn value(self) -> u64 {
        let RangeId(raw) = self;
        raw
    }
}

impl std::fmt::Display for RangeId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let RangeId(raw) = self;
        write!(f, "{raw}")
    }
}
117
/// How a collection's keyspace is partitioned into owned shards/ranges.
///
/// The glossary puts it as: *"Hash mode is the default for uniform
/// distribution; ordered mode is declared when range locality and ordered
/// scans matter more than automatic hotspot resistance."* A collection has
/// exactly one mode shared by all of its ranges; an entry carrying a different
/// mode from the one declared for its collection is refused with
/// [`CatalogError::ShardKeyModeMismatch`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ShardKeyMode {
    /// Default mode: uniform hash distribution, with bounds expressed over
    /// hash tokens.
    #[default]
    Hash,
    /// Opt-in mode for range locality / ordered scans, with bounds expressed
    /// over the ordered shard key itself.
    Ordered,
}
135
/// A single edge of a [`RangeBounds`].
///
/// Edges are byte strings, which lets one type cover both shard key modes: in
/// [`Hash`](ShardKeyMode::Hash) mode the bytes are hash tokens, in
/// [`Ordered`](ShardKeyMode::Ordered) mode they are the ordered key bytes
/// themselves. [`Min`](RangeBound::Min) and [`Max`](RangeBound::Max) stand for
/// the open ends of the keyspace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeBound {
    /// Open low end of the keyspace: every key is `>= Min`.
    Min,
    /// A concrete boundary key.
    Key(Vec<u8>),
    /// Open high end of the keyspace: every key is `< Max`.
    Max,
}

impl RangeBound {
    /// Convenience constructor for a boundary at `bytes`.
    pub fn key(bytes: impl Into<Vec<u8>>) -> Self {
        Self::Key(bytes.into())
    }

    /// Borrowed, comparable view of this bound.
    ///
    /// The resulting order is `Min < every Key < Max`, with keys compared
    /// lexicographically, which reduces `contains` and `overlaps` to plain
    /// comparisons.
    fn position(&self) -> Position<'_> {
        match self {
            Self::Key(bytes) => Position::Key(bytes.as_slice()),
            Self::Min => Position::Min,
            Self::Max => Position::Max,
        }
    }
}

// Variant order matters: the derived `Ord` ranks variants by declaration
// order, which is what places `Min` below every key and `Max` above.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
enum Position<'a> {
    Min,
    Key(&'a [u8]),
    Max,
}

/// The half-open `[lower, upper)` slice of the keyspace owned by one range.
///
/// Because the upper edge is exclusive, neighbouring ranges can share a
/// boundary key and still tile the keyspace with no gap and no double cover —
/// the shared key belongs to exactly one of them. Construction enforces
/// `lower < upper`, so an empty or inverted range never exists.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBounds {
    lower: RangeBound,
    upper: RangeBound,
}

/// Returned by [`RangeBounds::new`] when `lower` is not strictly below `upper`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeBoundsError;

impl std::fmt::Display for RangeBoundsError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("range lower bound must be strictly below the upper bound")
    }
}

impl std::error::Error for RangeBoundsError {}

impl RangeBounds {
    /// Build bounds from an explicit `[lower, upper)` pair.
    ///
    /// # Errors
    ///
    /// Returns [`RangeBoundsError`] when `lower >= upper`, i.e. the range would
    /// be empty or inverted.
    pub fn new(lower: RangeBound, upper: RangeBound) -> Result<Self, RangeBoundsError> {
        let strictly_ordered = lower.position() < upper.position();
        if strictly_ordered {
            Ok(Self { lower, upper })
        } else {
            Err(RangeBoundsError)
        }
    }

    /// `[Min, Max)`: one range spanning a collection's entire keyspace.
    pub fn full() -> Self {
        let (lower, upper) = (RangeBound::Min, RangeBound::Max);
        Self { lower, upper }
    }

    /// The inclusive lower edge.
    pub fn lower(&self) -> &RangeBound {
        &self.lower
    }

    /// The exclusive upper edge.
    pub fn upper(&self) -> &RangeBound {
        &self.upper
    }

    /// The routing predicate: is `key` inside this range?
    ///
    /// The lower edge is inclusive and the upper edge exclusive.
    pub fn contains(&self, key: &[u8]) -> bool {
        let probe = Position::Key(key);
        let at_or_above_lower = probe >= self.lower.position();
        let below_upper = probe < self.upper.position();
        at_or_above_lower && below_upper
    }

    /// Whether the two ranges have at least one key in common.
    ///
    /// Used to keep a collection's ranges disjoint so that routing always
    /// resolves to exactly one owner.
    pub fn overlaps(&self, other: &RangeBounds) -> bool {
        // Two half-open ranges are disjoint exactly when one ends at or before
        // the point where the other begins.
        let entirely_below = self.upper.position() <= other.lower.position();
        let entirely_above = other.upper.position() <= self.lower.position();
        !(entirely_below || entirely_above)
    }
}
244
/// Monotonic write version of a single catalog entry.
///
/// Every accepted update to a range bumps its version. An update that does not
/// strictly advance the version is stale and is rejected — this is the
/// compare-and-advance rule that makes catalog replication safe regardless of
/// delivery order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct CatalogVersion(u64);

impl CatalogVersion {
    /// The version a range is created at.
    pub fn initial() -> Self {
        Self(1)
    }

    /// The raw version number.
    pub fn value(self) -> u64 {
        self.0
    }

    /// The version immediately after this one.
    ///
    /// # Panics
    ///
    /// Panics if the version is already `u64::MAX`. An unchecked `+ 1` would
    /// wrap to 0 in release builds, and a wrapped version compares *below*
    /// every earlier one, so the newest entry would be treated as stale.
    /// Failing loudly keeps the "strictly advancing" rule intact in every
    /// build profile.
    fn next(self) -> Self {
        let bumped = self
            .0
            .checked_add(1)
            .expect("catalog version overflowed u64");
        Self(bumped)
    }
}

impl std::fmt::Display for CatalogVersion {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
274
/// Fencing epoch for a range's write authority.
///
/// Distinct from [`CatalogVersion`]: the version advances on *any* catalog edit,
/// but the epoch advances only when **write authority moves** (a new owner). A
/// WAL/logical record stamped with an epoch older than the catalog's current
/// epoch is from a fenced old owner and must be rejected (ADR 0037, "fencing is
/// enforced below routing").
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct OwnershipEpoch(u64);

impl OwnershipEpoch {
    /// The epoch a range is created at.
    pub fn initial() -> Self {
        Self(1)
    }

    /// The raw epoch number.
    pub fn value(self) -> u64 {
        self.0
    }

    /// The epoch immediately after this one.
    ///
    /// # Panics
    ///
    /// Panics if the epoch is already `u64::MAX`. An unchecked `+ 1` would wrap
    /// to 0 in release builds, and a wrapped epoch compares *below* every
    /// earlier one — records from previously fenced owners would no longer
    /// look old. Failing loudly is preferable to silently losing the fence.
    fn next(self) -> Self {
        let bumped = self
            .0
            .checked_add(1)
            .expect("ownership epoch overflowed u64");
        Self(bumped)
    }
}

impl std::fmt::Display for OwnershipEpoch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
305
/// Placement hints consulted by the rebalancer when it plans transitions.
///
/// For the MVP this is the range's replication factor plus a free-form
/// attribute map (region / zone / operator-weight hints). It is descriptive
/// control-plane data only — never a source of authorization.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PlacementMetadata {
    replication_factor: usize,
    attributes: BTreeMap<String, String>,
}

impl PlacementMetadata {
    /// Placement carrying only a target replication factor; the attribute map
    /// starts out empty.
    pub fn with_replication_factor(replication_factor: usize) -> Self {
        Self {
            replication_factor,
            ..Self::default()
        }
    }

    /// Builder-style setter for one placement attribute (e.g. `region` →
    /// `us-east-1`). Setting a key that is already present replaces its value.
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, hint) = (key.into(), value.into());
        self.attributes.insert(name, hint);
        self
    }

    /// The target replication factor.
    pub fn replication_factor(&self) -> usize {
        self.replication_factor
    }

    /// The value stored for attribute `key`, or `None` if it was never set.
    pub fn attribute(&self, key: &str) -> Option<&str> {
        let hint = self.attributes.get(key)?;
        Some(hint.as_str())
    }
}
340
341/// One owned shard/range: the catalog's unit of routing and fencing.
342///
343/// An entry is self-describing — it carries its collection, range id, mode,
344/// bounds, owner, replicas, epoch, version, and placement — so a data member
345/// that receives it by replication can route and fence without consulting any
346/// other state. New ownership states are produced as *transitions* off an
347/// existing entry ([`transfer_to`](Self::transfer_to),
348/// [`update_replicas`](Self::update_replicas),
349/// [`update_placement`](Self::update_placement)), each of which advances the
350/// version — and, for an owner change, the fencing epoch.
351#[derive(Debug, Clone, PartialEq, Eq)]
352pub struct RangeOwnership {
353    collection: CollectionId,
354    range_id: RangeId,
355    shard_key_mode: ShardKeyMode,
356    bounds: RangeBounds,
357    owner: NodeIdentity,
358    replicas: Vec<NodeIdentity>,
359    epoch: OwnershipEpoch,
360    version: CatalogVersion,
361    placement: PlacementMetadata,
362}
363
364impl RangeOwnership {
365    /// The initial ownership state for a freshly created range: version and
366    /// epoch both at their [`initial`](CatalogVersion::initial) values.
367    #[allow(clippy::too_many_arguments)]
368    pub fn establish(
369        collection: CollectionId,
370        range_id: RangeId,
371        shard_key_mode: ShardKeyMode,
372        bounds: RangeBounds,
373        owner: NodeIdentity,
374        replicas: impl IntoIterator<Item = NodeIdentity>,
375        placement: PlacementMetadata,
376    ) -> Self {
377        Self {
378            collection,
379            range_id,
380            shard_key_mode,
381            bounds,
382            owner,
383            replicas: replicas.into_iter().collect(),
384            epoch: OwnershipEpoch::initial(),
385            version: CatalogVersion::initial(),
386            placement,
387        }
388    }
389
390    pub fn collection(&self) -> &CollectionId {
391        &self.collection
392    }
393
394    pub fn range_id(&self) -> RangeId {
395        self.range_id
396    }
397
398    pub fn shard_key_mode(&self) -> ShardKeyMode {
399        self.shard_key_mode
400    }
401
402    pub fn bounds(&self) -> &RangeBounds {
403        &self.bounds
404    }
405
406    pub fn owner(&self) -> &NodeIdentity {
407        &self.owner
408    }
409
410    pub fn replicas(&self) -> &[NodeIdentity] {
411        &self.replicas
412    }
413
414    pub fn epoch(&self) -> OwnershipEpoch {
415        self.epoch
416    }
417
418    pub fn version(&self) -> CatalogVersion {
419        self.version
420    }
421
422    pub fn placement(&self) -> &PlacementMetadata {
423        &self.placement
424    }
425
426    /// The catalog key for this entry: `(collection, range_id)`.
427    fn key(&self) -> (CollectionId, RangeId) {
428        (self.collection.clone(), self.range_id)
429    }
430
431    /// A transition that moves write authority to `new_owner` with `new_replicas`.
432    /// Advances **both** the version (it is a catalog write) and the ownership
433    /// epoch (write authority moved, so any old owner is fenced).
434    pub fn transfer_to(
435        &self,
436        new_owner: NodeIdentity,
437        new_replicas: impl IntoIterator<Item = NodeIdentity>,
438    ) -> Self {
439        Self {
440            owner: new_owner,
441            replicas: new_replicas.into_iter().collect(),
442            epoch: self.epoch.next(),
443            version: self.version.next(),
444            ..self.clone()
445        }
446    }
447
448    /// A transition that changes only the replica set. Advances the version but
449    /// **not** the epoch: write authority did not move, so no owner is fenced.
450    pub fn update_replicas(&self, new_replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
451        Self {
452            replicas: new_replicas.into_iter().collect(),
453            version: self.version.next(),
454            ..self.clone()
455        }
456    }
457
458    /// A transition that changes only placement metadata. Advances the version
459    /// but not the epoch.
460    pub fn update_placement(&self, placement: PlacementMetadata) -> Self {
461        Self {
462            placement,
463            version: self.version.next(),
464            ..self.clone()
465        }
466    }
467}
468
/// What an accepted update did to the catalog: brought a new range into
/// existence, or moved an existing one forward.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateOutcome {
    /// No entry existed for the range; it was created at the update's version.
    Created,
    /// An entry already existed and was advanced to a newer version.
    Updated,
}
477
478/// Why a catalog update was rejected.
479#[derive(Debug, Clone, PartialEq, Eq)]
480pub enum CatalogError {
481    /// The update's version did not strictly advance the range's current
482    /// version — a stale or out-of-order write. Carries both versions so the
483    /// caller (or a replica) can see how far behind it was.
484    StaleVersion {
485        collection: CollectionId,
486        range_id: RangeId,
487        current: CatalogVersion,
488        attempted: CatalogVersion,
489    },
490    /// The entry's shard key mode disagrees with the collection's declared mode.
491    /// A collection is hash- *or* ordered-partitioned, never both.
492    ShardKeyModeMismatch {
493        collection: CollectionId,
494        declared: ShardKeyMode,
495        attempted: ShardKeyMode,
496    },
497    /// Creating this range would overlap an existing range of the same
498    /// collection, which would make routing ambiguous.
499    OverlappingRange {
500        collection: CollectionId,
501        existing: RangeId,
502        attempted: RangeId,
503    },
504}
505
506impl std::fmt::Display for CatalogError {
507    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
508        match self {
509            Self::StaleVersion {
510                collection,
511                range_id,
512                current,
513                attempted,
514            } => write!(
515                f,
516                "stale catalog update for {collection}/{range_id}: current version {current}, attempted {attempted}"
517            ),
518            Self::ShardKeyModeMismatch {
519                collection,
520                declared,
521                attempted,
522            } => write!(
523                f,
524                "collection {collection} is declared {declared:?} but range uses {attempted:?}"
525            ),
526            Self::OverlappingRange {
527                collection,
528                existing,
529                attempted,
530            } => write!(
531                f,
532                "range {attempted} overlaps existing range {existing} of collection {collection}"
533            ),
534        }
535    }
536}
537
538impl std::error::Error for CatalogError {}
539
540/// The global shard ownership catalog held by every data member.
541///
542/// This single type plays both roles in ADR 0037's model: it is the authoritative
543/// state the Cluster Supervisor leader writes through, and it is the replica each
544/// data member holds and routes against. Both write through
545/// [`apply_update`](Self::apply_update), so the stale-version rejection that
546/// makes leader writes versioned is the *same* rule that makes replica
547/// application order-independent. Nothing here needs user-data sharding to find
548/// an entry: ranges are addressed directly by `(collection, range_id)`, and
549/// routing ([`route`](Self::route)) is a local scan of the replica.
550#[derive(Debug, Clone, Default)]
551pub struct ShardOwnershipCatalog {
552    /// Declared shard key mode per collection. A collection is recorded here the
553    /// moment its first range is created (or via [`declare_collection`]).
554    ///
555    /// [`declare_collection`]: Self::declare_collection
556    collections: BTreeMap<CollectionId, ShardKeyMode>,
557    ranges: BTreeMap<(CollectionId, RangeId), RangeOwnership>,
558}
559
560impl ShardOwnershipCatalog {
561    /// An empty catalog — a cluster with no collections placed yet.
562    pub fn new() -> Self {
563        Self::default()
564    }
565
566    /// Declare a collection's shard key mode up front. Hash is the default, so
567    /// this is mainly how an operator opts a collection into
568    /// [`Ordered`](ShardKeyMode::Ordered) mode before any range exists. Declaring
569    /// the same mode twice is idempotent; redeclaring a different mode for a
570    /// collection that already has a mode is a [`ShardKeyModeMismatch`].
571    ///
572    /// [`ShardKeyModeMismatch`]: CatalogError::ShardKeyModeMismatch
573    pub fn declare_collection(
574        &mut self,
575        collection: CollectionId,
576        mode: ShardKeyMode,
577    ) -> Result<(), CatalogError> {
578        match self.collections.get(&collection) {
579            Some(&declared) if declared != mode => Err(CatalogError::ShardKeyModeMismatch {
580                collection,
581                declared,
582                attempted: mode,
583            }),
584            _ => {
585                self.collections.insert(collection, mode);
586                Ok(())
587            }
588        }
589    }
590
591    /// The declared shard key mode of `collection`, if it has any ranges or was
592    /// explicitly declared.
593    pub fn shard_key_mode(&self, collection: &CollectionId) -> Option<ShardKeyMode> {
594        self.collections.get(collection).copied()
595    }
596
597    /// Apply a versioned ownership update — the single write path for both leader
598    /// writes and replica application.
599    ///
600    /// Creation (the range does not yet exist) auto-declares the collection's
601    /// mode from the entry and checks the new range does not overlap a sibling.
602    /// Updating an existing range requires the entry's version to **strictly
603    /// advance** the current version; anything else is a
604    /// [`StaleVersion`](CatalogError::StaleVersion) rejection that leaves the
605    /// catalog untouched. Either way the entry's mode must match the collection's
606    /// declared mode.
607    pub fn apply_update(&mut self, entry: RangeOwnership) -> Result<UpdateOutcome, CatalogError> {
608        // Mode must agree with the collection (auto-declared on first range).
609        match self.collections.get(entry.collection()) {
610            Some(&declared) if declared != entry.shard_key_mode() => {
611                return Err(CatalogError::ShardKeyModeMismatch {
612                    collection: entry.collection().clone(),
613                    declared,
614                    attempted: entry.shard_key_mode(),
615                });
616            }
617            _ => {}
618        }
619
620        let key = entry.key();
621        match self.ranges.get(&key) {
622            Some(current) => {
623                if entry.version() <= current.version() {
624                    return Err(CatalogError::StaleVersion {
625                        collection: entry.collection().clone(),
626                        range_id: entry.range_id(),
627                        current: current.version(),
628                        attempted: entry.version(),
629                    });
630                }
631                self.collections
632                    .insert(entry.collection().clone(), entry.shard_key_mode());
633                self.ranges.insert(key, entry);
634                Ok(UpdateOutcome::Updated)
635            }
636            None => {
637                // Creating a range: it must not overlap any sibling range of the
638                // same collection, or routing would be ambiguous.
639                if let Some(existing) = self
640                    .ranges_for(entry.collection())
641                    .find(|r| r.bounds().overlaps(entry.bounds()))
642                {
643                    return Err(CatalogError::OverlappingRange {
644                        collection: entry.collection().clone(),
645                        existing: existing.range_id(),
646                        attempted: entry.range_id(),
647                    });
648                }
649                self.collections
650                    .insert(entry.collection().clone(), entry.shard_key_mode());
651                self.ranges.insert(key, entry);
652                Ok(UpdateOutcome::Created)
653            }
654        }
655    }
656
657    /// The current ownership of one range, addressed directly by identity — no
658    /// routing required, because the catalog is what routing is built on.
659    pub fn range(&self, collection: &CollectionId, range_id: RangeId) -> Option<&RangeOwnership> {
660        self.ranges.get(&(collection.clone(), range_id))
661    }
662
663    /// Every range of `collection`, in range-id order.
664    pub fn ranges_for<'a>(
665        &'a self,
666        collection: &CollectionId,
667    ) -> impl Iterator<Item = &'a RangeOwnership> {
668        let collection = collection.clone();
669        self.ranges
670            .iter()
671            .filter(move |((c, _), _)| *c == collection)
672            .map(|(_, r)| r)
673    }
674
675    /// Route a key to the range that owns it — the catalog read every routing
676    /// decision makes. Returns the owning [`RangeOwnership`] (whose `owner`,
677    /// `epoch`, and `replicas` the caller uses to send and fence the write), or
678    /// `None` if no range covers the key yet.
679    pub fn route(&self, collection: &CollectionId, key: &[u8]) -> Option<&RangeOwnership> {
680        self.ranges_for(collection)
681            .find(|r| r.bounds().contains(key))
682    }
683
684    /// Total number of owned ranges across all collections.
685    pub fn range_count(&self) -> usize {
686        self.ranges.len()
687    }
688
689    /// All ranges, in `(collection, range_id)` order — the full catalog content
690    /// a joining member adopts as its starting replica
691    /// (see [`ControlPlaneSnapshot`](super::join::ControlPlaneSnapshot)).
692    pub fn entries(&self) -> impl Iterator<Item = &RangeOwnership> {
693        self.ranges.values()
694    }
695}
696
697#[cfg(test)]
698mod tests {
699    use super::*;
700
701    fn collection(name: &str) -> CollectionId {
702        CollectionId::new(name).unwrap()
703    }
704
705    fn ident(cn: &str) -> NodeIdentity {
706        NodeIdentity::from_certificate_subject(cn).unwrap()
707    }
708
709    fn bounds(lower: &[u8], upper: &[u8]) -> RangeBounds {
710        RangeBounds::new(RangeBound::key(lower), RangeBound::key(upper)).unwrap()
711    }
712
    /// A hash-mode range with the given bounds, owned by `owner`, with a single fixed replica.
714    fn hash_range(coll: &CollectionId, id: u64, bnds: RangeBounds, owner: &str) -> RangeOwnership {
715        RangeOwnership::establish(
716            coll.clone(),
717            RangeId::new(id),
718            ShardKeyMode::Hash,
719            bnds,
720            ident(owner),
721            [ident("CN=replica-1")],
722            PlacementMetadata::with_replication_factor(3),
723        )
724    }
725
726    #[test]
727    fn empty_catalog_creation() {
728        let catalog = ShardOwnershipCatalog::new();
729        assert_eq!(catalog.range_count(), 0);
730        assert!(catalog.shard_key_mode(&collection("orders")).is_none());
731    }
732
733    #[test]
734    fn hash_is_the_default_shard_key_mode() {
735        // The first range of a collection auto-declares its mode; a range built
736        // with the default mode lands the collection in Hash mode.
737        assert_eq!(ShardKeyMode::default(), ShardKeyMode::Hash);
738
739        let mut catalog = ShardOwnershipCatalog::new();
740        let orders = collection("orders");
741        catalog
742            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
743            .unwrap();
744        assert_eq!(catalog.shard_key_mode(&orders), Some(ShardKeyMode::Hash));
745    }
746
747    #[test]
748    fn hash_range_entry_routes_to_owner() {
749        let mut catalog = ShardOwnershipCatalog::new();
750        let orders = collection("orders");
751
752        // Two hash token ranges split at 0x80.
753        catalog
754            .apply_update(hash_range(
755                &orders,
756                1,
757                RangeBounds::new(RangeBound::Min, RangeBound::key([0x80])).unwrap(),
758                "CN=node-a",
759            ))
760            .unwrap();
761        catalog
762            .apply_update(hash_range(
763                &orders,
764                2,
765                RangeBounds::new(RangeBound::key([0x80]), RangeBound::Max).unwrap(),
766                "CN=node-b",
767            ))
768            .unwrap();
769
770        // Routing reads expose the owner for a key without any user-data sharding.
771        assert_eq!(
772            catalog.route(&orders, &[0x10]).unwrap().owner(),
773            &ident("CN=node-a")
774        );
775        assert_eq!(
776            catalog.route(&orders, &[0x80]).unwrap().owner(),
777            &ident("CN=node-b")
778        );
779        assert_eq!(
780            catalog.route(&orders, &[0xff]).unwrap().owner(),
781            &ident("CN=node-b")
782        );
783        // The routing read also exposes replicas and fencing epoch.
784        let r = catalog.route(&orders, &[0x10]).unwrap();
785        assert_eq!(r.replicas(), &[ident("CN=replica-1")]);
786        assert_eq!(r.epoch(), OwnershipEpoch::initial());
787    }
788
789    #[test]
790    fn ordered_mode_can_be_declared_and_routed() {
791        let mut catalog = ShardOwnershipCatalog::new();
792        let events = collection("events");
793        catalog
794            .declare_collection(events.clone(), ShardKeyMode::Ordered)
795            .unwrap();
796        assert_eq!(catalog.shard_key_mode(&events), Some(ShardKeyMode::Ordered));
797
798        // Ordered ranges bound the ordered key itself: [a, m) and [m, z).
799        catalog
800            .apply_update(RangeOwnership::establish(
801                events.clone(),
802                RangeId::new(1),
803                ShardKeyMode::Ordered,
804                bounds(b"a", b"m"),
805                ident("CN=node-a"),
806                [],
807                PlacementMetadata::with_replication_factor(3),
808            ))
809            .unwrap();
810        catalog
811            .apply_update(RangeOwnership::establish(
812                events.clone(),
813                RangeId::new(2),
814                ShardKeyMode::Ordered,
815                bounds(b"m", b"z"),
816                ident("CN=node-b"),
817                [],
818                PlacementMetadata::with_replication_factor(3),
819            ))
820            .unwrap();
821
822        assert_eq!(
823            catalog.route(&events, b"alpha").unwrap().owner(),
824            &ident("CN=node-a")
825        );
826        assert_eq!(
827            catalog.route(&events, b"mike").unwrap().owner(),
828            &ident("CN=node-b")
829        );
830        // A key outside every declared range routes nowhere.
831        assert!(catalog.route(&events, b"zzz").is_none());
832    }
833
834    #[test]
835    fn declaring_a_conflicting_mode_is_rejected() {
836        let mut catalog = ShardOwnershipCatalog::new();
837        let events = collection("events");
838        catalog
839            .declare_collection(events.clone(), ShardKeyMode::Ordered)
840            .unwrap();
841        // Redeclaring the same mode is fine.
842        catalog
843            .declare_collection(events.clone(), ShardKeyMode::Ordered)
844            .unwrap();
845        // A different mode is a mismatch.
846        let err = catalog
847            .declare_collection(events.clone(), ShardKeyMode::Hash)
848            .unwrap_err();
849        assert_eq!(
850            err,
851            CatalogError::ShardKeyModeMismatch {
852                collection: events.clone(),
853                declared: ShardKeyMode::Ordered,
854                attempted: ShardKeyMode::Hash,
855            }
856        );
857        // And a range whose mode disagrees with the declared collection is rejected.
858        let err = catalog
859            .apply_update(hash_range(&events, 1, RangeBounds::full(), "CN=node-a"))
860            .unwrap_err();
861        assert!(matches!(err, CatalogError::ShardKeyModeMismatch { .. }));
862    }
863
864    #[test]
865    fn version_bumps_on_owner_transfer_and_epoch_fences() {
866        let mut catalog = ShardOwnershipCatalog::new();
867        let orders = collection("orders");
868        catalog
869            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
870            .unwrap();
871
872        let current = catalog.range(&orders, RangeId::new(1)).unwrap();
873        assert_eq!(current.version(), CatalogVersion::initial());
874        assert_eq!(current.epoch(), OwnershipEpoch::initial());
875
876        // Owner transfer advances both version and fencing epoch.
877        let moved = current.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
878        let outcome = catalog.apply_update(moved).unwrap();
879        assert_eq!(outcome, UpdateOutcome::Updated);
880
881        let after = catalog.range(&orders, RangeId::new(1)).unwrap();
882        assert_eq!(after.owner(), &ident("CN=node-b"));
883        assert_eq!(after.version().value(), 2);
884        assert_eq!(after.epoch().value(), 2); // old owner is now fenced
885
886        // A replica-set change advances the version but NOT the epoch.
887        let replicas_changed = after.update_replicas([ident("CN=node-c")]);
888        catalog.apply_update(replicas_changed).unwrap();
889        let after2 = catalog.range(&orders, RangeId::new(1)).unwrap();
890        assert_eq!(after2.version().value(), 3);
891        assert_eq!(after2.epoch().value(), 2); // write authority did not move
892        assert_eq!(after2.replicas(), &[ident("CN=node-c")]);
893    }
894
895    #[test]
896    fn stale_update_is_rejected_and_leaves_catalog_unchanged() {
897        let mut catalog = ShardOwnershipCatalog::new();
898        let orders = collection("orders");
899        catalog
900            .apply_update(hash_range(&orders, 1, RangeBounds::full(), "CN=node-a"))
901            .unwrap();
902
903        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
904        // Advance to v2.
905        catalog
906            .apply_update(v1.transfer_to(ident("CN=node-b"), []))
907            .unwrap();
908        assert_eq!(
909            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
910            &ident("CN=node-b")
911        );
912
913        // Re-applying the original v1 entry (and even a fresh v1-versioned write)
914        // is stale: version 1 does not advance past the current version 2.
915        let err = catalog.apply_update(v1.clone()).unwrap_err();
916        assert_eq!(
917            err,
918            CatalogError::StaleVersion {
919                collection: orders.clone(),
920                range_id: RangeId::new(1),
921                current: CatalogVersion::initial().next(),
922                attempted: CatalogVersion::initial(),
923            }
924        );
925        // The stale write did not roll ownership back.
926        assert_eq!(
927            catalog.range(&orders, RangeId::new(1)).unwrap().owner(),
928            &ident("CN=node-b")
929        );
930        assert_eq!(
931            catalog
932                .range(&orders, RangeId::new(1))
933                .unwrap()
934                .version()
935                .value(),
936            2
937        );
938    }
939
940    #[test]
941    fn overlapping_range_creation_is_rejected() {
942        let mut catalog = ShardOwnershipCatalog::new();
943        let orders = collection("orders");
944        catalog
945            .apply_update(hash_range(
946                &orders,
947                1,
948                bounds(&[0x00], &[0x80]),
949                "CN=node-a",
950            ))
951            .unwrap();
952        // A new range overlapping [0x00, 0x80) is ambiguous for routing.
953        let err = catalog
954            .apply_update(hash_range(
955                &orders,
956                2,
957                bounds(&[0x40], &[0xc0]),
958                "CN=node-b",
959            ))
960            .unwrap_err();
961        assert_eq!(
962            err,
963            CatalogError::OverlappingRange {
964                collection: orders.clone(),
965                existing: RangeId::new(1),
966                attempted: RangeId::new(2),
967            }
968        );
969        assert_eq!(catalog.range_count(), 1);
970    }
971
972    #[test]
973    fn catalog_replicates_to_data_members_with_read_visibility() {
974        // Leader writes the catalog; a data member holds its own replica and
975        // applies the same versioned updates — no user-data sharding involved.
976        let orders = collection("orders");
977        let mut leader = ShardOwnershipCatalog::new();
978        let mut data_member = ShardOwnershipCatalog::new();
979
980        // Leader creates a range; ship the entry to the data member.
981        let create = hash_range(&orders, 1, RangeBounds::full(), "CN=node-a");
982        leader.apply_update(create.clone()).unwrap();
983        assert_eq!(
984            data_member.apply_update(create).unwrap(),
985            UpdateOutcome::Created
986        );
987
988        // The data member can route locally to the same owner the leader has.
989        assert_eq!(
990            data_member.route(&orders, b"any-key").unwrap().owner(),
991            &ident("CN=node-a")
992        );
993
994        // Leader transfers ownership; replicate the v2 entry.
995        let v2 = leader
996            .range(&orders, RangeId::new(1))
997            .unwrap()
998            .transfer_to(ident("CN=node-b"), []);
999        leader.apply_update(v2.clone()).unwrap();
1000        assert_eq!(
1001            data_member.apply_update(v2.clone()).unwrap(),
1002            UpdateOutcome::Updated
1003        );
1004        assert_eq!(
1005            data_member.route(&orders, b"any-key").unwrap().owner(),
1006            &ident("CN=node-b")
1007        );
1008
1009        // Out-of-order / duplicate replication: re-delivering v2 after it is
1010        // applied is stale on the replica and rejected, so it stays consistent.
1011        let err = data_member.apply_update(v2).unwrap_err();
1012        assert!(matches!(err, CatalogError::StaleVersion { .. }));
1013        assert_eq!(
1014            data_member
1015                .range(&orders, RangeId::new(1))
1016                .unwrap()
1017                .version()
1018                .value(),
1019            2
1020        );
1021    }
1022
1023    #[test]
1024    fn range_bounds_reject_empty_or_inverted() {
1025        assert!(RangeBounds::new(RangeBound::key([0x10]), RangeBound::key([0x10])).is_err());
1026        assert!(RangeBounds::new(RangeBound::key([0x20]), RangeBound::key([0x10])).is_err());
1027        assert!(RangeBounds::new(RangeBound::Max, RangeBound::Min).is_err());
1028        assert!(RangeBounds::full().contains(b"anything"));
1029    }
1030}