use std::collections::BTreeMap;
use super::identity::NodeIdentity;
use super::ownership::{CollectionId, OwnershipEpoch, RangeId, ShardOwnershipCatalog};
use super::ownership_transition::CommitWatermark;
/// A key inside a collection that a read or write wants to touch.
///
/// This is the unrouted form: it names only the collection and the key
/// bytes that are later handed to shard-key routing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyTarget {
    /// Collection the key belongs to.
    collection: CollectionId,
    /// Owned key bytes.
    key: Vec<u8>,
}

impl KeyTarget {
    /// Builds a target from a collection and anything convertible into
    /// owned key bytes (`Vec<u8>`, `&[u8]`, `&str`, ...).
    pub fn new(collection: CollectionId, key: impl Into<Vec<u8>>) -> Self {
        let key = key.into();
        Self { collection, key }
    }

    /// Collection this target addresses.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Key bytes of this target.
    pub fn key(&self) -> &[u8] {
        self.key.as_slice()
    }
}
/// A key target after routing: the original collection and key together
/// with the range that covers the key and that range's owner and
/// ownership epoch as reported by the catalog.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedTarget {
    /// Collection the key belongs to.
    collection: CollectionId,
    /// Owned key bytes.
    key: Vec<u8>,
    /// Id of the range the key was routed to.
    range_id: RangeId,
    /// Owner of that range.
    owner: NodeIdentity,
    /// Ownership epoch of that range at resolution time.
    epoch: OwnershipEpoch,
}

impl ResolvedTarget {
    /// Collection this target addresses.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Key bytes of this target.
    pub fn key(&self) -> &[u8] {
        self.key.as_slice()
    }

    /// Id of the range the key was routed to.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// Node that owns the covering range.
    pub fn owner(&self) -> &NodeIdentity {
        &self.owner
    }

    /// Ownership epoch of the covering range.
    pub fn epoch(&self) -> OwnershipEpoch {
        self.epoch
    }
}
/// One range taking part in a write transaction: which collection and
/// range it is, and the ownership epoch recorded when the plan was built.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeParticipant {
    /// Collection the range belongs to.
    collection: CollectionId,
    /// Id of the participating range.
    range_id: RangeId,
    /// Ownership epoch captured for the range.
    epoch: OwnershipEpoch,
}

impl RangeParticipant {
    /// Collection the range belongs to.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Id of the participating range.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// Ownership epoch captured for the range.
    pub fn epoch(&self) -> OwnershipEpoch {
        self.epoch
    }
}
/// A writer node together with the ranges it owns among a transaction's
/// targets.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriterParticipation {
    /// Node that owns the listed ranges.
    writer: NodeIdentity,
    /// Ranges of this writer that the transaction touches.
    ranges: Vec<RangeParticipant>,
}

impl WriterParticipation {
    /// Node that owns the listed ranges.
    pub fn writer(&self) -> &NodeIdentity {
        &self.writer
    }

    /// Ranges of this writer that the transaction touches.
    pub fn ranges(&self) -> &[RangeParticipant] {
        self.ranges.as_slice()
    }
}
/// An admitted write transaction: every target resolved to ranges owned
/// by one single writer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteTransactionPlan {
    /// The sole writer and the ranges it participates with.
    participation: WriterParticipation,
}

impl WriteTransactionPlan {
    /// The single writer that executes the transaction.
    pub fn writer(&self) -> &NodeIdentity {
        let Self { participation } = self;
        participation.writer()
    }

    /// Ranges of the writer that the transaction touches.
    pub fn ranges(&self) -> &[RangeParticipant] {
        let Self { participation } = self;
        participation.ranges()
    }
}
/// Reasons a write transaction cannot be planned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteTransactionReject {
    /// The transaction named no targets at all.
    Empty,
    /// No range in the catalog covers the given key of the collection.
    Unroutable {
        collection: CollectionId,
        key: Vec<u8>,
    },
    /// The targets resolve to more than one writer; each writer is listed
    /// with the ranges it owns among the targets.
    CrossRange { writers: Vec<WriterParticipation> },
}

impl std::fmt::Display for WriteTransactionReject {
    /// Renders a human-readable rejection message. For `CrossRange` the
    /// writers are comma-separated and each writer's ranges are joined
    /// with `+` as `collection/range_id`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Empty => f.write_str("write transaction names no targets"),
            Self::Unroutable { collection, key } => write!(
                f,
                "no range of collection {collection} covers key {} — re-resolve routing",
                DisplayKey(key)
            ),
            Self::CrossRange { writers } => {
                write!(
                    f,
                    "cross-range write transaction spans {} writers and is unsupported: ",
                    writers.len()
                )?;
                // Separators start empty and are switched on after the
                // first item, so they only appear between items.
                let mut writer_sep = "";
                for participation in writers {
                    f.write_str(writer_sep)?;
                    writer_sep = ", ";
                    write!(f, "{} owns ", participation.writer())?;
                    let mut range_sep = "";
                    for range in participation.ranges() {
                        f.write_str(range_sep)?;
                        range_sep = "+";
                        write!(f, "{}/{}", range.collection(), range.range_id())?;
                    }
                }
                Ok(())
            }
        }
    }
}

impl std::error::Error for WriteTransactionReject {}
/// The part of a read fanout that goes to one owner: the owner and every
/// resolved target routed to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadLeg {
    /// Node that serves this leg.
    owner: NodeIdentity,
    /// Targets owned by that node.
    targets: Vec<ResolvedTarget>,
}

impl ReadLeg {
    /// Node that serves this leg.
    pub fn owner(&self) -> &NodeIdentity {
        &self.owner
    }

    /// Targets owned by that node.
    pub fn targets(&self) -> &[ResolvedTarget] {
        self.targets.as_slice()
    }
}
/// A planned read split into one leg per owner.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadFanout {
    /// Per-owner legs of the read.
    legs: Vec<ReadLeg>,
}

impl ReadFanout {
    /// Per-owner legs of the read.
    pub fn legs(&self) -> &[ReadLeg] {
        self.legs.as_slice()
    }

    /// Whether the read has more than one leg. Legs are grouped per owner,
    /// so a read over several ranges of the same owner reports `false`.
    pub fn is_cross_range(&self) -> bool {
        let leg_count = self.legs.len();
        leg_count > 1
    }
}
/// Reasons a read fanout cannot be planned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadFanoutReject {
    /// The read named no targets at all.
    Empty,
    /// No range in the catalog covers the given key of the collection.
    Unroutable {
        collection: CollectionId,
        key: Vec<u8>,
    },
}

impl std::fmt::Display for ReadFanoutReject {
    /// Renders a human-readable rejection message; keys are shown as hex.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Handle the payload-free variant first, then format the one
        // variant that carries data.
        let (collection, key) = match self {
            Self::Empty => return f.write_str("read fanout names no targets"),
            Self::Unroutable { collection, key } => (collection, key),
        };
        write!(
            f,
            "no range of collection {collection} covers key {} — re-resolve routing",
            DisplayKey(key)
        )
    }
}

impl std::error::Error for ReadFanoutReject {}
/// A set of commit watermarks, one per `(collection, range)` pair, used
/// as the snapshot a consistent read is pinned to.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GlobalReadWatermark {
    /// Watermark recorded for each `(collection, range)` pair.
    marks: BTreeMap<(CollectionId, RangeId), CommitWatermark>,
}

impl GlobalReadWatermark {
    /// Creates a snapshot that covers no ranges.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style variant of [`GlobalReadWatermark::insert`]: records
    /// the watermark and hands the snapshot back for chaining.
    pub fn with(
        mut self,
        collection: CollectionId,
        range_id: RangeId,
        watermark: CommitWatermark,
    ) -> Self {
        self.insert(collection, range_id, watermark);
        self
    }

    /// Records `watermark` for the given range, replacing any watermark
    /// previously stored for the same `(collection, range)` pair.
    pub fn insert(
        &mut self,
        collection: CollectionId,
        range_id: RangeId,
        watermark: CommitWatermark,
    ) {
        let key = (collection, range_id);
        self.marks.insert(key, watermark);
    }

    /// Returns the watermark stored for the given range, or `None` when
    /// the snapshot does not cover it.
    pub fn covers(&self, collection: &CollectionId, range_id: RangeId) -> Option<CommitWatermark> {
        // The map is keyed by an owned tuple, so the lookup key has to own
        // its collection id as well.
        let key = (collection.clone(), range_id);
        self.marks.get(&key).copied()
    }
}
/// The part of a consistent read that goes to one owner: the owner and
/// every watermark-pinned target routed to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsistentReadLeg {
    /// Node that serves this leg.
    owner: NodeIdentity,
    /// Pinned targets owned by that node.
    targets: Vec<PinnedTarget>,
}

impl ConsistentReadLeg {
    /// Node that serves this leg.
    pub fn owner(&self) -> &NodeIdentity {
        &self.owner
    }

    /// Pinned targets owned by that node.
    pub fn targets(&self) -> &[PinnedTarget] {
        self.targets.as_slice()
    }
}
/// A resolved target paired with the commit watermark the snapshot holds
/// for the target's range.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PinnedTarget {
    /// The routed target.
    target: ResolvedTarget,
    /// Watermark taken from the snapshot for the target's range.
    watermark: CommitWatermark,
}

impl PinnedTarget {
    /// The routed target.
    pub fn target(&self) -> &ResolvedTarget {
        &self.target
    }

    /// Watermark taken from the snapshot for the target's range.
    pub fn watermark(&self) -> CommitWatermark {
        self.watermark
    }
}
/// A planned consistent read split into one watermark-pinned leg per owner.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsistentReadPlan {
    /// Per-owner legs of the read.
    legs: Vec<ConsistentReadLeg>,
}

impl ConsistentReadPlan {
    /// Per-owner legs of the read.
    pub fn legs(&self) -> &[ConsistentReadLeg] {
        self.legs.as_slice()
    }
}
/// Reasons a consistent read cannot be planned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsistentReadReject {
    /// The read named no targets at all.
    Empty,
    /// No range in the catalog covers the given key of the collection.
    Unroutable {
        collection: CollectionId,
        key: Vec<u8>,
    },
    /// The caller supplied no snapshot to pin the read to.
    NoSafeSnapshot,
    /// The supplied snapshot holds no watermark for a range the read needs.
    WatermarkGap {
        collection: CollectionId,
        range_id: RangeId,
    },
}

impl std::fmt::Display for ConsistentReadReject {
    /// Renders a human-readable rejection message; keys are shown as hex.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Empty => f.write_str("consistent read names no targets"),
            Self::NoSafeSnapshot => f.write_str(
                "consistent cross-range read requires a global safe snapshot/watermark, none supplied",
            ),
            Self::Unroutable { collection, key } => write!(
                f,
                "no range of collection {collection} covers key {} — re-resolve routing",
                DisplayKey(key)
            ),
            Self::WatermarkGap {
                collection,
                range_id,
            } => write!(
                f,
                "safe snapshot does not cover {collection}/{range_id}; cannot serve a consistent read"
            ),
        }
    }
}

impl std::error::Error for ConsistentReadReject {}
/// Display adapter that prints a byte key as `0x` followed by two
/// lowercase hex digits per byte (an empty key prints just `0x`).
struct DisplayKey<'a>(&'a [u8]);

impl std::fmt::Display for DisplayKey<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let DisplayKey(bytes) = self;
        f.write_str("0x")?;
        // Stops at the first formatter error, otherwise yields Ok(()).
        bytes.iter().try_for_each(|byte| write!(f, "{byte:02x}"))
    }
}
impl ShardOwnershipCatalog {
    /// Routes every target through `route_shard_key` and records the
    /// covering range's id, owner and epoch.
    ///
    /// Targets are processed in order. The first target no range covers
    /// aborts resolution and is returned as `(collection, key)`.
    fn resolve_targets(
        &self,
        targets: &[KeyTarget],
    ) -> Result<Vec<ResolvedTarget>, (CollectionId, Vec<u8>)> {
        targets
            .iter()
            .map(|target| {
                let collection = target.collection();
                let key = target.key();
                self.route_shard_key(collection, key)
                    .map(|range| ResolvedTarget {
                        collection: collection.clone(),
                        key: key.to_vec(),
                        range_id: range.range_id(),
                        owner: range.owner().clone(),
                        epoch: range.epoch(),
                    })
                    .ok_or_else(|| (collection.clone(), key.to_vec()))
            })
            .collect()
    }

    /// Plans a write transaction over `targets`.
    ///
    /// # Errors
    ///
    /// * `Empty` when `targets` is empty.
    /// * `Unroutable` when some target is covered by no range.
    /// * `CrossRange` when the targets resolve to more than one owner; the
    ///   error lists every owner with its ranges.
    pub fn plan_write_transaction(
        &self,
        targets: &[KeyTarget],
    ) -> Result<WriteTransactionPlan, WriteTransactionReject> {
        if targets.is_empty() {
            return Err(WriteTransactionReject::Empty);
        }
        let resolved = match self.resolve_targets(targets) {
            Ok(resolved) => resolved,
            Err((collection, key)) => {
                return Err(WriteTransactionReject::Unroutable { collection, key })
            }
        };

        let mut writers = group_by_owner(&resolved);
        if writers.len() != 1 {
            let writers = writers
                .into_iter()
                .map(|(writer, ranges)| WriterParticipation { writer, ranges })
                .collect();
            return Err(WriteTransactionReject::CrossRange { writers });
        }

        // Length was just checked, so there is exactly one entry to take.
        let (writer, ranges) = writers.pop().expect("exactly one writer");
        Ok(WriteTransactionPlan {
            participation: WriterParticipation { writer, ranges },
        })
    }

    /// Plans a read over `targets`, grouped into one leg per owner.
    ///
    /// # Errors
    ///
    /// * `Empty` when `targets` is empty.
    /// * `Unroutable` when some target is covered by no range.
    pub fn plan_read_fanout(&self, targets: &[KeyTarget]) -> Result<ReadFanout, ReadFanoutReject> {
        if targets.is_empty() {
            return Err(ReadFanoutReject::Empty);
        }
        match self.resolve_targets(targets) {
            Ok(resolved) => Ok(ReadFanout {
                legs: group_targets_by_owner(resolved),
            }),
            Err((collection, key)) => Err(ReadFanoutReject::Unroutable { collection, key }),
        }
    }

    /// Plans a read over `targets` in which every target is pinned to the
    /// watermark `snapshot` holds for its range, grouped per owner.
    ///
    /// Checks run in this order: empty target list, routing, presence of
    /// a snapshot, then snapshot coverage of each target's range.
    ///
    /// # Errors
    ///
    /// * `Empty` when `targets` is empty.
    /// * `Unroutable` when some target is covered by no range.
    /// * `NoSafeSnapshot` when `snapshot` is `None`.
    /// * `WatermarkGap` for the first target whose range has no watermark
    ///   in the snapshot.
    pub fn plan_consistent_read(
        &self,
        targets: &[KeyTarget],
        snapshot: Option<&GlobalReadWatermark>,
    ) -> Result<ConsistentReadPlan, ConsistentReadReject> {
        if targets.is_empty() {
            return Err(ConsistentReadReject::Empty);
        }
        let resolved = match self.resolve_targets(targets) {
            Ok(resolved) => resolved,
            Err((collection, key)) => {
                return Err(ConsistentReadReject::Unroutable { collection, key })
            }
        };
        let snapshot = match snapshot {
            Some(snapshot) => snapshot,
            None => return Err(ConsistentReadReject::NoSafeSnapshot),
        };

        let pinned = resolved
            .into_iter()
            .map(
                |target| match snapshot.covers(target.collection(), target.range_id()) {
                    Some(watermark) => Ok(PinnedTarget { target, watermark }),
                    None => Err(ConsistentReadReject::WatermarkGap {
                        collection: target.collection().clone(),
                        range_id: target.range_id(),
                    }),
                },
            )
            .collect::<Result<Vec<_>, _>>()?;

        Ok(ConsistentReadPlan {
            legs: group_pinned_by_owner(pinned),
        })
    }
}
fn group_by_owner(resolved: &[ResolvedTarget]) -> Vec<(NodeIdentity, Vec<RangeParticipant>)> {
let mut by_owner: BTreeMap<NodeIdentity, BTreeMap<RangeId, RangeParticipant>> = BTreeMap::new();
for t in resolved {
by_owner
.entry(t.owner().clone())
.or_default()
.entry(t.range_id())
.or_insert_with(|| RangeParticipant {
collection: t.collection().clone(),
range_id: t.range_id(),
epoch: t.epoch(),
});
}
by_owner
.into_iter()
.map(|(owner, ranges)| (owner, ranges.into_values().collect()))
.collect()
}
/// Splits resolved targets into one read leg per owner.
///
/// Legs come out in `NodeIdentity` order; within a leg the targets keep
/// the order they had in `resolved`.
fn group_targets_by_owner(resolved: Vec<ResolvedTarget>) -> Vec<ReadLeg> {
    let grouped = resolved.into_iter().fold(
        BTreeMap::<NodeIdentity, Vec<ResolvedTarget>>::new(),
        |mut acc, target| {
            let owner = target.owner().clone();
            acc.entry(owner).or_default().push(target);
            acc
        },
    );
    let mut legs = Vec::with_capacity(grouped.len());
    for (owner, targets) in grouped {
        legs.push(ReadLeg { owner, targets });
    }
    legs
}
/// Splits pinned targets into one consistent-read leg per owner.
///
/// Legs come out in `NodeIdentity` order; within a leg the targets keep
/// the order they had in `pinned`.
fn group_pinned_by_owner(pinned: Vec<PinnedTarget>) -> Vec<ConsistentReadLeg> {
    let grouped = pinned.into_iter().fold(
        BTreeMap::<NodeIdentity, Vec<PinnedTarget>>::new(),
        |mut acc, entry| {
            let owner = entry.target().owner().clone();
            acc.entry(owner).or_default().push(entry);
            acc
        },
    );
    let mut legs = Vec::with_capacity(grouped.len());
    for (owner, targets) in grouped {
        legs.push(ConsistentReadLeg { owner, targets });
    }
    legs
}
// Unit tests for write-transaction, read-fanout and consistent-read planning.
#[cfg(test)]
mod tests {
use super::*;
use crate::cluster::ownership::{PlacementMetadata, RangeBound, RangeBounds, ShardKeyMode};
// Builds a collection id from a name known to be valid in these tests.
fn collection(name: &str) -> CollectionId {
CollectionId::new(name).unwrap()
}
// Builds a node identity from a certificate subject string.
fn ident(cn: &str) -> NodeIdentity {
NodeIdentity::from_certificate_subject(cn).unwrap()
}
// Builds range bounds between two explicit keys.
fn bounds(lower: &[u8], upper: &[u8]) -> RangeBounds {
RangeBounds::new(RangeBound::key(lower), RangeBound::key(upper)).unwrap()
}
// Establishes an ordered range of `coll` with the given id, bounds and owner.
// Every range gets the same single replica and a replication factor of 3.
fn range(
coll: &CollectionId,
id: u64,
bnds: RangeBounds,
owner: &str,
) -> super::super::ownership::RangeOwnership {
super::super::ownership::RangeOwnership::establish(
coll.clone(),
RangeId::new(id),
ShardKeyMode::Ordered,
bnds,
ident(owner),
[ident("CN=replica-1")],
PlacementMetadata::with_replication_factor(3),
)
}
// Catalog fixture: collection "orders" with range 1 (bounds a..m) owned by
// node-a and range 2 (bounds m..Max) owned by node-b.
fn two_range_catalog() -> (ShardOwnershipCatalog, CollectionId) {
let orders = collection("orders");
let mut catalog = ShardOwnershipCatalog::new();
catalog
.apply_update(range(&orders, 1, bounds(b"a", b"m"), "CN=node-a"))
.unwrap();
catalog
.apply_update(range(
&orders,
2,
RangeBounds::new(RangeBound::key(b"m"), RangeBound::Max).unwrap(),
"CN=node-b",
))
.unwrap();
(catalog, orders)
}
// Shorthand for a key target in `coll`.
fn target(coll: &CollectionId, key: &[u8]) -> KeyTarget {
KeyTarget::new(coll.clone(), key.to_vec())
}
// Two ranges owned by the same node: a write touching both is admitted and
// lists both ranges in range-id order.
#[test]
fn single_writer_transaction_succeeds() {
let orders = collection("orders");
let mut catalog = ShardOwnershipCatalog::new();
catalog
.apply_update(range(&orders, 1, bounds(b"a", b"m"), "CN=node-a"))
.unwrap();
catalog
.apply_update(range(
&orders,
2,
RangeBounds::new(RangeBound::key(b"m"), RangeBound::Max).unwrap(),
"CN=node-a",
))
.unwrap();
let plan = catalog
.plan_write_transaction(&[target(&orders, b"alice"), target(&orders, b"zeb")])
.expect("single-writer transaction is admitted");
assert_eq!(plan.writer(), &ident("CN=node-a"));
let ids: Vec<u64> = plan.ranges().iter().map(|r| r.range_id().value()).collect();
assert_eq!(ids, vec![1, 2]);
assert_eq!(plan.ranges()[0].epoch(), OwnershipEpoch::initial());
}
// Both keys fall into range 1, so the plan has one writer and one range.
#[test]
fn single_range_transaction_succeeds() {
let (catalog, orders) = two_range_catalog();
let plan = catalog
.plan_write_transaction(&[target(&orders, b"alice"), target(&orders, b"bob")])
.expect("single-range transaction is admitted");
assert_eq!(plan.writer(), &ident("CN=node-a"));
assert_eq!(plan.ranges().len(), 1);
assert_eq!(plan.ranges()[0].range_id(), RangeId::new(1));
}
// Keys owned by different nodes: the write is rejected and the error names
// both writers with their ranges.
#[test]
fn cross_range_write_transaction_is_rejected() {
let (catalog, orders) = two_range_catalog();
let err = catalog
.plan_write_transaction(&[target(&orders, b"alice"), target(&orders, b"zeb")])
.expect_err("cross-writer transaction is rejected");
match err {
WriteTransactionReject::CrossRange { writers } => {
assert_eq!(writers.len(), 2);
assert_eq!(writers[0].writer(), &ident("CN=node-a"));
assert_eq!(writers[1].writer(), &ident("CN=node-b"));
assert_eq!(writers[0].ranges()[0].range_id(), RangeId::new(1));
assert_eq!(writers[1].ranges()[0].range_id(), RangeId::new(2));
}
other => panic!("expected CrossRange, got {other:?}"),
}
}
// A write with no targets is rejected as Empty.
#[test]
fn empty_write_transaction_is_rejected() {
let catalog = ShardOwnershipCatalog::new();
assert_eq!(
catalog.plan_write_transaction(&[]),
Err(WriteTransactionReject::Empty)
);
}
// With an empty catalog no range covers the key; the error echoes the
// collection and key.
#[test]
fn unroutable_write_transaction_is_rejected() {
let catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
match catalog.plan_write_transaction(&[target(&orders, b"x")]) {
Err(WriteTransactionReject::Unroutable { collection, key }) => {
assert_eq!(collection, orders);
assert_eq!(key, b"x");
}
other => panic!("expected Unroutable, got {other:?}"),
}
}
// Three keys over two owners produce two legs, ordered by owner, with each
// key placed in its owner's leg.
#[test]
fn read_fanout_collects_per_owner_legs() {
let (catalog, orders) = two_range_catalog();
let fanout = catalog
.plan_read_fanout(&[
target(&orders, b"alice"),
target(&orders, b"zeb"),
target(&orders, b"bob"),
])
.expect("fanout planned");
assert!(fanout.is_cross_range());
assert_eq!(fanout.legs().len(), 2);
let a = &fanout.legs()[0];
assert_eq!(a.owner(), &ident("CN=node-a"));
assert_eq!(a.targets().len(), 2);
let b = &fanout.legs()[1];
assert_eq!(b.owner(), &ident("CN=node-b"));
assert_eq!(b.targets().len(), 1);
assert_eq!(b.targets()[0].key(), b"zeb");
}
// All keys belong to one owner, so there is a single leg and the fanout is
// not reported as cross-range.
#[test]
fn single_owner_read_is_not_cross_range() {
let (catalog, orders) = two_range_catalog();
let fanout = catalog
.plan_read_fanout(&[target(&orders, b"alice"), target(&orders, b"bob")])
.expect("fanout planned");
assert!(!fanout.is_cross_range());
assert_eq!(fanout.legs().len(), 1);
}
// With an empty catalog the read fanout is rejected as Unroutable.
#[test]
fn unroutable_read_fanout_is_rejected() {
let catalog = ShardOwnershipCatalog::new();
let orders = collection("orders");
match catalog.plan_read_fanout(&[target(&orders, b"x")]) {
Err(ReadFanoutReject::Unroutable { collection, .. }) => {
assert_eq!(collection, orders)
}
other => panic!("expected Unroutable, got {other:?}"),
}
}
// Routable targets but no snapshot supplied: rejected as NoSafeSnapshot.
#[test]
fn consistent_read_without_snapshot_is_rejected() {
let (catalog, orders) = two_range_catalog();
assert_eq!(
catalog
.plan_consistent_read(&[target(&orders, b"alice"), target(&orders, b"zeb")], None),
Err(ConsistentReadReject::NoSafeSnapshot)
);
}
// The snapshot only covers range 1, so the target in range 2 triggers a
// WatermarkGap naming that range.
#[test]
fn consistent_read_with_incomplete_snapshot_is_rejected() {
let (catalog, orders) = two_range_catalog();
let snapshot = GlobalReadWatermark::new().with(
orders.clone(),
RangeId::new(1),
CommitWatermark::new(1, 100),
);
match catalog.plan_consistent_read(
&[target(&orders, b"alice"), target(&orders, b"zeb")],
Some(&snapshot),
) {
Err(ConsistentReadReject::WatermarkGap {
collection,
range_id,
}) => {
assert_eq!(collection, orders);
assert_eq!(range_id, RangeId::new(2));
}
other => panic!("expected WatermarkGap, got {other:?}"),
}
}
// The snapshot covers both ranges: each leg's target carries the watermark
// recorded for its own range.
#[test]
fn consistent_read_with_full_snapshot_succeeds() {
let (catalog, orders) = two_range_catalog();
let snapshot = GlobalReadWatermark::new()
.with(
orders.clone(),
RangeId::new(1),
CommitWatermark::new(1, 100),
)
.with(
orders.clone(),
RangeId::new(2),
CommitWatermark::new(1, 250),
);
let plan = catalog
.plan_consistent_read(
&[target(&orders, b"alice"), target(&orders, b"zeb")],
Some(&snapshot),
)
.expect("consistent read planned");
assert_eq!(plan.legs().len(), 2);
let a = &plan.legs()[0];
assert_eq!(a.owner(), &ident("CN=node-a"));
assert_eq!(a.targets()[0].watermark(), CommitWatermark::new(1, 100));
let b = &plan.legs()[1];
assert_eq!(b.owner(), &ident("CN=node-b"));
assert_eq!(b.targets()[0].watermark(), CommitWatermark::new(1, 250));
}
// The empty-target check comes before the snapshot check: no targets and no
// snapshot yields Empty.
#[test]
fn empty_consistent_read_is_rejected() {
let catalog = ShardOwnershipCatalog::new();
assert_eq!(
catalog.plan_consistent_read(&[], None),
Err(ConsistentReadReject::Empty)
);
}
// The Display text of a CrossRange rejection mentions both writers.
#[test]
fn cross_range_rejection_message_names_writers() {
let (catalog, orders) = two_range_catalog();
let err = catalog
.plan_write_transaction(&[target(&orders, b"alice"), target(&orders, b"zeb")])
.unwrap_err();
let msg = err.to_string();
assert!(msg.contains("cross-range write transaction"));
assert!(msg.contains("CN=node-a"));
assert!(msg.contains("CN=node-b"));
}
}