use super::identity::NodeIdentity;
use super::ownership::{
CatalogVersion, CollectionId, OwnershipEpoch, RangeId, RangeOwnership, RangeRole,
ShardOwnershipCatalog,
};
/// Default forward budget in bytes (1 MiB): the largest payload a request may
/// carry and still be forwarded under [`RoutingPolicy::forwarding`].
pub const DEFAULT_MAX_FORWARD_PAYLOAD: usize = 1024 * 1024;
/// Class of operation a request performs, used to decide whether a node that
/// does not own the target range may forward the request to the owner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestOperation {
/// The only class `forwardable` accepts; a non-owner may forward it.
SafePointOp,
/// Never forwarded; redirected with `RedirectReason::Transaction`.
Transaction,
/// Never forwarded; redirected with `RedirectReason::Streaming`.
Streaming,
/// Never forwarded; redirected with `RedirectReason::ExplicitlyUnsafe`.
ExplicitlyUnsafe,
}
impl RequestOperation {
    /// Reports whether this operation class may be forwarded by a non-owner.
    ///
    /// Returns `Ok(())` for [`RequestOperation::SafePointOp`]; every other
    /// class yields the [`RedirectReason`] variant of the same name.
    fn forwardable(self) -> Result<(), RedirectReason> {
        let refusal = match self {
            // The single forwardable class: nothing to report.
            Self::SafePointOp => return Ok(()),
            Self::Transaction => RedirectReason::Transaction,
            Self::Streaming => RedirectReason::Streaming,
            Self::ExplicitlyUnsafe => RedirectReason::ExplicitlyUnsafe,
        };
        Err(refusal)
    }
}
/// A request to be routed, as consumed by `ShardOwnershipCatalog::plan_route`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoutedRequest {
/// Collection the request targets.
collection: CollectionId,
/// Key bytes handed to `route_shard_key` to locate the owning range.
key: Vec<u8>,
/// Operation class; decides whether a non-owner may forward the request.
operation: RequestOperation,
/// Payload size in bytes, compared against the policy's forward budget.
/// Starts at `0` and is only changed through `with_payload_len`.
payload_len: usize,
}
impl RoutedRequest {
    /// Builds a request for `key` in `collection` performing `operation`.
    ///
    /// The payload length starts at `0`; use [`RoutedRequest::with_payload_len`]
    /// to record the real size.
    pub fn new(
        collection: CollectionId,
        key: impl Into<Vec<u8>>,
        operation: RequestOperation,
    ) -> Self {
        Self {
            collection,
            key: key.into(),
            operation,
            payload_len: 0,
        }
    }

    /// Returns the request with its payload length replaced by `payload_len`.
    ///
    /// This consumes `self` and hands back the updated value, so the result
    /// must be kept; discarding it throws the request away.
    #[must_use = "this returns the updated request; dropping the result discards it"]
    pub fn with_payload_len(self, payload_len: usize) -> Self {
        Self {
            payload_len,
            ..self
        }
    }

    /// Collection the request targets.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Key bytes used to locate the owning range.
    pub fn key(&self) -> &[u8] {
        self.key.as_slice()
    }

    /// Operation class of the request.
    pub fn operation(&self) -> RequestOperation {
        self.operation
    }

    /// Recorded payload length in bytes.
    pub fn payload_len(&self) -> usize {
        self.payload_len
    }
}
/// Per-node routing configuration read by `ShardOwnershipCatalog::plan_route`
/// when the local node does not own the target range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RoutingPolicy {
/// When `false`, every non-local request is redirected with
/// `RedirectReason::ForwardingDisabled`.
forwarding_enabled: bool,
/// Largest payload length that may still be forwarded; a request whose
/// `payload_len` is strictly greater is redirected as `LargePayload`.
max_forward_payload: usize,
}
impl RoutingPolicy {
    /// Policy with forwarding enabled and a budget of
    /// [`DEFAULT_MAX_FORWARD_PAYLOAD`] bytes.
    pub fn forwarding() -> Self {
        Self {
            forwarding_enabled: true,
            max_forward_payload: DEFAULT_MAX_FORWARD_PAYLOAD,
        }
    }

    /// Policy with forwarding disabled and a zero-byte forward budget.
    pub fn redirect_only() -> Self {
        Self {
            forwarding_enabled: false,
            max_forward_payload: 0,
        }
    }

    /// Returns a copy of this policy with the forward budget replaced by
    /// `max_forward_payload`; `forwarding_enabled` is left as it was.
    ///
    /// `RoutingPolicy` is `Copy`, so calling this and ignoring the result
    /// compiles but changes nothing — hence `#[must_use]`.
    #[must_use = "this returns the adjusted policy and leaves the original untouched"]
    pub fn with_max_forward_payload(self, max_forward_payload: usize) -> Self {
        Self {
            max_forward_payload,
            ..self
        }
    }

    /// Whether this node may forward requests it does not own.
    pub fn forwarding_enabled(&self) -> bool {
        self.forwarding_enabled
    }

    /// Largest payload length, in bytes, that may still be forwarded.
    pub fn max_forward_payload(&self) -> usize {
        self.max_forward_payload
    }
}
impl Default for RoutingPolicy {
fn default() -> Self {
Self::forwarding()
}
}
/// Ownership details of the range a request resolved to, attached to
/// `Forward` and `Redirect` decisions so the owner can be contacted.
/// All range fields are copied from a `RangeOwnership` by `from_range`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoutingHint {
/// Collection the request targeted.
collection: CollectionId,
/// Identifier of the range the key resolved to.
range_id: RangeId,
/// Node recorded as the range's owner.
owner: NodeIdentity,
/// Ownership epoch recorded on the range.
epoch: OwnershipEpoch,
/// Catalog version recorded on the range.
version: CatalogVersion,
}
impl RoutingHint {
    /// Copies the routing-relevant fields of `range` into a hint for
    /// `collection`.
    fn from_range(collection: &CollectionId, range: &RangeOwnership) -> Self {
        let range_id = range.range_id();
        let owner = range.owner().clone();
        let epoch = range.epoch();
        let version = range.version();
        Self {
            collection: collection.clone(),
            range_id,
            owner,
            epoch,
            version,
        }
    }

    /// Collection the hint refers to.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Identifier of the resolved range.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// Node recorded as owner of the range.
    pub fn owner(&self) -> &NodeIdentity {
        &self.owner
    }

    /// Ownership epoch recorded on the range.
    pub fn epoch(&self) -> OwnershipEpoch {
        self.epoch
    }

    /// Catalog version recorded on the range.
    pub fn version(&self) -> CatalogVersion {
        self.version
    }
}
impl std::fmt::Display for RoutingHint {
    /// Renders the hint as
    /// `<collection>/<range> owned by <owner> at epoch <epoch> (catalog version <version>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so a newly added field forces this impl to be revisited.
        let Self {
            collection,
            range_id,
            owner,
            epoch,
            version,
        } = self;
        write!(
            f,
            "{}/{} owned by {} at epoch {} (catalog version {})",
            collection, range_id, owner, epoch, version
        )
    }
}
/// Why a non-owner node redirected a request to the owner instead of
/// forwarding it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RedirectReason {
/// The routing policy has forwarding turned off.
ForwardingDisabled,
/// The request is a `RequestOperation::Transaction`.
Transaction,
/// The request is a `RequestOperation::Streaming`.
Streaming,
/// The payload is larger than the policy's forward budget: `len` is the
/// request's payload length and `limit` the budget, both in bytes.
LargePayload { len: usize, limit: usize },
/// The request is a `RequestOperation::ExplicitlyUnsafe`.
ExplicitlyUnsafe,
}
impl std::fmt::Display for RedirectReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ForwardingDisabled => write!(f, "forwarding not selected on this node"),
Self::Transaction => write!(f, "transactions must be opened on the owner"),
Self::Streaming => write!(f, "streaming operations must originate on the owner"),
Self::LargePayload { len, limit } => write!(
f,
"payload {len} bytes exceeds the {limit}-byte forward budget; send directly to the owner"
),
Self::ExplicitlyUnsafe => {
write!(f, "operation explicitly marked unsafe to forward")
}
}
}
}
/// Outcome of `ShardOwnershipCatalog::plan_route` for one request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteDecision {
/// The local node's role for the range is `RangeRole::Owner`; carries the
/// range's id and ownership epoch.
Local {
range_id: RangeId,
epoch: OwnershipEpoch,
},
/// The local node is not the owner and every forwarding check passed;
/// `hint` names the owner.
Forward { hint: RoutingHint },
/// The local node is not the owner and forwarding was refused for
/// `reason`; `hint` names the owner.
Redirect {
hint: RoutingHint,
reason: RedirectReason,
},
/// `route_shard_key` found no range for the request's key in `collection`.
Unroutable { collection: CollectionId },
}
impl RouteDecision {
pub fn hint(&self) -> Option<&RoutingHint> {
match self {
RouteDecision::Forward { hint } | RouteDecision::Redirect { hint, .. } => Some(hint),
RouteDecision::Local { .. } | RouteDecision::Unroutable { .. } => None,
}
}
pub fn is_local(&self) -> bool {
matches!(self, RouteDecision::Local { .. })
}
}
impl ShardOwnershipCatalog {
    /// Decides how `local` should handle `request` under `policy`.
    ///
    /// The checks run in this order, and the first that applies wins:
    ///
    /// 1. No range for the key: `Unroutable`.
    /// 2. `local` has the `Owner` role for the range: `Local`.
    /// 3. Forwarding disabled by `policy`: `Redirect(ForwardingDisabled)`.
    /// 4. Operation class refuses forwarding: `Redirect` with that reason.
    /// 5. Payload length strictly above the budget: `Redirect(LargePayload)`.
    /// 6. Otherwise: `Forward`.
    ///
    /// `Forward` and `Redirect` carry a [`RoutingHint`] built from the range.
    pub fn plan_route(
        &self,
        local: &NodeIdentity,
        request: &RoutedRequest,
        policy: &RoutingPolicy,
    ) -> RouteDecision {
        let collection = request.collection();
        let range = match self.route_shard_key(collection, request.key()) {
            Some(found) => found,
            None => {
                return RouteDecision::Unroutable {
                    collection: collection.clone(),
                };
            }
        };

        if range.role_of(local) == RangeRole::Owner {
            return RouteDecision::Local {
                range_id: range.range_id(),
                epoch: range.epoch(),
            };
        }

        // From here on the local node is not the owner: work out whether
        // anything forbids forwarding, then build the decision once.
        let payload_len = request.payload_len();
        let limit = policy.max_forward_payload();
        let refusal = if !policy.forwarding_enabled() {
            Some(RedirectReason::ForwardingDisabled)
        } else if let Err(reason) = request.operation().forwardable() {
            Some(reason)
        } else if payload_len > limit {
            Some(RedirectReason::LargePayload {
                len: payload_len,
                limit,
            })
        } else {
            None
        };

        let hint = RoutingHint::from_range(collection, range);
        match refusal {
            Some(reason) => RouteDecision::Redirect { hint, reason },
            None => RouteDecision::Forward { hint },
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{PlacementMetadata, RangeBound, RangeBounds, ShardKeyMode};

    /// Builds a collection id, panicking if `name` is rejected.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Builds a node identity from a certificate subject such as `CN=node-a`.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Establishes a hash-sharded, full-bounds range `id` of `coll` with the
    /// given owner and replicas and a replication factor of 3.
    fn range_with(coll: &CollectionId, id: u64, owner: &str, replicas: &[&str]) -> RangeOwnership {
        let owner_node = ident(owner);
        let replica_nodes: Vec<_> = replicas.iter().map(|cn| ident(cn)).collect();
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            RangeBounds::full(),
            owner_node,
            replica_nodes,
            PlacementMetadata::with_replication_factor(3),
        )
    }

    /// Fresh catalog containing only `range`.
    fn catalog_with(range: RangeOwnership) -> ShardOwnershipCatalog {
        let mut catalog = ShardOwnershipCatalog::new();
        catalog.apply_update(range).unwrap();
        catalog
    }

    /// Catalog whose single range `id` of `coll` is owned by node-a and
    /// replicated on node-b.
    fn node_a_catalog(coll: &CollectionId, id: u64) -> ShardOwnershipCatalog {
        catalog_with(range_with(coll, id, "CN=node-a", &["CN=node-b"]))
    }

    /// Request for key `k` in `coll` with the given operation class.
    fn op_request(coll: &CollectionId, operation: RequestOperation) -> RoutedRequest {
        RoutedRequest::new(coll.clone(), b"k".to_vec(), operation)
    }

    /// Plans `request` as seen from `node` under the forwarding policy.
    fn plan_from(
        catalog: &ShardOwnershipCatalog,
        node: &str,
        request: &RoutedRequest,
    ) -> RouteDecision {
        catalog.plan_route(&ident(node), request, &RoutingPolicy::forwarding())
    }

    /// Transfers range 1 of `coll` to node-b, leaving node-a as a replica.
    fn transfer_range_1_to_node_b(catalog: &mut ShardOwnershipCatalog, coll: &CollectionId) {
        let current = catalog.range(coll, RangeId::new(1)).unwrap().clone();
        let moved = current.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        catalog.apply_update(moved).unwrap();
    }

    #[test]
    fn owner_executes_locally() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 1);
        let request = op_request(&orders, RequestOperation::SafePointOp);

        let decision = plan_from(&catalog, "CN=node-a", &request);

        let expected = RouteDecision::Local {
            range_id: RangeId::new(1),
            epoch: OwnershipEpoch::initial(),
        };
        assert_eq!(decision, expected);
        assert!(decision.is_local());
        assert!(decision.hint().is_none());
    }

    #[test]
    fn any_node_resolves_owner_from_catalog() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 1);
        let request = op_request(&orders, RequestOperation::SafePointOp);

        let decision = plan_from(&catalog, "CN=node-c", &request);

        let hint = decision.hint().expect("non-owner carries a hint");
        assert_eq!(hint.owner(), &ident("CN=node-a"));
        assert_eq!(hint.range_id(), RangeId::new(1));
        assert_eq!(hint.epoch(), OwnershipEpoch::initial());
    }

    #[test]
    fn safe_point_op_is_forwarded_from_non_owner() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 1);
        let request = op_request(&orders, RequestOperation::SafePointOp);

        let hint = match plan_from(&catalog, "CN=node-b", &request) {
            RouteDecision::Forward { hint } => hint,
            other => panic!("expected Forward, got {other:?}"),
        };

        assert_eq!(hint.owner(), &ident("CN=node-a"));
        assert_eq!(hint.epoch(), OwnershipEpoch::initial());
    }

    #[test]
    fn forwarded_write_still_passes_owner_public_gate() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 1);
        let request = op_request(&orders, RequestOperation::SafePointOp);

        let hint = match plan_from(&catalog, "CN=node-b", &request) {
            RouteDecision::Forward { hint } => hint,
            other => panic!("expected Forward, got {other:?}"),
        };

        // The owner must still admit the write at the epoch the hint carried.
        let admitted = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", hint.epoch())
            .expect("owner admits the forwarded write at the current epoch");
        assert_eq!(admitted.owner(), &ident("CN=node-a"));
    }

    #[test]
    fn transaction_from_non_owner_is_redirected() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 1);
        let request = op_request(&orders, RequestOperation::Transaction);

        let (hint, reason) = match plan_from(&catalog, "CN=node-b", &request) {
            RouteDecision::Redirect { hint, reason } => (hint, reason),
            other => panic!("expected Redirect(Transaction), got {other:?}"),
        };

        assert_eq!(reason, RedirectReason::Transaction);
        assert_eq!(hint.owner(), &ident("CN=node-a"));
    }

    #[test]
    fn streaming_from_non_owner_is_redirected() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 1);
        let request = op_request(&orders, RequestOperation::Streaming);

        let reason = match plan_from(&catalog, "CN=node-b", &request) {
            RouteDecision::Redirect { reason, .. } => reason,
            other => panic!("expected Redirect(Streaming), got {other:?}"),
        };

        assert_eq!(reason, RedirectReason::Streaming);
    }

    #[test]
    fn explicitly_unsafe_op_is_redirected() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 1);
        let request = op_request(&orders, RequestOperation::ExplicitlyUnsafe);

        let reason = match plan_from(&catalog, "CN=node-b", &request) {
            RouteDecision::Redirect { reason, .. } => reason,
            other => panic!("expected Redirect(ExplicitlyUnsafe), got {other:?}"),
        };

        assert_eq!(reason, RedirectReason::ExplicitlyUnsafe);
    }

    #[test]
    fn large_payload_is_redirected_not_forwarded() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 1);
        let policy = RoutingPolicy::forwarding().with_max_forward_payload(64);
        let node_b = ident("CN=node-b");

        // One byte over the budget: redirected.
        let over_budget = op_request(&orders, RequestOperation::SafePointOp).with_payload_len(65);
        let reason = match catalog.plan_route(&node_b, &over_budget, &policy) {
            RouteDecision::Redirect { reason, .. } => reason,
            other => panic!("expected Redirect(LargePayload), got {other:?}"),
        };
        assert_eq!(reason, RedirectReason::LargePayload { len: 65, limit: 64 });

        // Exactly at the budget: still forwarded.
        let at_budget = op_request(&orders, RequestOperation::SafePointOp).with_payload_len(64);
        assert!(matches!(
            catalog.plan_route(&node_b, &at_budget, &policy),
            RouteDecision::Forward { .. }
        ));
    }

    #[test]
    fn redirect_only_policy_redirects_safe_op() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 1);
        let request = op_request(&orders, RequestOperation::SafePointOp);
        let policy = RoutingPolicy::redirect_only();

        let (hint, reason) = match catalog.plan_route(&ident("CN=node-b"), &request, &policy) {
            RouteDecision::Redirect { hint, reason } => (hint, reason),
            other => panic!("expected Redirect(ForwardingDisabled), got {other:?}"),
        };

        assert_eq!(reason, RedirectReason::ForwardingDisabled);
        assert_eq!(hint.owner(), &ident("CN=node-a"));
        assert_eq!(hint.epoch(), OwnershipEpoch::initial());
    }

    #[test]
    fn key_with_no_range_is_unroutable() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let request = op_request(&orders, RequestOperation::SafePointOp);

        let decision = plan_from(&catalog, "CN=node-a", &request);

        assert_eq!(decision, RouteDecision::Unroutable { collection: orders });
        assert!(decision.hint().is_none());
    }

    #[test]
    fn stale_ownership_redirects_then_retry_succeeds() {
        let orders = collection("orders");
        let mut catalog = node_a_catalog(&orders, 1);
        transfer_range_1_to_node_b(&mut catalog, &orders);
        let request = op_request(&orders, RequestOperation::Transaction);

        // node-a no longer owns the range, so it must point at node-b.
        let hint = match plan_from(&catalog, "CN=node-a", &request) {
            RouteDecision::Redirect { hint, reason } => {
                assert_eq!(reason, RedirectReason::Transaction);
                hint
            }
            other => panic!("expected Redirect, got {other:?}"),
        };
        assert_eq!(hint.owner(), &ident("CN=node-b"));
        assert_eq!(hint.epoch().value(), 2);
        assert!(hint.epoch() > OwnershipEpoch::initial());

        // Retrying on the hinted owner executes locally at the hinted epoch.
        let retry = catalog.plan_route(hint.owner(), &request, &RoutingPolicy::forwarding());
        let expected = RouteDecision::Local {
            range_id: RangeId::new(1),
            epoch: hint.epoch(),
        };
        assert_eq!(retry, expected);
    }

    #[test]
    fn safe_op_forward_targets_current_owner_after_transfer() {
        let orders = collection("orders");
        let mut catalog = node_a_catalog(&orders, 1);
        transfer_range_1_to_node_b(&mut catalog, &orders);
        let request = op_request(&orders, RequestOperation::SafePointOp);

        let hint = match plan_from(&catalog, "CN=node-a", &request) {
            RouteDecision::Forward { hint } => hint,
            other => panic!("expected Forward to new owner, got {other:?}"),
        };

        assert_eq!(hint.owner(), &ident("CN=node-b"));
        assert_eq!(hint.epoch().value(), 2);
    }

    #[test]
    fn routing_hint_display_names_owner_and_epoch() {
        let orders = collection("orders");
        let catalog = node_a_catalog(&orders, 4);
        let request = op_request(&orders, RequestOperation::Transaction);

        let hint = plan_from(&catalog, "CN=node-b", &request)
            .hint()
            .cloned()
            .expect("redirect carries a hint");

        let rendered = hint.to_string();
        assert!(rendered.contains("CN=node-a"));
        assert!(rendered.contains("epoch 1"));
    }
}