Skip to main content

reddb_server/cluster/
cross_range.rs

1//! First-cut cross-range transaction and read behaviour (issue #1002, PRD #987).
2//!
3//! Once a collection is split into independently-owned ranges (issue #989's
4//! catalog, routed by [`plan_route`](ShardOwnershipCatalog::plan_route)), a
5//! single client operation can touch keys that land in *different* ranges — and,
6//! in a multi-writer cluster, those ranges can be owned by *different writers*.
7//! This module is the request-layer gate that decides what the first multi-writer
8//! cut is allowed to do with such an operation. It deliberately refuses to
9//! pretend a cross-writer operation is globally atomic or globally
10//! snapshot-consistent when nothing underneath guarantees it.
11//!
12//! Three decisions live here, all pure reads of the [`ShardOwnershipCatalog`]:
13//!
14//! * **Write transactions** ([`plan_write_transaction`]). Resolve every targeted
15//!   key to its owning range and group by writer. A transaction confined to a
16//!   *single* writer (even one that spans several of that writer's own ranges)
17//!   commits on that owner and is admitted. A transaction whose keys span ranges
18//!   owned by *different* writers has no atomic-commit path in this cut, so it is
19//!   rejected with [`WriteTransactionReject::CrossRange`] naming every writer
20//!   involved — a clear "unsupported" contract rather than a silent partial
21//!   commit.
22//!
23//! * **Simple read fanout** ([`plan_read_fanout`]). A best-effort read may span
24//!   any number of range owners; the plan collects one [`ReadLeg`] per owner so
25//!   the caller can scatter the read and gather the results. This is explicitly
26//!   *not* a globally consistent snapshot — each leg observes its owner at
27//!   whatever point it happens to be at — and the type name says so.
28//!
29//! * **Consistent / transactional reads** ([`plan_consistent_read`]). A read that
30//!   must look globally consistent needs a safe snapshot point that covers every
31//!   range it touches. The caller supplies a [`GlobalReadWatermark`]; the plan
32//!   pins each leg to that range's watermark. With no snapshot the request fails
33//!   with [`ConsistentReadReject::NoSafeSnapshot`]; with a snapshot that is
34//!   missing a targeted range it fails with [`ConsistentReadReject::WatermarkGap`].
35//!   Either way the caller learns it cannot get a consistent answer rather than
36//!   getting an inconsistent one dressed up as consistent.
37//!
38//! Like the rest of the cluster module this is a pure decision layer with no I/O:
39//! it maps a catalog plus a set of [`KeyTarget`]s to an intent, so the
40//! cross-range contract is exercised deterministically. The transport that
41//! actually fans the legs out and the storage that admits each write are layered
42//! on top.
43
44use std::collections::BTreeMap;
45
46use super::identity::NodeIdentity;
47use super::ownership::{CollectionId, OwnershipEpoch, RangeId, ShardOwnershipCatalog};
48use super::ownership_transition::CommitWatermark;
49
50/// One `(collection, key)` a cross-range operation touches.
51///
52/// A transaction or multi-key read is just a set of these; the catalog resolves
53/// each to its owning range to decide whether the operation crosses writers.
54#[derive(Debug, Clone, PartialEq, Eq)]
55pub struct KeyTarget {
56    collection: CollectionId,
57    key: Vec<u8>,
58}
59
60impl KeyTarget {
61    pub fn new(collection: CollectionId, key: impl Into<Vec<u8>>) -> Self {
62        Self {
63            collection,
64            key: key.into(),
65        }
66    }
67
68    pub fn collection(&self) -> &CollectionId {
69        &self.collection
70    }
71
72    pub fn key(&self) -> &[u8] {
73        &self.key
74    }
75}
76
77/// A targeted key resolved to the range that owns it — the catalog read every
78/// cross-range decision is built from.
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct ResolvedTarget {
81    collection: CollectionId,
82    key: Vec<u8>,
83    range_id: RangeId,
84    owner: NodeIdentity,
85    epoch: OwnershipEpoch,
86}
87
88impl ResolvedTarget {
89    pub fn collection(&self) -> &CollectionId {
90        &self.collection
91    }
92
93    pub fn key(&self) -> &[u8] {
94        &self.key
95    }
96
97    pub fn range_id(&self) -> RangeId {
98        self.range_id
99    }
100
101    pub fn owner(&self) -> &NodeIdentity {
102        &self.owner
103    }
104
105    pub fn epoch(&self) -> OwnershipEpoch {
106        self.epoch
107    }
108}
109
110/// A range a writer owns that an operation touches, with the epoch the caller
111/// must fence each write under (the same epoch
112/// [`admit_public_write`](ShardOwnershipCatalog::admit_public_write) checks).
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct RangeParticipant {
115    collection: CollectionId,
116    range_id: RangeId,
117    epoch: OwnershipEpoch,
118}
119
120impl RangeParticipant {
121    pub fn collection(&self) -> &CollectionId {
122        &self.collection
123    }
124
125    pub fn range_id(&self) -> RangeId {
126        self.range_id
127    }
128
129    pub fn epoch(&self) -> OwnershipEpoch {
130        self.epoch
131    }
132}
133
134/// One writer's participation in a cross-range write transaction: the writer and
135/// the distinct ranges of theirs the transaction touches. Used both as the
136/// admitted single-writer plan and, in the rejection, to name each writer the
137/// transaction would have had to coordinate.
138#[derive(Debug, Clone, PartialEq, Eq)]
139pub struct WriterParticipation {
140    writer: NodeIdentity,
141    ranges: Vec<RangeParticipant>,
142}
143
144impl WriterParticipation {
145    pub fn writer(&self) -> &NodeIdentity {
146        &self.writer
147    }
148
149    pub fn ranges(&self) -> &[RangeParticipant] {
150        &self.ranges
151    }
152}
153
154/// An admitted single-writer write transaction: every targeted key resolves to a
155/// range owned by the *same* writer, so the transaction commits atomically on
156/// that owner. Carries each participating range so the caller fences every write
157/// at the right epoch.
158#[derive(Debug, Clone, PartialEq, Eq)]
159pub struct WriteTransactionPlan {
160    participation: WriterParticipation,
161}
162
163impl WriteTransactionPlan {
164    /// The single writer that owns every range the transaction touches.
165    pub fn writer(&self) -> &NodeIdentity {
166        self.participation.writer()
167    }
168
169    /// The distinct ranges (with fencing epochs) the transaction writes.
170    pub fn ranges(&self) -> &[RangeParticipant] {
171        self.participation.ranges()
172    }
173}
174
175/// Why a write transaction could not be planned in the first multi-writer cut.
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub enum WriteTransactionReject {
178    /// The transaction named no targets — there is nothing to commit.
179    Empty,
180    /// A targeted key resolves to no range of its collection; routing is stale or
181    /// the collection is not yet placed. The caller must refresh its catalog.
182    Unroutable {
183        collection: CollectionId,
184        key: Vec<u8>,
185    },
186    /// The transaction's keys span ranges owned by **different writers**. There is
187    /// no atomic cross-writer commit path in this cut, so the transaction is
188    /// rejected rather than committed partially. Carries every writer involved (in
189    /// identity order) so the caller sees exactly which owners it straddled.
190    CrossRange { writers: Vec<WriterParticipation> },
191}
192
193impl std::fmt::Display for WriteTransactionReject {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        match self {
196            Self::Empty => write!(f, "write transaction names no targets"),
197            Self::Unroutable { collection, key } => write!(
198                f,
199                "no range of collection {collection} covers key {} — re-resolve routing",
200                DisplayKey(key)
201            ),
202            Self::CrossRange { writers } => {
203                write!(
204                    f,
205                    "cross-range write transaction spans {} writers and is unsupported: ",
206                    writers.len()
207                )?;
208                for (i, w) in writers.iter().enumerate() {
209                    if i > 0 {
210                        write!(f, ", ")?;
211                    }
212                    write!(f, "{} owns ", w.writer())?;
213                    for (j, r) in w.ranges().iter().enumerate() {
214                        if j > 0 {
215                            write!(f, "+")?;
216                        }
217                        write!(f, "{}/{}", r.collection(), r.range_id())?;
218                    }
219                }
220                Ok(())
221            }
222        }
223    }
224}
225
226impl std::error::Error for WriteTransactionReject {}
227
228/// One owner's leg of a read: the owner and the resolved targets to read there.
229#[derive(Debug, Clone, PartialEq, Eq)]
230pub struct ReadLeg {
231    owner: NodeIdentity,
232    targets: Vec<ResolvedTarget>,
233}
234
235impl ReadLeg {
236    pub fn owner(&self) -> &NodeIdentity {
237        &self.owner
238    }
239
240    pub fn targets(&self) -> &[ResolvedTarget] {
241        &self.targets
242    }
243}
244
245/// A simple, best-effort cross-range read split into one [`ReadLeg`] per owner.
246///
247/// **Not** a globally consistent snapshot: each leg observes its owner at
248/// whatever point that owner is at when it answers, so two legs may reflect
249/// different moments in time. For a globally consistent answer use
250/// [`plan_consistent_read`](ShardOwnershipCatalog::plan_consistent_read).
251#[derive(Debug, Clone, PartialEq, Eq)]
252pub struct ReadFanout {
253    legs: Vec<ReadLeg>,
254}
255
256impl ReadFanout {
257    /// One leg per distinct owner, in identity order.
258    pub fn legs(&self) -> &[ReadLeg] {
259        &self.legs
260    }
261
262    /// Whether the read fans out to more than one owner.
263    pub fn is_cross_range(&self) -> bool {
264        self.legs.len() > 1
265    }
266}
267
268/// Why a simple read fanout could not be planned.
269#[derive(Debug, Clone, PartialEq, Eq)]
270pub enum ReadFanoutReject {
271    /// The read named no targets.
272    Empty,
273    /// A targeted key resolves to no range of its collection.
274    Unroutable {
275        collection: CollectionId,
276        key: Vec<u8>,
277    },
278}
279
280impl std::fmt::Display for ReadFanoutReject {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        match self {
283            Self::Empty => write!(f, "read fanout names no targets"),
284            Self::Unroutable { collection, key } => write!(
285                f,
286                "no range of collection {collection} covers key {} — re-resolve routing",
287                DisplayKey(key)
288            ),
289        }
290    }
291}
292
293impl std::error::Error for ReadFanoutReject {}
294
295/// A safe snapshot point for a globally consistent cross-range read: a commit
296/// watermark per `(collection, range)` that the read pins itself to.
297///
298/// This is the "explicit safe snapshot/watermark path" the issue requires before
299/// a cross-range read may claim to be consistent. A consistent read must find a
300/// watermark here for **every** range it touches; a missing entry means the
301/// snapshot does not cover that range and the read cannot be served consistently.
302#[derive(Debug, Clone, Default, PartialEq, Eq)]
303pub struct GlobalReadWatermark {
304    marks: BTreeMap<(CollectionId, RangeId), CommitWatermark>,
305}
306
307impl GlobalReadWatermark {
308    pub fn new() -> Self {
309        Self::default()
310    }
311
312    /// Pin `range`'s safe read point to `watermark` (builder form).
313    pub fn with(
314        mut self,
315        collection: CollectionId,
316        range_id: RangeId,
317        watermark: CommitWatermark,
318    ) -> Self {
319        self.marks.insert((collection, range_id), watermark);
320        self
321    }
322
323    /// Record `range`'s safe read point.
324    pub fn insert(
325        &mut self,
326        collection: CollectionId,
327        range_id: RangeId,
328        watermark: CommitWatermark,
329    ) {
330        self.marks.insert((collection, range_id), watermark);
331    }
332
333    /// The pinned watermark for a range, or `None` if the snapshot does not cover
334    /// it.
335    pub fn covers(&self, collection: &CollectionId, range_id: RangeId) -> Option<CommitWatermark> {
336        self.marks.get(&(collection.clone(), range_id)).copied()
337    }
338}
339
340/// One owner's leg of a consistent read: each resolved target paired with the
341/// safe watermark its range is pinned to for this snapshot.
342#[derive(Debug, Clone, PartialEq, Eq)]
343pub struct ConsistentReadLeg {
344    owner: NodeIdentity,
345    targets: Vec<PinnedTarget>,
346}
347
348impl ConsistentReadLeg {
349    pub fn owner(&self) -> &NodeIdentity {
350        &self.owner
351    }
352
353    pub fn targets(&self) -> &[PinnedTarget] {
354        &self.targets
355    }
356}
357
358/// A resolved target pinned to the safe read watermark of its range.
359#[derive(Debug, Clone, PartialEq, Eq)]
360pub struct PinnedTarget {
361    target: ResolvedTarget,
362    watermark: CommitWatermark,
363}
364
365impl PinnedTarget {
366    pub fn target(&self) -> &ResolvedTarget {
367        &self.target
368    }
369
370    pub fn watermark(&self) -> CommitWatermark {
371        self.watermark
372    }
373}
374
375/// A globally consistent cross-range read, pinned to a safe snapshot covering
376/// every range it touches.
377#[derive(Debug, Clone, PartialEq, Eq)]
378pub struct ConsistentReadPlan {
379    legs: Vec<ConsistentReadLeg>,
380}
381
382impl ConsistentReadPlan {
383    /// One leg per distinct owner, in identity order, each pinned to its range's
384    /// safe watermark.
385    pub fn legs(&self) -> &[ConsistentReadLeg] {
386        &self.legs
387    }
388}
389
390/// Why a consistent cross-range read could not be planned.
391#[derive(Debug, Clone, PartialEq, Eq)]
392pub enum ConsistentReadReject {
393    /// The read named no targets.
394    Empty,
395    /// A targeted key resolves to no range of its collection.
396    Unroutable {
397        collection: CollectionId,
398        key: Vec<u8>,
399    },
400    /// No safe snapshot was supplied. A cross-range read cannot be served
401    /// consistently without a global watermark, so the request fails clearly
402    /// rather than silently degrading to a best-effort fanout.
403    NoSafeSnapshot,
404    /// The supplied snapshot does not cover a targeted range, so the read cannot
405    /// be pinned to a single safe point across all of its ranges.
406    WatermarkGap {
407        collection: CollectionId,
408        range_id: RangeId,
409    },
410}
411
412impl std::fmt::Display for ConsistentReadReject {
413    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
414        match self {
415            Self::Empty => write!(f, "consistent read names no targets"),
416            Self::Unroutable { collection, key } => write!(
417                f,
418                "no range of collection {collection} covers key {} — re-resolve routing",
419                DisplayKey(key)
420            ),
421            Self::NoSafeSnapshot => write!(
422                f,
423                "consistent cross-range read requires a global safe snapshot/watermark, none supplied"
424            ),
425            Self::WatermarkGap {
426                collection,
427                range_id,
428            } => write!(
429                f,
430                "safe snapshot does not cover {collection}/{range_id}; cannot serve a consistent read"
431            ),
432        }
433    }
434}
435
436impl std::error::Error for ConsistentReadReject {}
437
/// Error-message rendering for keys, which are arbitrary bytes: `0x` followed
/// by two lowercase hex digits per byte (an empty key renders as just `0x`).
struct DisplayKey<'a>(&'a [u8]);

impl std::fmt::Display for DisplayKey<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(bytes) = self;
        f.write_str("0x")?;
        // Stops at the first formatter error, just like `?` in a loop would.
        bytes.iter().try_for_each(|b| write!(f, "{b:02x}"))
    }
}
450
451impl ShardOwnershipCatalog {
452    /// Resolve every target to its owning range, preserving caller order.
453    /// `Err` on the first key no range covers.
454    fn resolve_targets(
455        &self,
456        targets: &[KeyTarget],
457    ) -> Result<Vec<ResolvedTarget>, (CollectionId, Vec<u8>)> {
458        let mut resolved = Vec::with_capacity(targets.len());
459        for t in targets {
460            match self.route_shard_key(t.collection(), t.key()) {
461                Some(range) => resolved.push(ResolvedTarget {
462                    collection: t.collection().clone(),
463                    key: t.key().to_vec(),
464                    range_id: range.range_id(),
465                    owner: range.owner().clone(),
466                    epoch: range.epoch(),
467                }),
468                None => return Err((t.collection().clone(), t.key().to_vec())),
469            }
470        }
471        Ok(resolved)
472    }
473
474    /// Plan a write transaction over `targets` in the first multi-writer cut
475    /// (issue #1002).
476    ///
477    /// Resolves every targeted key to its owning range and groups by writer:
478    ///
479    /// * all targets owned by one writer → [`WriteTransactionPlan`] (the
480    ///   transaction commits atomically on that owner, even across several of its
481    ///   own ranges);
482    /// * targets span ranges of different writers →
483    ///   [`WriteTransactionReject::CrossRange`] naming every writer — this cut has
484    ///   no atomic cross-writer commit;
485    /// * a target routes nowhere → [`WriteTransactionReject::Unroutable`].
486    ///
487    /// Pure: it reads the catalog and returns intent. Each admitted write still
488    /// passes [`admit_public_write`](Self::admit_public_write) at the owner's
489    /// current epoch, so a stale plan cannot smuggle a write past fencing.
490    pub fn plan_write_transaction(
491        &self,
492        targets: &[KeyTarget],
493    ) -> Result<WriteTransactionPlan, WriteTransactionReject> {
494        if targets.is_empty() {
495            return Err(WriteTransactionReject::Empty);
496        }
497        let resolved = self
498            .resolve_targets(targets)
499            .map_err(|(collection, key)| WriteTransactionReject::Unroutable { collection, key })?;
500
501        let writers = group_by_owner(&resolved);
502        if writers.len() == 1 {
503            let (writer, ranges) = writers.into_iter().next().expect("exactly one writer");
504            Ok(WriteTransactionPlan {
505                participation: WriterParticipation { writer, ranges },
506            })
507        } else {
508            Err(WriteTransactionReject::CrossRange {
509                writers: writers
510                    .into_iter()
511                    .map(|(writer, ranges)| WriterParticipation { writer, ranges })
512                    .collect(),
513            })
514        }
515    }
516
517    /// Plan a simple, best-effort cross-range read fanout over `targets`
518    /// (issue #1002).
519    ///
520    /// Collects the resolved targets into one [`ReadLeg`] per owner so the caller
521    /// can scatter the read across every range owner and gather the results. This
522    /// is *not* a consistent snapshot — see [`ReadFanout`]. `Err` only when the
523    /// read is empty or a target routes nowhere; spanning many owners is the
524    /// expected, successful case.
525    pub fn plan_read_fanout(&self, targets: &[KeyTarget]) -> Result<ReadFanout, ReadFanoutReject> {
526        if targets.is_empty() {
527            return Err(ReadFanoutReject::Empty);
528        }
529        let resolved = self
530            .resolve_targets(targets)
531            .map_err(|(collection, key)| ReadFanoutReject::Unroutable { collection, key })?;
532
533        Ok(ReadFanout {
534            legs: group_targets_by_owner(resolved),
535        })
536    }
537
538    /// Plan a globally consistent cross-range read over `targets`, pinned to
539    /// `snapshot` (issue #1002).
540    ///
541    /// A consistent read must pin every range it touches to a single safe point:
542    ///
543    /// * `snapshot` is `None` → [`ConsistentReadReject::NoSafeSnapshot`]; the
544    ///   caller must obtain a global watermark first;
545    /// * a targeted range is absent from `snapshot` →
546    ///   [`ConsistentReadReject::WatermarkGap`];
547    /// * a target routes nowhere → [`ConsistentReadReject::Unroutable`];
548    /// * otherwise → a [`ConsistentReadPlan`] with each leg pinned to its range's
549    ///   watermark.
550    ///
551    /// This is the explicit safe-snapshot path: without it a cross-range read may
552    /// only be served as a best-effort [`ReadFanout`], never as a consistent one.
553    pub fn plan_consistent_read(
554        &self,
555        targets: &[KeyTarget],
556        snapshot: Option<&GlobalReadWatermark>,
557    ) -> Result<ConsistentReadPlan, ConsistentReadReject> {
558        if targets.is_empty() {
559            return Err(ConsistentReadReject::Empty);
560        }
561        let resolved = self
562            .resolve_targets(targets)
563            .map_err(|(collection, key)| ConsistentReadReject::Unroutable { collection, key })?;
564
565        let snapshot = snapshot.ok_or(ConsistentReadReject::NoSafeSnapshot)?;
566
567        // Every targeted range must be covered by the snapshot before any leg is
568        // built — a partial pin is not a consistent read.
569        let mut pinned = Vec::with_capacity(resolved.len());
570        for target in resolved {
571            let watermark = snapshot
572                .covers(target.collection(), target.range_id())
573                .ok_or_else(|| ConsistentReadReject::WatermarkGap {
574                    collection: target.collection().clone(),
575                    range_id: target.range_id(),
576                })?;
577            pinned.push(PinnedTarget { target, watermark });
578        }
579
580        Ok(ConsistentReadPlan {
581            legs: group_pinned_by_owner(pinned),
582        })
583    }
584}
585
586/// Group resolved targets by owner into `(writer, distinct ranges)` pairs, in
587/// identity order. Ranges within a writer are deduplicated and ordered by id.
588fn group_by_owner(resolved: &[ResolvedTarget]) -> Vec<(NodeIdentity, Vec<RangeParticipant>)> {
589    let mut by_owner: BTreeMap<NodeIdentity, BTreeMap<RangeId, RangeParticipant>> = BTreeMap::new();
590    for t in resolved {
591        by_owner
592            .entry(t.owner().clone())
593            .or_default()
594            .entry(t.range_id())
595            .or_insert_with(|| RangeParticipant {
596                collection: t.collection().clone(),
597                range_id: t.range_id(),
598                epoch: t.epoch(),
599            });
600    }
601    by_owner
602        .into_iter()
603        .map(|(owner, ranges)| (owner, ranges.into_values().collect()))
604        .collect()
605}
606
607/// Group resolved targets into one [`ReadLeg`] per owner, in identity order.
608fn group_targets_by_owner(resolved: Vec<ResolvedTarget>) -> Vec<ReadLeg> {
609    let mut by_owner: BTreeMap<NodeIdentity, Vec<ResolvedTarget>> = BTreeMap::new();
610    for t in resolved {
611        by_owner.entry(t.owner().clone()).or_default().push(t);
612    }
613    by_owner
614        .into_iter()
615        .map(|(owner, targets)| ReadLeg { owner, targets })
616        .collect()
617}
618
619/// Group pinned targets into one [`ConsistentReadLeg`] per owner, in identity
620/// order.
621fn group_pinned_by_owner(pinned: Vec<PinnedTarget>) -> Vec<ConsistentReadLeg> {
622    let mut by_owner: BTreeMap<NodeIdentity, Vec<PinnedTarget>> = BTreeMap::new();
623    for p in pinned {
624        by_owner
625            .entry(p.target().owner().clone())
626            .or_default()
627            .push(p);
628    }
629    by_owner
630        .into_iter()
631        .map(|(owner, targets)| ConsistentReadLeg { owner, targets })
632        .collect()
633}
634
// Unit tests for the cross-range planners. Each test builds a small in-memory
// catalog and asserts on the returned plan or rejection; "AC #n" in the
// comments below refers to the numbered acceptance criteria the tests cite.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{PlacementMetadata, RangeBound, RangeBounds, ShardKeyMode};

    /// Build a `CollectionId` from `name`, panicking if the constructor
    /// rejects it.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Build a `NodeIdentity` from a certificate subject such as `CN=node-a`,
    /// panicking if the constructor rejects it.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Build `RangeBounds` from a lower and an upper key bound, panicking if
    /// the constructor rejects the pair.
    fn bounds(lower: &[u8], upper: &[u8]) -> RangeBounds {
        RangeBounds::new(RangeBound::key(lower), RangeBound::key(upper)).unwrap()
    }

    /// A range over `[lower, upper)` of `coll` owned by `owner`.
    ///
    /// Every range is established in `ShardKeyMode::Ordered` with
    /// `CN=replica-1` and a replication factor of 3; only the id, bounds and
    /// owner vary between tests.
    fn range(
        coll: &CollectionId,
        id: u64,
        bnds: RangeBounds,
        owner: &str,
    ) -> super::super::ownership::RangeOwnership {
        super::super::ownership::RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Ordered,
            bnds,
            ident(owner),
            // NOTE(review): presumably the replica set for the range — confirm
            // against `RangeOwnership::establish`.
            [ident("CN=replica-1")],
            PlacementMetadata::with_replication_factor(3),
        )
    }

    /// Catalog with `orders` split into two ranges: [a,m) owned by node-a,
    /// [m,Max) owned by node-b.
    fn two_range_catalog() -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(range(&orders, 1, bounds(b"a", b"m"), "CN=node-a"))
            .unwrap();
        catalog
            .apply_update(range(
                &orders,
                2,
                RangeBounds::new(RangeBound::key(b"m"), RangeBound::Max).unwrap(),
                "CN=node-b",
            ))
            .unwrap();
        (catalog, orders)
    }

    /// Shorthand for a `KeyTarget` on `coll` with an owned copy of `key`.
    fn target(coll: &CollectionId, key: &[u8]) -> KeyTarget {
        KeyTarget::new(coll.clone(), key.to_vec())
    }

    // AC #5: a write transaction whose keys all land in one writer's ranges is
    // admitted — even when it spans several of that writer's ranges.
    #[test]
    fn single_writer_transaction_succeeds() {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        // Two ranges both owned by node-a.
        catalog
            .apply_update(range(&orders, 1, bounds(b"a", b"m"), "CN=node-a"))
            .unwrap();
        catalog
            .apply_update(range(
                &orders,
                2,
                RangeBounds::new(RangeBound::key(b"m"), RangeBound::Max).unwrap(),
                "CN=node-a",
            ))
            .unwrap();

        let plan = catalog
            .plan_write_transaction(&[target(&orders, b"alice"), target(&orders, b"zeb")])
            .expect("single-writer transaction is admitted");
        assert_eq!(plan.writer(), &ident("CN=node-a"));
        // Both of node-a's ranges participate, deduplicated and id-ordered.
        let ids: Vec<u64> = plan.ranges().iter().map(|r| r.range_id().value()).collect();
        assert_eq!(ids, vec![1, 2]);
        // A freshly established range is expected to carry the initial epoch.
        assert_eq!(plan.ranges()[0].epoch(), OwnershipEpoch::initial());
    }

    // AC #5: keys that all land in a single range are trivially single-writer.
    #[test]
    fn single_range_transaction_succeeds() {
        let (catalog, orders) = two_range_catalog();
        let plan = catalog
            .plan_write_transaction(&[target(&orders, b"alice"), target(&orders, b"bob")])
            .expect("single-range transaction is admitted");
        assert_eq!(plan.writer(), &ident("CN=node-a"));
        // Two keys in the same range collapse to one participant.
        assert_eq!(plan.ranges().len(), 1);
        assert_eq!(plan.ranges()[0].range_id(), RangeId::new(1));
    }

    // AC #1 + #2: a transaction straddling ranges owned by different writers is
    // detected and rejected, naming both writers.
    #[test]
    fn cross_range_write_transaction_is_rejected() {
        let (catalog, orders) = two_range_catalog();
        let err = catalog
            .plan_write_transaction(&[target(&orders, b"alice"), target(&orders, b"zeb")])
            .expect_err("cross-writer transaction is rejected");
        match err {
            WriteTransactionReject::CrossRange { writers } => {
                // Writers are expected in identity order: node-a, then node-b.
                assert_eq!(writers.len(), 2);
                assert_eq!(writers[0].writer(), &ident("CN=node-a"));
                assert_eq!(writers[1].writer(), &ident("CN=node-b"));
                assert_eq!(writers[0].ranges()[0].range_id(), RangeId::new(1));
                assert_eq!(writers[1].ranges()[0].range_id(), RangeId::new(2));
            }
            other => panic!("expected CrossRange, got {other:?}"),
        }
    }

    // An empty target list is rejected before the catalog is consulted.
    #[test]
    fn empty_write_transaction_is_rejected() {
        let catalog = ShardOwnershipCatalog::new();
        assert_eq!(
            catalog.plan_write_transaction(&[]),
            Err(WriteTransactionReject::Empty)
        );
    }

    // With an empty catalog no key routes anywhere; the rejection echoes the
    // offending collection and key.
    #[test]
    fn unroutable_write_transaction_is_rejected() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        match catalog.plan_write_transaction(&[target(&orders, b"x")]) {
            Err(WriteTransactionReject::Unroutable { collection, key }) => {
                assert_eq!(collection, orders);
                assert_eq!(key, b"x");
            }
            other => panic!("expected Unroutable, got {other:?}"),
        }
    }

    // AC #3: a simple read fanout collects one leg per owner across ranges.
    #[test]
    fn read_fanout_collects_per_owner_legs() {
        let (catalog, orders) = two_range_catalog();
        let fanout = catalog
            .plan_read_fanout(&[
                target(&orders, b"alice"),
                target(&orders, b"zeb"),
                target(&orders, b"bob"),
            ])
            .expect("fanout planned");
        assert!(fanout.is_cross_range());
        assert_eq!(fanout.legs().len(), 2);
        // node-a leg gets alice + bob (range 1); node-b leg gets zeb (range 2).
        let a = &fanout.legs()[0];
        assert_eq!(a.owner(), &ident("CN=node-a"));
        assert_eq!(a.targets().len(), 2);
        let b = &fanout.legs()[1];
        assert_eq!(b.owner(), &ident("CN=node-b"));
        assert_eq!(b.targets().len(), 1);
        assert_eq!(b.targets()[0].key(), b"zeb");
    }

    // Keys that all resolve to node-a's range produce a single leg, which is
    // not reported as cross-range.
    #[test]
    fn single_owner_read_is_not_cross_range() {
        let (catalog, orders) = two_range_catalog();
        let fanout = catalog
            .plan_read_fanout(&[target(&orders, b"alice"), target(&orders, b"bob")])
            .expect("fanout planned");
        assert!(!fanout.is_cross_range());
        assert_eq!(fanout.legs().len(), 1);
    }

    // With an empty catalog the fanout is rejected as unroutable, naming the
    // collection.
    #[test]
    fn unroutable_read_fanout_is_rejected() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        match catalog.plan_read_fanout(&[target(&orders, b"x")]) {
            Err(ReadFanoutReject::Unroutable { collection, .. }) => {
                assert_eq!(collection, orders)
            }
            other => panic!("expected Unroutable, got {other:?}"),
        }
    }

    // AC #4: a consistent cross-range read with no snapshot fails clearly.
    #[test]
    fn consistent_read_without_snapshot_is_rejected() {
        let (catalog, orders) = two_range_catalog();
        assert_eq!(
            catalog
                .plan_consistent_read(&[target(&orders, b"alice"), target(&orders, b"zeb")], None),
            Err(ConsistentReadReject::NoSafeSnapshot)
        );
    }

    // AC #4: a snapshot missing a targeted range fails with a watermark gap.
    #[test]
    fn consistent_read_with_incomplete_snapshot_is_rejected() {
        let (catalog, orders) = two_range_catalog();
        // Snapshot covers range 1 but not range 2.
        let snapshot = GlobalReadWatermark::new().with(
            orders.clone(),
            RangeId::new(1),
            CommitWatermark::new(1, 100),
        );
        match catalog.plan_consistent_read(
            &[target(&orders, b"alice"), target(&orders, b"zeb")],
            Some(&snapshot),
        ) {
            Err(ConsistentReadReject::WatermarkGap {
                collection,
                range_id,
            }) => {
                // The gap names the uncovered range (range 2, holding "zeb").
                assert_eq!(collection, orders);
                assert_eq!(range_id, RangeId::new(2));
            }
            other => panic!("expected WatermarkGap, got {other:?}"),
        }
    }

    // AC #4: with a snapshot covering every targeted range, the consistent read
    // is planned and each leg is pinned to its range's watermark.
    #[test]
    fn consistent_read_with_full_snapshot_succeeds() {
        let (catalog, orders) = two_range_catalog();
        let snapshot = GlobalReadWatermark::new()
            .with(
                orders.clone(),
                RangeId::new(1),
                CommitWatermark::new(1, 100),
            )
            .with(
                orders.clone(),
                RangeId::new(2),
                CommitWatermark::new(1, 250),
            );
        let plan = catalog
            .plan_consistent_read(
                &[target(&orders, b"alice"), target(&orders, b"zeb")],
                Some(&snapshot),
            )
            .expect("consistent read planned");
        assert_eq!(plan.legs().len(), 2);
        // Each leg carries the watermark recorded for its own range.
        let a = &plan.legs()[0];
        assert_eq!(a.owner(), &ident("CN=node-a"));
        assert_eq!(a.targets()[0].watermark(), CommitWatermark::new(1, 100));
        let b = &plan.legs()[1];
        assert_eq!(b.owner(), &ident("CN=node-b"));
        assert_eq!(b.targets()[0].watermark(), CommitWatermark::new(1, 250));
    }

    // The empty check comes first: an empty read is `Empty` even though no
    // snapshot was supplied either.
    #[test]
    fn empty_consistent_read_is_rejected() {
        let catalog = ShardOwnershipCatalog::new();
        assert_eq!(
            catalog.plan_consistent_read(&[], None),
            Err(ConsistentReadReject::Empty)
        );
    }

    // The rejection contract renders a readable, writer-naming message.
    #[test]
    fn cross_range_rejection_message_names_writers() {
        let (catalog, orders) = two_range_catalog();
        let err = catalog
            .plan_write_transaction(&[target(&orders, b"alice"), target(&orders, b"zeb")])
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("cross-range write transaction"));
        assert!(msg.contains("CN=node-a"));
        assert!(msg.contains("CN=node-b"));
    }
}
905        assert!(msg.contains("cross-range write transaction"));
906        assert!(msg.contains("CN=node-a"));
907        assert!(msg.contains("CN=node-b"));
908    }
909}