1use super::identity::NodeIdentity;
36use super::ownership::{
37 CatalogVersion, CollectionId, OwnershipEpoch, RangeId, RangeOwnership, RangeRole,
38 ShardOwnershipCatalog,
39};
40
/// Default forward budget for [`RoutingPolicy::forwarding`]: 1 MiB.
///
/// Requests whose recorded payload length is larger than this are redirected
/// to the owner instead of being proxied.
pub const DEFAULT_MAX_FORWARD_PAYLOAD: usize = 1 << 20;
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum RequestOperation {
58 SafePointOp,
61 Transaction,
64 Streaming,
67 ExplicitlyUnsafe,
69}
70
71impl RequestOperation {
72 fn forwardable(self) -> Result<(), RedirectReason> {
76 match self {
77 RequestOperation::SafePointOp => Ok(()),
78 RequestOperation::Transaction => Err(RedirectReason::Transaction),
79 RequestOperation::Streaming => Err(RedirectReason::Streaming),
80 RequestOperation::ExplicitlyUnsafe => Err(RedirectReason::ExplicitlyUnsafe),
81 }
82 }
83}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct RoutedRequest {
90 collection: CollectionId,
91 key: Vec<u8>,
92 operation: RequestOperation,
93 payload_len: usize,
94}
95
96impl RoutedRequest {
97 pub fn new(
99 collection: CollectionId,
100 key: impl Into<Vec<u8>>,
101 operation: RequestOperation,
102 ) -> Self {
103 Self {
104 collection,
105 key: key.into(),
106 operation,
107 payload_len: 0,
108 }
109 }
110
111 pub fn with_payload_len(mut self, payload_len: usize) -> Self {
113 self.payload_len = payload_len;
114 self
115 }
116
117 pub fn collection(&self) -> &CollectionId {
118 &self.collection
119 }
120
121 pub fn key(&self) -> &[u8] {
122 &self.key
123 }
124
125 pub fn operation(&self) -> RequestOperation {
126 self.operation
127 }
128
129 pub fn payload_len(&self) -> usize {
130 self.payload_len
131 }
132}
133
134#[derive(Debug, Clone, Copy, PartialEq, Eq)]
142pub struct RoutingPolicy {
143 forwarding_enabled: bool,
144 max_forward_payload: usize,
145}
146
147impl RoutingPolicy {
148 pub fn forwarding() -> Self {
151 Self {
152 forwarding_enabled: true,
153 max_forward_payload: DEFAULT_MAX_FORWARD_PAYLOAD,
154 }
155 }
156
157 pub fn redirect_only() -> Self {
160 Self {
161 forwarding_enabled: false,
162 max_forward_payload: 0,
163 }
164 }
165
166 pub fn with_max_forward_payload(mut self, max_forward_payload: usize) -> Self {
168 self.max_forward_payload = max_forward_payload;
169 self
170 }
171
172 pub fn forwarding_enabled(&self) -> bool {
173 self.forwarding_enabled
174 }
175
176 pub fn max_forward_payload(&self) -> usize {
177 self.max_forward_payload
178 }
179}
180
181impl Default for RoutingPolicy {
182 fn default() -> Self {
183 Self::forwarding()
184 }
185}
186
187#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct RoutingHint {
198 collection: CollectionId,
199 range_id: RangeId,
200 owner: NodeIdentity,
201 epoch: OwnershipEpoch,
202 version: CatalogVersion,
203}
204
205impl RoutingHint {
206 fn from_range(collection: &CollectionId, range: &RangeOwnership) -> Self {
207 Self {
208 collection: collection.clone(),
209 range_id: range.range_id(),
210 owner: range.owner().clone(),
211 epoch: range.epoch(),
212 version: range.version(),
213 }
214 }
215
216 pub fn collection(&self) -> &CollectionId {
217 &self.collection
218 }
219
220 pub fn range_id(&self) -> RangeId {
221 self.range_id
222 }
223
224 pub fn owner(&self) -> &NodeIdentity {
225 &self.owner
226 }
227
228 pub fn epoch(&self) -> OwnershipEpoch {
229 self.epoch
230 }
231
232 pub fn version(&self) -> CatalogVersion {
233 self.version
234 }
235}
236
237impl std::fmt::Display for RoutingHint {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 write!(
240 f,
241 "{}/{} owned by {} at epoch {} (catalog version {})",
242 self.collection, self.range_id, self.owner, self.epoch, self.version
243 )
244 }
245}
246
/// Why a non-owner redirected a request instead of forwarding it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RedirectReason {
    /// The node's routing policy has forwarding turned off.
    ForwardingDisabled,
    /// Transactions must be opened on the owner.
    Transaction,
    /// Streaming operations must originate on the owner.
    Streaming,
    /// The payload exceeds the policy's forward budget.
    LargePayload {
        /// Recorded payload size in bytes.
        len: usize,
        /// Configured forward budget in bytes.
        limit: usize,
    },
    /// The operation was explicitly marked unsafe to forward.
    ExplicitlyUnsafe,
}

impl std::fmt::Display for RedirectReason {
    /// Human-readable explanation suitable for returning to a client.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            // The only variant whose message interpolates data.
            Self::LargePayload { len, limit } => {
                return write!(
                    f,
                    "payload {len} bytes exceeds the {limit}-byte forward budget; send directly to the owner"
                );
            }
            Self::ForwardingDisabled => "forwarding not selected on this node",
            Self::Transaction => "transactions must be opened on the owner",
            Self::Streaming => "streaming operations must originate on the owner",
            Self::ExplicitlyUnsafe => "operation explicitly marked unsafe to forward",
        };
        f.write_str(message)
    }
}
285
286#[derive(Debug, Clone, PartialEq, Eq)]
288pub enum RouteDecision {
289 Local {
294 range_id: RangeId,
295 epoch: OwnershipEpoch,
296 },
297 Forward { hint: RoutingHint },
301 Redirect {
306 hint: RoutingHint,
307 reason: RedirectReason,
308 },
309 Unroutable { collection: CollectionId },
313}
314
315impl RouteDecision {
316 pub fn hint(&self) -> Option<&RoutingHint> {
318 match self {
319 RouteDecision::Forward { hint } | RouteDecision::Redirect { hint, .. } => Some(hint),
320 RouteDecision::Local { .. } | RouteDecision::Unroutable { .. } => None,
321 }
322 }
323
324 pub fn is_local(&self) -> bool {
326 matches!(self, RouteDecision::Local { .. })
327 }
328}
329
330impl ShardOwnershipCatalog {
331 pub fn plan_route(
349 &self,
350 local: &NodeIdentity,
351 request: &RoutedRequest,
352 policy: &RoutingPolicy,
353 ) -> RouteDecision {
354 let range = match self.route_shard_key(request.collection(), request.key()) {
355 Some(range) => range,
356 None => {
357 return RouteDecision::Unroutable {
358 collection: request.collection().clone(),
359 }
360 }
361 };
362
363 if range.role_of(local) == RangeRole::Owner {
364 return RouteDecision::Local {
365 range_id: range.range_id(),
366 epoch: range.epoch(),
367 };
368 }
369
370 let hint = RoutingHint::from_range(request.collection(), range);
371
372 if !policy.forwarding_enabled() {
375 return RouteDecision::Redirect {
376 hint,
377 reason: RedirectReason::ForwardingDisabled,
378 };
379 }
380 if let Err(reason) = request.operation().forwardable() {
381 return RouteDecision::Redirect { hint, reason };
382 }
383 if request.payload_len() > policy.max_forward_payload() {
384 return RouteDecision::Redirect {
385 hint,
386 reason: RedirectReason::LargePayload {
387 len: request.payload_len(),
388 limit: policy.max_forward_payload(),
389 },
390 };
391 }
392 RouteDecision::Forward { hint }
393 }
394}
395
// Unit tests for `ShardOwnershipCatalog::plan_route` and the routing types.
// Most tests build a one-range catalog for collection "orders" owned by
// "CN=node-a" with replica "CN=node-b", then plan a route from a chosen node.
#[cfg(test)]
mod tests {
    use super::*;
    // NOTE(review): `RangeBound` is not referenced by any test visible here;
    // confirm it is used elsewhere or drop it to silence the unused-import lint.
    use crate::cluster::ownership::{PlacementMetadata, RangeBound, RangeBounds, ShardKeyMode};

    // Builds a `CollectionId`, panicking if `name` is rejected.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    // Builds a `NodeIdentity` from a certificate subject such as "CN=node-a".
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    // Establishes a hash-keyed range `id` with full bounds in `coll`.
    // `owner` owns it, `replicas` hold copies, and the replication factor is 3.
    fn range_with(coll: &CollectionId, id: u64, owner: &str, replicas: &[&str]) -> RangeOwnership {
        RangeOwnership::establish(
            coll.clone(),
            RangeId::new(id),
            ShardKeyMode::Hash,
            RangeBounds::full(),
            ident(owner),
            replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
            PlacementMetadata::with_replication_factor(3),
        )
    }

    // Creates an empty catalog and applies `range` as its only entry.
    fn catalog_with(range: RangeOwnership) -> ShardOwnershipCatalog {
        let mut catalog = ShardOwnershipCatalog::new();
        catalog.apply_update(range).unwrap();
        catalog
    }

    // The owner itself gets `Local` with the range's id and epoch, and no hint.
    #[test]
    fn owner_executes_locally() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);

        let decision =
            catalog.plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding());
        assert_eq!(
            decision,
            RouteDecision::Local {
                range_id: RangeId::new(1),
                epoch: OwnershipEpoch::initial(),
            }
        );
        assert!(decision.is_local());
        assert!(decision.hint().is_none());
    }

    // A node that is neither owner nor replica still learns the owner.
    #[test]
    fn any_node_resolves_owner_from_catalog() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);

        // node-c is not part of the range's placement at all.
        let decision =
            catalog.plan_route(&ident("CN=node-c"), &request, &RoutingPolicy::forwarding());
        let hint = decision.hint().expect("non-owner carries a hint");
        assert_eq!(hint.owner(), &ident("CN=node-a"));
        assert_eq!(hint.range_id(), RangeId::new(1));
        assert_eq!(hint.epoch(), OwnershipEpoch::initial());
    }

    // A replica receiving a forwardable op forwards it to the owner.
    #[test]
    fn safe_point_op_is_forwarded_from_non_owner() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);

        // node-b is a replica, not the owner.
        let decision =
            catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding());
        match decision {
            RouteDecision::Forward { hint } => {
                assert_eq!(hint.owner(), &ident("CN=node-a"));
                assert_eq!(hint.epoch(), OwnershipEpoch::initial());
            }
            other => panic!("expected Forward, got {other:?}"),
        }
    }

    // Forwarding does not bypass the owner's write admission: the owner still
    // admits the write using the epoch carried in the hint.
    #[test]
    fn forwarded_write_still_passes_owner_public_gate() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);

        let hint =
            match catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding()) {
                RouteDecision::Forward { hint } => hint,
                other => panic!("expected Forward, got {other:?}"),
            };
        // Simulate the owner (node-a) receiving the forwarded write.
        let admitted = catalog
            .admit_public_write(&ident("CN=node-a"), &orders, b"k", hint.epoch())
            .expect("owner admits the forwarded write at the current epoch");
        assert_eq!(admitted.owner(), &ident("CN=node-a"));
    }

    // Transactions are never forwarded; the client is pointed at the owner.
    #[test]
    fn transaction_from_non_owner_is_redirected() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);

        let decision =
            catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding());
        match decision {
            RouteDecision::Redirect { hint, reason } => {
                assert_eq!(reason, RedirectReason::Transaction);
                assert_eq!(hint.owner(), &ident("CN=node-a"));
            }
            other => panic!("expected Redirect(Transaction), got {other:?}"),
        }
    }

    // Streaming operations are never forwarded.
    #[test]
    fn streaming_from_non_owner_is_redirected() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Streaming);

        match catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding()) {
            RouteDecision::Redirect { reason, .. } => assert_eq!(reason, RedirectReason::Streaming),
            other => panic!("expected Redirect(Streaming), got {other:?}"),
        }
    }

    // Operations flagged unsafe by the caller are never forwarded.
    #[test]
    fn explicitly_unsafe_op_is_redirected() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let request = RoutedRequest::new(
            orders.clone(),
            b"k".to_vec(),
            RequestOperation::ExplicitlyUnsafe,
        );

        match catalog.plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding()) {
            RouteDecision::Redirect { reason, .. } => {
                assert_eq!(reason, RedirectReason::ExplicitlyUnsafe)
            }
            other => panic!("expected Redirect(ExplicitlyUnsafe), got {other:?}"),
        }
    }

    // The payload budget is inclusive: one byte over is redirected, exactly at
    // the budget is still forwarded.
    #[test]
    fn large_payload_is_redirected_not_forwarded() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let policy = RoutingPolicy::forwarding().with_max_forward_payload(64);
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp)
                .with_payload_len(65);

        match catalog.plan_route(&ident("CN=node-b"), &request, &policy) {
            RouteDecision::Redirect { reason, .. } => {
                assert_eq!(reason, RedirectReason::LargePayload { len: 65, limit: 64 })
            }
            other => panic!("expected Redirect(LargePayload), got {other:?}"),
        }
        // Boundary case: payload equal to the budget.
        let at_budget =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp)
                .with_payload_len(64);
        assert!(matches!(
            catalog.plan_route(&ident("CN=node-b"), &at_budget, &policy),
            RouteDecision::Forward { .. }
        ));
    }

    // With forwarding disabled, even a forwardable op is redirected, and the
    // hint still names the owner and epoch.
    #[test]
    fn redirect_only_policy_redirects_safe_op() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);

        match catalog.plan_route(
            &ident("CN=node-b"),
            &request,
            &RoutingPolicy::redirect_only(),
        ) {
            RouteDecision::Redirect { hint, reason } => {
                assert_eq!(reason, RedirectReason::ForwardingDisabled);
                assert_eq!(hint.owner(), &ident("CN=node-a"));
                assert_eq!(hint.epoch(), OwnershipEpoch::initial());
            }
            other => panic!("expected Redirect(ForwardingDisabled), got {other:?}"),
        }
    }

    // An empty catalog cannot route anything; the decision has no hint.
    #[test]
    fn key_with_no_range_is_unroutable() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);

        let decision =
            catalog.plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding());
        assert_eq!(decision, RouteDecision::Unroutable { collection: orders });
        assert!(decision.hint().is_none());
    }

    // After ownership moves from node-a to node-b, the old owner redirects to
    // the new owner at the newer epoch. Retrying from the hinted owner then
    // executes locally.
    #[test]
    fn stale_ownership_redirects_then_retry_succeeds() {
        let orders = collection("orders");

        let mut catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));

        // Transfer the range to node-b, keeping node-a as a replica.
        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        catalog.apply_update(v2).unwrap();

        // node-a is no longer the owner; a transaction there must be redirected.
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
        let redirect =
            catalog.plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding());
        let hint = match redirect {
            RouteDecision::Redirect { hint, reason } => {
                assert_eq!(reason, RedirectReason::Transaction);
                hint
            }
            other => panic!("expected Redirect, got {other:?}"),
        };
        // The transfer is expected to advance the epoch to 2.
        assert_eq!(hint.owner(), &ident("CN=node-b"));
        assert_eq!(hint.epoch().value(), 2);
        assert!(hint.epoch() > OwnershipEpoch::initial());

        // Retry on the node named in the hint.
        let retry = catalog.plan_route(hint.owner(), &request, &RoutingPolicy::forwarding());
        assert_eq!(
            retry,
            RouteDecision::Local {
                range_id: RangeId::new(1),
                epoch: hint.epoch(),
            }
        );
    }

    // Forwarding after a transfer targets the new owner, not the old one.
    #[test]
    fn safe_op_forward_targets_current_owner_after_transfer() {
        let orders = collection("orders");
        let mut catalog = catalog_with(range_with(&orders, 1, "CN=node-a", &["CN=node-b"]));
        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        catalog
            .apply_update(v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]))
            .unwrap();

        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::SafePointOp);
        match catalog.plan_route(&ident("CN=node-a"), &request, &RoutingPolicy::forwarding()) {
            RouteDecision::Forward { hint } => {
                assert_eq!(hint.owner(), &ident("CN=node-b"));
                assert_eq!(hint.epoch().value(), 2);
            }
            other => panic!("expected Forward to new owner, got {other:?}"),
        }
    }

    // The hint's Display output mentions the owner and the epoch number.
    #[test]
    fn routing_hint_display_names_owner_and_epoch() {
        let orders = collection("orders");
        let catalog = catalog_with(range_with(&orders, 4, "CN=node-a", &["CN=node-b"]));
        let request =
            RoutedRequest::new(orders.clone(), b"k".to_vec(), RequestOperation::Transaction);
        let hint = catalog
            .plan_route(&ident("CN=node-b"), &request, &RoutingPolicy::forwarding())
            .hint()
            .cloned()
            .expect("redirect carries a hint");
        let rendered = hint.to_string();
        assert!(rendered.contains("CN=node-a"));
        assert!(rendered.contains("epoch 1"));
    }
}