1use std::sync::atomic::{AtomicU64, Ordering};
46use std::sync::Arc;
47
48use tokio::sync::mpsc;
49
50use crate::cluster::pool::ServerPool;
51use crate::cluster::snitch::{rack_distance, RackDistance};
52use crate::cluster::vnode;
53use crate::conf::HashType as ConfHashType;
54use crate::hashkit::{self, HashType};
55use crate::io::mbuf::MbufPool;
56use crate::msg::{ConsistencyLevel, Msg, MsgRouting, MsgType};
57use crate::net::dispatcher::{DispatchOutcome, Dispatcher, OutboundEnvelope, ServerSink};
58use crate::net::server::OutboundRequest;
59
/// Process-wide tally of keys for which the shadow distribution strategy chose
/// a different replica set than the live strategy.
///
/// Every access in this module uses `Relaxed` ordering; the value is treated
/// purely as a standalone counter.
static SHADOW_DISAGREEMENTS: AtomicU64 = AtomicU64::new(0);

/// Returns the number of shadow-vs-live routing disagreements counted since
/// process start or since the last reset.
#[must_use]
pub fn distribution_shadow_disagreement_total() -> u64 {
    let counter = &SHADOW_DISAGREEMENTS;
    counter.load(Ordering::Relaxed)
}

/// Sets the shadow-disagreement counter back to zero.
pub fn reset_distribution_shadow_disagreement_total() {
    let counter = &SHADOW_DISAGREEMENTS;
    counter.store(0, Ordering::Relaxed);
}

/// Adds one shadow-vs-live routing disagreement to the counter.
fn bump_shadow_disagreement() {
    let _previous = SHADOW_DISAGREEMENTS.fetch_add(1, Ordering::Relaxed);
}
97
98fn enter_plan_span(
104 req_id: u64,
105 plan: &DispatchPlan,
106) -> (tracing::Span, tracing::span::EnteredSpan) {
107 let req_span = tracing::Span::current();
108 let kind: &'static str = match plan {
109 DispatchPlan::Drop => "drop",
110 DispatchPlan::NoTargets => "no_targets",
111 DispatchPlan::LocalDatastore => "local_datastore",
112 DispatchPlan::Replicas { .. } => "replicas",
113 };
114 let targets = match plan {
115 DispatchPlan::Replicas { targets, .. } => targets.len(),
116 _ => 0,
117 };
118 let span = tracing::info_span!("dispatch.plan", req_id, plan = kind, targets,).entered();
119 (req_span, span)
120}
121
122fn map_hash(h: ConfHashType) -> HashType {
123 match h {
124 ConfHashType::OneAtATime => HashType::OneAtATime,
125 ConfHashType::Md5 => HashType::Md5,
126 ConfHashType::Crc16 => HashType::Crc16,
127 ConfHashType::Crc32 => HashType::Crc32,
128 ConfHashType::Crc32a => HashType::Crc32a,
129 ConfHashType::Fnv1_64 => HashType::Fnv1_64,
130 ConfHashType::Fnv1a64 => HashType::Fnv1a_64,
131 ConfHashType::Fnv1_32 => HashType::Fnv1_32,
132 ConfHashType::Fnv1a32 => HashType::Fnv1a_32,
133 ConfHashType::Hsieh => HashType::Hsieh,
134 ConfHashType::Murmur => HashType::Murmur,
135 ConfHashType::Jenkins => HashType::Jenkins,
136 ConfHashType::Murmur3 => HashType::Murmur3,
137 ConfHashType::Murmur3X64_64 => HashType::Murmur3X64_64,
138 }
139}
140
/// One replica selected by the planner for a request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplicaTarget {
    /// Index of the peer in the pool's peer list (looked up with
    /// `peers.get(peer_idx as usize)`).
    pub peer_idx: u32,
    /// Name of the datacenter the replica was found in.
    pub dc: String,
    /// Name of the rack within `dc`.
    pub rack: String,
    /// True when the peer is this node itself. Such targets are sent through
    /// the local datastore backend instead of a peer channel.
    pub is_local: bool,
}
153
/// Routing decision produced by `ClusterDispatcher::plan` for one request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DispatchPlan {
    /// Forward the request to this node's own datastore backend.
    LocalDatastore,
    /// Send the request to each listed replica and combine the replies
    /// according to `consistency`.
    Replicas {
        /// Replicas to contact. As built by `plan_with_consistency`, local-DC
        /// targets come first.
        targets: Vec<ReplicaTarget>,
        /// Consistency level governing how replica replies are combined.
        consistency: ConsistencyLevel,
    },
    /// No routable replica exists; dispatch answers with a no-quorum error.
    NoTargets,
    /// Maps to `DispatchOutcome::Drop`. The planner code in this module never
    /// returns it.
    Drop,
}
182
/// Cluster-aware dispatcher: plans replica routing for each request and
/// forwards it to the local datastore and/or peer channels.
#[derive(Debug, Clone)]
pub struct ClusterDispatcher {
    /// Shared pool providing config, peers, and datacenter topology.
    pool: Arc<ServerPool>,
    /// Channel to the local datastore connection, if one is attached.
    backend: Option<mpsc::Sender<OutboundRequest>>,
    /// Outbound channel per peer, keyed by peer index.
    peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
    /// Buffer pool used when building synthetic and error responses.
    mbuf_pool: MbufPool,
    /// Hint store for hinted handoff. Handoff is active only when this is set
    /// *and* the pool config enables it.
    hint_store: Option<Arc<crate::cluster::hints::HintStore>>,
    /// Optional failure counters (send full/closed, no targets, timeouts).
    failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
    /// Optional embedder hook consulted before normal routing.
    command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
232
impl ClusterDispatcher {
    /// Creates a dispatcher over `pool` with a default `MbufPool`.
    ///
    /// No local backend, peer channels, hint store, failure metrics, or
    /// command extension are attached yet.
    #[must_use]
    pub fn new(pool: Arc<ServerPool>) -> Self {
        Self {
            pool,
            backend: None,
            peer_backends: std::collections::HashMap::new(),
            mbuf_pool: MbufPool::default(),
            hint_store: None,
            failure_metrics: None,
            command_extension: None,
        }
    }

    /// Builder: replaces the buffer pool used for synthesized responses.
    #[must_use]
    pub fn with_mbuf_pool(mut self, pool: MbufPool) -> Self {
        self.mbuf_pool = pool;
        self
    }

    /// Returns the buffer pool used for synthesized responses.
    #[must_use]
    pub fn mbuf_pool(&self) -> &MbufPool {
        &self.mbuf_pool
    }

    /// Builder: attaches the channel to the local datastore connection.
    #[must_use]
    pub fn with_backend(mut self, backend: mpsc::Sender<OutboundRequest>) -> Self {
        self.backend = Some(backend);
        self
    }

    /// Builder: registers the outbound channel for peer `peer_idx`.
    ///
    /// Any channel previously registered for that index is replaced.
    #[must_use]
    pub fn with_peer_backend(
        mut self,
        peer_idx: u32,
        sender: mpsc::Sender<OutboundRequest>,
    ) -> Self {
        self.peer_backends.insert(peer_idx, sender);
        self
    }

    /// Whether a local datastore channel is attached.
    #[must_use]
    pub fn has_backend(&self) -> bool {
        self.backend.is_some()
    }

    /// Number of peer channels registered.
    #[must_use]
    pub fn peer_backend_count(&self) -> usize {
        self.peer_backends.len()
    }

    /// Returns the shared server pool.
    #[must_use]
    pub fn pool(&self) -> &Arc<ServerPool> {
        &self.pool
    }

    /// Builder: attaches a hint store for hinted handoff.
    ///
    /// Handoff additionally requires `enable_hinted_handoff` in the pool config
    /// (see `hinted_handoff_active`).
    #[must_use]
    pub fn with_hint_store(mut self, store: Arc<crate::cluster::hints::HintStore>) -> Self {
        self.hint_store = Some(store);
        self
    }

    /// Returns the attached hint store, if any.
    #[must_use]
    pub fn hint_store(&self) -> Option<&Arc<crate::cluster::hints::HintStore>> {
        self.hint_store.as_ref()
    }

    /// Builder: attaches failure counters.
    #[must_use]
    pub fn with_failure_metrics(mut self, metrics: Arc<crate::stats::FailureMetrics>) -> Self {
        self.failure_metrics = Some(metrics);
        self
    }

    /// Returns the attached failure counters, if any.
    #[must_use]
    pub fn failure_metrics(&self) -> Option<&Arc<crate::stats::FailureMetrics>> {
        self.failure_metrics.as_ref()
    }

    /// Builder: attaches a command extension that is consulted before normal
    /// routing.
    #[must_use]
    pub fn with_command_extension(mut self, ext: Arc<dyn crate::embed::CommandExtension>) -> Self {
        self.command_extension = Some(ext);
        self
    }

    /// Returns the attached command extension, if any.
    #[must_use]
    pub fn command_extension(&self) -> Option<&Arc<dyn crate::embed::CommandExtension>> {
        self.command_extension.as_ref()
    }

    /// True when a hint store is attached *and* the pool config enables
    /// hinted handoff.
    #[must_use]
    pub fn hinted_handoff_active(&self) -> bool {
        self.hint_store.is_some() && self.pool.config().enable_hinted_handoff
    }

    /// Decides how `req`, whose routing key is `key`, should be dispatched.
    ///
    /// Decision order:
    /// 1. No peers are known → `NoTargets`, recorded with the default
    ///    consistency level.
    /// 2. `LocalNodeOnly` routing, or an empty key → `LocalDatastore`.
    /// 3. Otherwise, hash the key and find one owning peer per rack. The
    ///    consistency level comes from the key's bucket type, falling back to
    ///    the pool default. The resulting plan is capped at the bucket type's
    ///    `n_val` (0 means uncapped).
    ///
    /// When `distribution_shadow` is set and differs from the live
    /// distribution, the shadow routing is also computed. Disagreements are
    /// counted and logged, but never change the returned plan.
    #[must_use]
    pub fn plan(&self, req: &Msg, key: &[u8]) -> DispatchPlan {
        let cfg = self.pool.config();
        // The peers guard is taken before the datacenters guard below; both are
        // held for the rest of this function.
        let peers = self.pool.peers().read();
        if peers.is_empty() {
            self.record_no_targets_metric(cfg, ConsistencyLevel::default());
            return DispatchPlan::NoTargets;
        }
        if matches!(req.routing(), MsgRouting::LocalNodeOnly) {
            return DispatchPlan::LocalDatastore;
        }
        if key.is_empty() {
            return DispatchPlan::LocalDatastore;
        }
        // `token` drives vnode lookups; `key_hash64` drives random slicing.
        let token = hashkit::hash(map_hash(cfg.hash), key);
        let key_hash64 = hashkit::hash64(map_hash(cfg.hash), key);
        let bucket = crate::proto::redis::bucket_name(key);
        let bucket_type = cfg.resolve_bucket_type(bucket);
        // Unknown message types are treated as reads.
        let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
        let consistency = match (bucket_type, is_read) {
            (Some(bt), true) => bt.read_consistency,
            (Some(bt), false) => bt.write_consistency,
            (None, true) => cfg.read_consistency,
            (None, false) => cfg.write_consistency,
        };
        let n_val_cap = bucket_type.map_or(0, |bt| bt.n_val);
        let dcs = self.pool.datacenters().read();
        // Writes may target `Down` peers when they will be hinted instead.
        let include_down = self.hinted_handoff_active() && !is_read;
        let routable = collect_routable(
            &dcs,
            &peers,
            &token,
            key_hash64,
            cfg.distribution,
            include_down,
        );
        if let Some(shadow) = cfg.distribution_shadow {
            if shadow != cfg.distribution {
                let shadow_routable =
                    collect_routable(&dcs, &peers, &token, key_hash64, shadow, include_down);
                if !plans_agree(&routable, &shadow_routable) {
                    bump_shadow_disagreement();
                    tracing::debug!(
                        target: "dynomite::dispatch::shadow",
                        live = cfg.distribution.as_str(),
                        shadow = shadow.as_str(),
                        "shadow distribution disagreed on key route"
                    );
                }
            }
        }
        if routable.is_empty() {
            self.record_no_targets_metric(cfg, consistency);
            return DispatchPlan::NoTargets;
        }
        // Split owners by whether they live in this node's datacenter.
        let (local, remote): (Vec<_>, Vec<_>) = routable
            .into_iter()
            .partition(|(dc_idx, _, _)| dcs[*dc_idx].name() == cfg.dc);
        let plan =
            plan_with_consistency(cfg, &dcs, &peers, consistency, req.routing(), local, remote);
        let plan = cap_replicas(plan, n_val_cap);
        if matches!(plan, DispatchPlan::NoTargets) {
            self.record_no_targets_metric(cfg, consistency);
        }
        plan
    }

    /// Records a "no targets" failure labelled with this node's DC, rack, and
    /// `consistency`, if failure metrics are attached.
    fn record_no_targets_metric(
        &self,
        cfg: &crate::cluster::pool::PoolConfig,
        consistency: ConsistencyLevel,
    ) {
        if let Some(m) = self.failure_metrics.as_ref() {
            m.record_no_targets(&cfg.dc, &cfg.rack, consistency);
        }
    }

    /// Datacenter name of peer `peer_idx` for metric labels.
    ///
    /// Falls back to this node's own DC when the index is not in the peer list.
    fn peer_dc_label(&self, peer_idx: u32) -> String {
        let peers = self.pool.peers().read();
        peers
            .get(peer_idx as usize)
            .map_or_else(|| self.pool.config().dc.clone(), |p| p.dc().to_string())
    }
}
611
612fn cap_replicas(plan: DispatchPlan, cap: u8) -> DispatchPlan {
618 if cap == 0 {
619 return plan;
620 }
621 let cap = cap as usize;
622 match plan {
623 DispatchPlan::Replicas {
624 mut targets,
625 consistency,
626 } if targets.len() > cap => {
627 targets.truncate(cap);
628 DispatchPlan::Replicas {
629 targets,
630 consistency,
631 }
632 }
633 other => other,
634 }
635}
636
/// Reports whether two routing results select the same peers.
///
/// Only the peer indices (third tuple field) are compared, as multisets.
/// Order and the DC/rack indices are ignored.
fn plans_agree(a: &[(usize, usize, u32)], b: &[(usize, usize, u32)]) -> bool {
    // Peer indices of a route, sorted so order does not matter.
    fn sorted_peer_ids(route: &[(usize, usize, u32)]) -> Vec<u32> {
        let mut ids: Vec<u32> = route.iter().map(|&(_, _, peer_idx)| peer_idx).collect();
        ids.sort_unstable();
        ids
    }
    a.len() == b.len() && sorted_peer_ids(a) == sorted_peer_ids(b)
}
647
648fn collect_routable(
649 dcs: &[crate::cluster::Datacenter],
650 peers: &[crate::cluster::peer::Peer],
651 token: &crate::hashkit::DynToken,
652 hash64: u64,
653 distribution: crate::conf::Distribution,
654 include_down: bool,
655) -> Vec<(usize, usize, u32)> {
656 let mut routable: Vec<(usize, usize, u32)> = Vec::new();
657 for (dc_idx, dc) in dcs.iter().enumerate() {
658 for (rack_idx, rack) in dc.racks().iter().enumerate() {
659 let candidate = match (distribution, rack.random_slices()) {
660 (crate::conf::Distribution::RandomSlicing, Some(slices)) => {
661 slices.claimant_for(hash64).and_then(|name| {
668 peers.iter().find_map(|p| {
669 if p.dc() == dc.name()
670 && p.rack() == rack.name()
671 && p.endpoint().pname() == name
672 {
673 Some(p.idx())
674 } else {
675 None
676 }
677 })
678 })
679 }
680 _ => vnode::dispatch(rack.continuums(), token),
681 };
682 if let Some(peer_idx) = candidate {
683 if let Some(peer) = peers.get(peer_idx as usize) {
684 let state = peer.state();
685 let accept = state.is_routable()
686 || (include_down && matches!(state, crate::cluster::peer::PeerState::Down));
687 if accept {
688 routable.push((dc_idx, rack_idx, peer_idx));
689 }
690 }
691 }
692 }
693 }
694 routable
695}
696
697fn build_target(
698 dcs: &[crate::cluster::Datacenter],
699 peers: &[crate::cluster::peer::Peer],
700 dc_idx: usize,
701 rack_idx: usize,
702 peer_idx: u32,
703) -> ReplicaTarget {
704 let dc_name = dcs[dc_idx].name().to_string();
705 let rack_name = dcs[dc_idx].racks()[rack_idx].name().to_string();
706 let is_local = peers
707 .get(peer_idx as usize)
708 .is_some_and(crate::cluster::peer::Peer::is_local);
709 ReplicaTarget {
710 peer_idx,
711 dc: dc_name,
712 rack: rack_name,
713 is_local,
714 }
715}
716
/// Turns routable `(dc_idx, rack_idx, peer_idx)` owners into a plan for
/// `consistency`.
///
/// `local` holds owners in this node's datacenter; `remote` holds the rest.
/// - `DcOne`: the local owner with the lowest rack-distance cost from this
///   node's rack (the first one wins ties). If that owner is this node
///   itself, the plan is `LocalDatastore`.
/// - `DcQuorum` / `DcSafeQuorum`: every local owner.
/// - `DcEachSafeQuorum`: every local and remote owner.
///
/// Additionally, under `DcEachSafeQuorum` or `AllNodesAllRacksAllDcs`
/// routing, any remote owner not already targeted is appended. A DC-local
/// level with no local owners yields `NoTargets`. An empty target list falls
/// back to `LocalDatastore`.
fn plan_with_consistency(
    cfg: &crate::cluster::pool::PoolConfig,
    dcs: &[crate::cluster::Datacenter],
    peers: &[crate::cluster::peer::Peer],
    consistency: ConsistencyLevel,
    routing: MsgRouting,
    local: Vec<(usize, usize, u32)>,
    remote: Vec<(usize, usize, u32)>,
) -> DispatchPlan {
    let want_per_dc_fanout = matches!(consistency, ConsistencyLevel::DcEachSafeQuorum)
        || matches!(routing, MsgRouting::AllNodesAllRacksAllDcs);
    let mut targets: Vec<ReplicaTarget> = Vec::new();
    match consistency {
        ConsistencyLevel::DcOne => {
            if local.is_empty() {
                return DispatchPlan::NoTargets;
            }
            // Pick the strictly cheapest rack from this node's point of view.
            let mut best: Option<(RackDistance, (usize, usize, u32))> = None;
            for (dc_idx, rack_idx, peer_idx) in local {
                let rack_name = dcs[dc_idx].racks()[rack_idx].name();
                let d = rack_distance(&cfg.dc, &cfg.rack, &cfg.dc, rack_name);
                let take = match best {
                    None => true,
                    Some((bd, _)) => d.cost() < bd.cost(),
                };
                if take {
                    best = Some((d, (dc_idx, rack_idx, peer_idx)));
                }
            }
            if let Some((_, (dc_idx, rack_idx, peer_idx))) = best {
                let is_local_node = peers
                    .get(peer_idx as usize)
                    .is_some_and(crate::cluster::peer::Peer::is_local);
                if is_local_node {
                    // NOTE(review): this early return also skips the remote
                    // fan-out below when routing is `AllNodesAllRacksAllDcs`;
                    // confirm that is intended.
                    return DispatchPlan::LocalDatastore;
                }
                targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
            }
        }
        ConsistencyLevel::DcQuorum | ConsistencyLevel::DcSafeQuorum => {
            if local.is_empty() {
                return DispatchPlan::NoTargets;
            }
            for (dc_idx, rack_idx, peer_idx) in local {
                targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
            }
        }
        ConsistencyLevel::DcEachSafeQuorum => {
            if local.is_empty() && remote.is_empty() {
                return DispatchPlan::NoTargets;
            }
            for (dc_idx, rack_idx, peer_idx) in local.iter().chain(remote.iter()) {
                targets.push(build_target(dcs, peers, *dc_idx, *rack_idx, *peer_idx));
            }
        }
    }
    if want_per_dc_fanout && !remote.is_empty() {
        // Append remote owners, skipping peers already targeted above.
        for (dc_idx, rack_idx, peer_idx) in remote {
            if !targets.iter().any(|t| t.peer_idx == peer_idx) {
                targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
            }
        }
    }
    if targets.is_empty() {
        return DispatchPlan::LocalDatastore;
    }
    DispatchPlan::Replicas {
        targets,
        consistency,
    }
}
788
789impl Dispatcher for ClusterDispatcher {
790 #[allow(
791 clippy::too_many_lines,
792 reason = "single dispatch fn must enumerate every plan; splitting hides the planner-to-effect mapping"
793 )]
794 fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome {
795 if req.flags().quit {
796 return DispatchOutcome::Drop;
797 }
798 if let Some(ext) = self.command_extension.as_ref() {
804 if let Some(outcome) = self.intercept_command(ext.as_ref(), &req) {
805 return outcome;
806 }
807 }
808 let key: Vec<u8> = req
816 .keys()
817 .first()
818 .map(|kp| kp.tag_bytes().to_vec())
819 .unwrap_or_default();
820 let plan = self.plan(&req, &key);
821 let (req_span, _plan_span) = enter_plan_span(req.id(), &plan);
822 match plan {
823 DispatchPlan::Drop => DispatchOutcome::Drop,
824 DispatchPlan::NoTargets => {
825 let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
826 MsgType::RspRedisError
827 } else {
828 MsgType::RspMcServerError
829 };
830 let rsp = crate::msg::response::make_error(
831 &req,
832 err_type,
833 0,
834 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
835 &self.mbuf_pool,
836 );
837 DispatchOutcome::Error(rsp)
838 }
839 DispatchPlan::LocalDatastore => {
840 if let Some(tx) = self.backend.as_ref() {
841 let bytes: Vec<u8> = req
846 .mbufs()
847 .iter()
848 .flat_map(|b| b.readable().to_vec())
849 .collect();
850 if bytes.is_empty() {
851 return DispatchOutcome::Drop;
855 }
856 let env = OutboundRequest {
857 bytes,
858 req_id: req.id(),
859 responder,
860 span: req_span.clone(),
861 ty: crate::proto::dnode::DmsgType::Req,
862 target_peer_idx: None,
863 };
864 if let Err(err) = tx.try_send(env) {
865 if let Some(m) = self.failure_metrics.as_ref() {
868 match err {
869 tokio::sync::mpsc::error::TrySendError::Full(_) => {
870 m.record_backend_send_full();
871 }
872 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
873 m.record_backend_send_closed();
874 }
875 }
876 }
877 let err_type =
878 if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
879 MsgType::RspRedisError
880 } else {
881 MsgType::RspMcServerError
882 };
883 let rsp = crate::msg::response::make_error(
884 &req,
885 err_type,
886 0,
887 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
888 &self.mbuf_pool,
889 );
890 return DispatchOutcome::Error(rsp);
891 }
892 }
893 DispatchOutcome::Pending
894 }
895 DispatchPlan::Replicas {
896 targets,
897 consistency,
898 } => self.dispatch_replicas(&req, &req_span, &targets, consistency, responder),
899 }
900 }
901}
902
903impl ClusterDispatcher {
904 fn dispatch_replicas(
927 &self,
928 req: &Msg,
929 req_span: &tracing::Span,
930 targets: &[ReplicaTarget],
931 consistency: ConsistencyLevel,
932 responder: ServerSink,
933 ) -> DispatchOutcome {
934 if targets.is_empty() {
935 return DispatchOutcome::Drop;
936 }
937 let bytes: Vec<u8> = req
941 .mbufs()
942 .iter()
943 .flat_map(|b| b.readable().to_vec())
944 .collect();
945 if bytes.is_empty() {
946 return DispatchOutcome::Drop;
947 }
948 let peer_states = self.snapshot_peer_states(targets);
952 let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
953 let is_write = !is_read;
954 let handoff_active = self.hinted_handoff_active() && is_write;
955 if targets.len() == 1 {
958 return self.dispatch_replicas_direct(
959 req,
960 req_span,
961 targets,
962 &bytes,
963 &responder,
964 &HandoffCtx {
965 handoff_active,
966 peer_states: &peer_states,
967 },
968 );
969 }
970 let cfg = self.pool.config();
972 let local_dc = cfg.dc.clone();
973 let (intermediate_tx, intermediate_rx) =
978 mpsc::channel::<OutboundEnvelope>(targets.len() + 1);
979 let target_pairs: Vec<(u32, String)> =
982 targets.iter().map(|t| (t.peer_idx, t.dc.clone())).collect();
983 let repair_key: Option<Vec<u8>> = req
986 .keys()
987 .first()
988 .map(|kp| kp.tag_bytes().to_vec())
989 .filter(|k| !k.is_empty());
990 let repair_ctx = repair_key.map(|key| ReadRepairContext {
991 req_id: req.id(),
992 req_ty: req.ty(),
993 key,
994 mbuf_pool: self.mbuf_pool.clone(),
995 peer_backends: self.peer_backends.clone(),
996 local_backend: self.backend.clone(),
997 target_is_local: targets.iter().map(|t| (t.peer_idx, t.is_local)).collect(),
998 });
999 let mut sent = 0usize;
1002 let mut hinted = 0usize;
1003 for target in targets {
1004 let action = Self::choose_target_action(target, handoff_active, &peer_states);
1005 match action {
1006 TargetAction::Send => {
1007 if self.fanout_send(target, req, req_span, &bytes, &intermediate_tx) {
1008 sent += 1;
1009 } else if handoff_active
1010 && self.hint_target(target, &bytes, req, req_span, &intermediate_tx)
1011 {
1012 hinted += 1;
1013 }
1014 }
1015 TargetAction::Hint => {
1016 if self.hint_target(target, &bytes, req, req_span, &intermediate_tx) {
1017 hinted += 1;
1018 }
1019 }
1020 }
1021 }
1022 drop(intermediate_tx);
1027 if sent + hinted == 0 {
1028 return DispatchOutcome::Error(self.no_quorum_error(req));
1029 }
1030 let req_id = req.id();
1031 let req_ty = req.ty();
1032 let mbuf_pool = self.mbuf_pool.clone();
1033 let failure_metrics = self.failure_metrics.clone();
1034 tokio::spawn(coalesce_actor(
1035 req_id,
1036 req_ty,
1037 consistency,
1038 target_pairs,
1039 local_dc,
1040 intermediate_rx,
1041 responder,
1042 mbuf_pool,
1043 repair_ctx,
1044 failure_metrics,
1045 ));
1046 DispatchOutcome::Pending
1047 }
1048
1049 fn snapshot_peer_states(
1054 &self,
1055 targets: &[ReplicaTarget],
1056 ) -> std::collections::HashMap<u32, crate::cluster::peer::PeerState> {
1057 use crate::cluster::peer::PeerState;
1058 let peers = self.pool.peers().read();
1059 let mut out = std::collections::HashMap::with_capacity(targets.len());
1060 for t in targets {
1061 let state = if t.is_local {
1062 PeerState::Normal
1063 } else {
1064 peers
1065 .get(t.peer_idx as usize)
1066 .map_or(PeerState::Unknown, crate::cluster::peer::Peer::state)
1067 };
1068 out.insert(t.peer_idx, state);
1069 }
1070 out
1071 }
1072
1073 fn choose_target_action(
1074 target: &ReplicaTarget,
1075 handoff_active: bool,
1076 peer_states: &std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
1077 ) -> TargetAction {
1078 use crate::cluster::peer::PeerState;
1079 if !handoff_active {
1080 return TargetAction::Send;
1081 }
1082 let state = peer_states
1083 .get(&target.peer_idx)
1084 .copied()
1085 .unwrap_or(PeerState::Unknown);
1086 match state {
1087 PeerState::Down => TargetAction::Hint,
1088 _ => TargetAction::Send,
1089 }
1090 }
1091
1092 fn fanout_send(
1095 &self,
1096 target: &ReplicaTarget,
1097 req: &Msg,
1098 req_span: &tracing::Span,
1099 bytes: &[u8],
1100 intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1101 ) -> bool {
1102 let env = OutboundRequest {
1103 bytes: bytes.to_vec(),
1104 req_id: req.id(),
1105 responder: intermediate_tx.clone(),
1106 span: req_span.clone(),
1107 ty: crate::proto::dnode::DmsgType::Req,
1108 target_peer_idx: Some(target.peer_idx),
1109 };
1110 let send_result = if target.is_local {
1111 self.backend.as_ref().map(|tx| tx.try_send(env))
1112 } else {
1113 self.peer_backends
1114 .get(&target.peer_idx)
1115 .map(|tx| tx.try_send(env))
1116 };
1117 match send_result {
1118 Some(Ok(())) => true,
1119 Some(Err(err)) => {
1120 self.observe_send_error(target, &err);
1121 false
1122 }
1123 None => false,
1124 }
1125 }
1126
1127 fn observe_send_error(
1132 &self,
1133 target: &ReplicaTarget,
1134 err: &tokio::sync::mpsc::error::TrySendError<OutboundRequest>,
1135 ) {
1136 let Some(m) = self.failure_metrics.as_ref() else {
1137 return;
1138 };
1139 if target.is_local {
1140 match err {
1141 tokio::sync::mpsc::error::TrySendError::Full(_) => m.record_backend_send_full(),
1142 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1143 m.record_backend_send_closed();
1144 }
1145 }
1146 } else {
1147 let peer_dc = self.peer_dc_label(target.peer_idx);
1148 match err {
1149 tokio::sync::mpsc::error::TrySendError::Full(_) => {
1150 m.record_peer_send_full(target.peer_idx, &peer_dc);
1151 }
1152 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1153 m.record_peer_send_closed(target.peer_idx, &peer_dc);
1154 }
1155 }
1156 }
1157 }
1158
1159 fn hint_target(
1163 &self,
1164 target: &ReplicaTarget,
1165 bytes: &[u8],
1166 req: &Msg,
1167 req_span: &tracing::Span,
1168 intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1169 ) -> bool {
1170 let Some(store) = self.hint_store.as_ref() else {
1171 return false;
1172 };
1173 let cfg = self.pool.config();
1174 let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1175 match store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1176 Ok(()) => {}
1177 Err(e) => {
1178 tracing::debug!(
1179 target: "dynomite::hints",
1180 peer_idx = target.peer_idx,
1181 error = %e,
1182 "hint enqueue failed"
1183 );
1184 return false;
1185 }
1186 }
1187 let synth = synth_hint_reply(req, &self.mbuf_pool);
1188 let env = OutboundEnvelope {
1189 req_id: req.id(),
1190 rsp: synth,
1191 span: req_span.clone(),
1192 source_peer_idx: Some(target.peer_idx),
1193 };
1194 if intermediate_tx.try_send(env).is_err() {
1195 tracing::debug!(
1200 target: "dynomite::hints",
1201 peer_idx = target.peer_idx,
1202 "hint synth-reply could not be queued; coalescer absent"
1203 );
1204 }
1205 tracing::debug!(
1206 target: "dynomite::hints",
1207 peer_idx = target.peer_idx,
1208 bytes = bytes.len(),
1209 "stored hint for down peer"
1210 );
1211 true
1212 }
1213
1214 fn dispatch_replicas_direct(
1215 &self,
1216 req: &Msg,
1217 req_span: &tracing::Span,
1218 targets: &[ReplicaTarget],
1219 bytes: &[u8],
1220 responder: &ServerSink,
1221 ctx: &HandoffCtx<'_>,
1222 ) -> DispatchOutcome {
1223 debug_assert_eq!(targets.len(), 1);
1224 let target = &targets[0];
1225 if let TargetAction::Hint =
1229 Self::choose_target_action(target, ctx.handoff_active, ctx.peer_states)
1230 {
1231 if self.hint_single_target_direct(target, bytes, req, req_span, responder) {
1232 return DispatchOutcome::Pending;
1233 }
1234 return DispatchOutcome::Error(self.no_quorum_error(req));
1235 }
1236 let env = OutboundRequest {
1237 bytes: bytes.to_vec(),
1238 req_id: req.id(),
1239 responder: responder.clone(),
1240 span: req_span.clone(),
1241 ty: crate::proto::dnode::DmsgType::Req,
1242 target_peer_idx: Some(target.peer_idx),
1243 };
1244 let send_result = if target.is_local {
1245 self.backend.as_ref().map(|tx| tx.try_send(env))
1246 } else {
1247 self.peer_backends
1248 .get(&target.peer_idx)
1249 .map(|tx| tx.try_send(env))
1250 };
1251 let sent = match send_result {
1252 Some(Ok(())) => true,
1253 Some(Err(ref err)) => {
1254 self.observe_send_error(target, err);
1255 false
1256 }
1257 None => false,
1258 };
1259 if sent {
1260 return DispatchOutcome::Pending;
1261 }
1262 if ctx.handoff_active
1263 && self.hint_single_target_direct(target, bytes, req, req_span, responder)
1264 {
1265 return DispatchOutcome::Pending;
1266 }
1267 DispatchOutcome::Error(self.no_quorum_error(req))
1268 }
1269
1270 fn hint_single_target_direct(
1275 &self,
1276 target: &ReplicaTarget,
1277 bytes: &[u8],
1278 req: &Msg,
1279 req_span: &tracing::Span,
1280 responder: &ServerSink,
1281 ) -> bool {
1282 let Some(store) = self.hint_store.as_ref() else {
1283 return false;
1284 };
1285 let cfg = self.pool.config();
1286 let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1287 if let Err(e) = store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1288 tracing::debug!(
1289 target: "dynomite::hints",
1290 peer_idx = target.peer_idx,
1291 error = %e,
1292 "hint enqueue failed (single-target)"
1293 );
1294 return false;
1295 }
1296 let synth = synth_hint_reply(req, &self.mbuf_pool);
1297 let env = OutboundEnvelope {
1298 req_id: req.id(),
1299 rsp: synth,
1300 span: req_span.clone(),
1301 source_peer_idx: Some(target.peer_idx),
1302 };
1303 let _ = responder.try_send(env);
1304 true
1305 }
1306
1307 fn no_quorum_error(&self, req: &Msg) -> Msg {
1308 let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1309 MsgType::RspRedisError
1310 } else {
1311 MsgType::RspMcServerError
1312 };
1313 crate::msg::response::make_error(
1314 req,
1315 err_type,
1316 0,
1317 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1318 &self.mbuf_pool,
1319 )
1320 }
1321
1322 fn intercept_command(
1330 &self,
1331 ext: &dyn crate::embed::CommandExtension,
1332 req: &Msg,
1333 ) -> Option<DispatchOutcome> {
1334 if ext.handles_msg_type(req.ty()) {
1335 return Some(self.run_extension_command(ext, req));
1336 }
1337 if matches!(req.ty(), MsgType::ReqRedisHset) {
1338 return self.intercept_extension_hset(ext, req);
1339 }
1340 None
1341 }
1342
1343 fn run_extension_command(
1350 &self,
1351 ext: &dyn crate::embed::CommandExtension,
1352 req: &Msg,
1353 ) -> DispatchOutcome {
1354 let recovered_kw: Vec<u8>;
1362 let keyword: &[u8] = match req.ty() {
1363 MsgType::ReqRedisFtCreate => b"FT.CREATE",
1364 MsgType::ReqRedisFtSearch => b"FT.SEARCH",
1365 MsgType::ReqRedisFtInfo => b"FT.INFO",
1366 MsgType::ReqRedisFtList => b"FT.LIST",
1367 MsgType::ReqRedisFtDropindex => b"FT.DROPINDEX",
1368 MsgType::ReqRedisFtRegex => b"FT.REGEX",
1369 MsgType::ReqRedisFtSugadd => b"FT.SUGADD",
1370 MsgType::ReqRedisFtSugget => b"FT.SUGGET",
1371 MsgType::ReqRedisFtSugdel => b"FT.SUGDEL",
1372 MsgType::ReqRedisFtSuglen => b"FT.SUGLEN",
1373 MsgType::ReqRedisFtUnknown => {
1374 recovered_kw = first_bulk_token(req).unwrap_or_else(|| b"FT.UNKNOWN".to_vec());
1375 recovered_kw.as_slice()
1376 }
1377 _ => return DispatchOutcome::Drop,
1382 };
1383 let mut args: Vec<&[u8]> = Vec::with_capacity(1 + req.keys().len() + req.args().len());
1384 args.push(keyword);
1385 for k in req.keys() {
1386 args.push(k.key());
1387 }
1388 for a in req.args() {
1389 args.push(a.bytes());
1390 }
1391 let bytes = ext.try_dispatch(&args).unwrap_or_else(|| {
1392 let kw = String::from_utf8_lossy(keyword);
1393 format!("-ERR not supported in this build: {kw}\r\n").into_bytes()
1394 });
1395 DispatchOutcome::Inline(synthetic_redis_reply(req, &self.mbuf_pool, &bytes))
1396 }
1397
1398 fn intercept_extension_hset(
1404 &self,
1405 ext: &dyn crate::embed::CommandExtension,
1406 req: &Msg,
1407 ) -> Option<DispatchOutcome> {
1408 let mut args: Vec<&[u8]> = Vec::with_capacity(req.keys().len() + req.args().len());
1409 for k in req.keys() {
1410 args.push(k.key());
1411 }
1412 for a in req.args() {
1413 args.push(a.bytes());
1414 }
1415 match ext.try_intercept_hset(&args) {
1416 crate::embed::HsetOutcome::Absorbed | crate::embed::HsetOutcome::NotIndexed => None,
1417 crate::embed::HsetOutcome::Error(message) => {
1418 let payload = format!("-ERR {message}\r\n");
1419 Some(DispatchOutcome::Error(synthetic_redis_reply(
1420 req,
1421 &self.mbuf_pool,
1422 payload.as_bytes(),
1423 )))
1424 }
1425 }
1426 }
1427}
1428
1429fn synthetic_redis_reply(req: &Msg, pool: &MbufPool, payload: &[u8]) -> Msg {
1435 let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
1436 rsp.set_parent_id(req.id());
1437 let mut written = 0usize;
1438 while written < payload.len() {
1439 let mut buf = pool.get();
1440 let n = buf.recv(&payload[written..]);
1441 debug_assert!(
1442 n > 0,
1443 "MbufPool returned a buffer with zero writable capacity"
1444 );
1445 rsp.mbufs_mut().push_back(buf);
1446 written += n;
1447 }
1448 rsp.recompute_mlen();
1449 rsp
1450}
1451
1452fn first_bulk_token(req: &Msg) -> Option<Vec<u8>> {
1462 let mut wire: Vec<u8> = Vec::new();
1463 for buf in req.mbufs() {
1464 wire.extend_from_slice(buf.readable());
1465 if wire.len() > 256 {
1466 break;
1467 }
1468 }
1469 let mut p = 0usize;
1470 if wire.first() == Some(&b'*') {
1471 let cr = wire.iter().position(|&b| b == b'\r')?;
1472 if wire.get(cr + 1) != Some(&b'\n') {
1473 return None;
1474 }
1475 p = cr + 2;
1476 }
1477 if wire.get(p) != Some(&b'$') {
1478 return None;
1479 }
1480 let header_start = p + 1;
1481 let header_cr = wire[header_start..]
1482 .iter()
1483 .position(|&b| b == b'\r')
1484 .map(|i| header_start + i)?;
1485 if wire.get(header_cr + 1) != Some(&b'\n') {
1486 return None;
1487 }
1488 let len_str = std::str::from_utf8(&wire[header_start..header_cr]).ok()?;
1489 let len: usize = len_str.parse().ok()?;
1490 let body_start = header_cr + 2;
1491 let body_end = body_start.checked_add(len)?;
1492 if wire.len() < body_end + 2 {
1493 return None;
1494 }
1495 Some(wire[body_start..body_end].to_vec())
1496}
1497
/// Per-request data the coalescing actor needs to schedule read repair after
/// it has answered the client.
///
/// `dispatch_replicas` builds this only when the request has a non-empty
/// routing key.
#[derive(Clone)]
struct ReadRepairContext {
    /// Id of the originating request.
    req_id: crate::core::types::MsgId,
    /// Message type of the originating request.
    req_ty: MsgType,
    /// Hash-tag bytes of the request's first key.
    key: Vec<u8>,
    /// Clone of the dispatcher's buffer pool (presumably used to build repair
    /// messages; the consumer is not visible here).
    mbuf_pool: MbufPool,
    /// Snapshot of the per-peer outbound channels taken at dispatch time.
    peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
    /// Snapshot of the local datastore channel taken at dispatch time.
    local_backend: Option<mpsc::Sender<OutboundRequest>>,
    /// Maps each target's peer index to whether that target is the local node.
    target_is_local: std::collections::HashMap<u32, bool>,
}
1513
1514#[allow(
1516 clippy::too_many_arguments,
1517 reason = "actor task captures the entire dispatch context; bundling into a struct adds churn for no callsite gain"
1518)]
1519async fn coalesce_actor(
1520 req_id: crate::core::types::MsgId,
1521 req_ty: MsgType,
1522 consistency: ConsistencyLevel,
1523 targets: Vec<(u32, String)>,
1524 local_dc: String,
1525 mut intermediate_rx: mpsc::Receiver<OutboundEnvelope>,
1526 client_tx: ServerSink,
1527 mbuf_pool: MbufPool,
1528 repair_ctx: Option<ReadRepairContext>,
1529 failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
1530) {
1531 use crate::proto::redis::{CoalesceOutcome, CoalesceTracker};
1532 let mut tracker = CoalesceTracker::new(req_id, consistency, targets, &local_dc);
1533 let mut emitted = false;
1534 while let Some(env) = intermediate_rx.recv().await {
1535 let source = env.source_peer_idx.unwrap_or(u32::MAX);
1536 let span = env.span.clone();
1537 let outcome = tracker.record_reply(source, env.rsp);
1538 match outcome {
1539 CoalesceOutcome::Pending => {}
1540 CoalesceOutcome::Ready {
1541 winner,
1542 divergent_targets,
1543 } => {
1544 if !emitted {
1545 let winner_bytes: Vec<u8> = winner
1546 .mbufs()
1547 .iter()
1548 .flat_map(|b| b.readable().to_vec())
1549 .collect();
1550 let out_env = OutboundEnvelope {
1551 req_id,
1552 rsp: *winner,
1553 span: span.clone(),
1554 source_peer_idx: None,
1555 };
1556 let _ = client_tx.send(out_env).await;
1557 emitted = true;
1558 if !divergent_targets.is_empty() {
1559 if let Some(ctx) = repair_ctx.as_ref() {
1560 schedule_read_repair(ctx, &divergent_targets, &winner_bytes, &span);
1561 }
1562 }
1563 }
1564 }
1565 CoalesceOutcome::Error(reason) => {
1566 if !emitted {
1567 let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet)
1568 {
1569 MsgType::RspRedisError
1570 } else {
1571 MsgType::RspMcServerError
1572 };
1573 let anchor = Msg::new(req_id, req_ty, true);
1574 let rsp = crate::msg::response::make_error(
1575 &anchor,
1576 err_type,
1577 0,
1578 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1579 &mbuf_pool,
1580 );
1581 let _ = client_tx
1582 .send(OutboundEnvelope {
1583 req_id,
1584 rsp,
1585 span: span.clone(),
1586 source_peer_idx: None,
1587 })
1588 .await;
1589 emitted = true;
1590 }
1591 tracing::debug!(target: "dynomite::coalesce", req_id, reason = %reason, "coalesce error");
1592 }
1593 }
1594 }
1595 if !emitted {
1596 if let Some(m) = failure_metrics.as_ref() {
1603 m.record_response_timeout(consistency);
1604 }
1605 let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1606 MsgType::RspRedisError
1607 } else {
1608 MsgType::RspMcServerError
1609 };
1610 let anchor = Msg::new(req_id, req_ty, true);
1611 let rsp = crate::msg::response::make_error(
1612 &anchor,
1613 err_type,
1614 0,
1615 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1616 &mbuf_pool,
1617 );
1618 let _ = client_tx
1619 .send(OutboundEnvelope {
1620 req_id,
1621 rsp,
1622 span: tracing::Span::none(),
1623 source_peer_idx: None,
1624 })
1625 .await;
1626 }
1627}
1628
1629fn repair_sink() -> ServerSink {
1632 let (tx, mut rx) = mpsc::channel::<OutboundEnvelope>(8);
1633 tokio::spawn(async move {
1634 while rx.recv().await.is_some() {
1635 }
1638 });
1639 tx
1640}
1641
1642fn decode_winner_for_repair(payload: &[u8]) -> Option<RepairAction> {
1651 if payload == b"$-1\r\n" {
1652 return Some(RepairAction::Delete);
1653 }
1654 if !payload.starts_with(b"$") {
1655 return None;
1656 }
1657 let crlf = payload.iter().position(|&b| b == b'\r')?;
1659 if payload.get(crlf + 1).copied() != Some(b'\n') {
1660 return None;
1661 }
1662 let len_str = std::str::from_utf8(&payload[1..crlf]).ok()?;
1663 let len: usize = len_str.parse().ok()?;
1664 let body_start = crlf + 2;
1665 let body_end = body_start.checked_add(len)?;
1666 if payload.len() < body_end + 2 {
1667 return None;
1668 }
1669 if &payload[body_end..body_end + 2] != b"\r\n" {
1670 return None;
1671 }
1672 Some(RepairAction::Write(payload[body_start..body_end].to_vec()))
1673}
1674
1675struct HandoffCtx<'a> {
1681 handoff_active: bool,
1682 peer_states: &'a std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
1683}
1684
/// What to do with a single dispatch target.
///
/// NOTE(review): variant semantics are inferred from their names; confirm against the
/// planner that produces them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TargetAction {
    /// Send the request to the target.
    Send,
    /// Do not send now; presumably record a hint for later hand-off instead.
    Hint,
}
1695
1696fn synth_hint_reply(req: &Msg, pool: &MbufPool) -> Msg {
1707 crate::msg::response::make_simple_redis(req, pool, b"+OK\r\n")
1708}
1709
/// Corrective operation replayed on replicas whose reply diverged from the coalesced winner.
enum RepairAction {
    /// Overwrite the key with this value; encoded as RESP `SET key value`.
    Write(Vec<u8>),
    /// Remove the key; encoded as RESP `DEL key`. Produced when the winner was a null
    /// bulk reply (`$-1\r\n`).
    Delete,
}
1719
1720fn build_repair_bytes(action: &RepairAction, key: &[u8]) -> Vec<u8> {
1722 match action {
1723 RepairAction::Write(value) => {
1724 let mut out = Vec::with_capacity(key.len() + value.len() + 32);
1725 out.extend_from_slice(b"*3\r\n$3\r\nSET\r\n$");
1726 out.extend_from_slice(key.len().to_string().as_bytes());
1727 out.extend_from_slice(b"\r\n");
1728 out.extend_from_slice(key);
1729 out.extend_from_slice(b"\r\n$");
1730 out.extend_from_slice(value.len().to_string().as_bytes());
1731 out.extend_from_slice(b"\r\n");
1732 out.extend_from_slice(value);
1733 out.extend_from_slice(b"\r\n");
1734 out
1735 }
1736 RepairAction::Delete => {
1737 let mut out = Vec::with_capacity(key.len() + 24);
1738 out.extend_from_slice(b"*2\r\n$3\r\nDEL\r\n$");
1739 out.extend_from_slice(key.len().to_string().as_bytes());
1740 out.extend_from_slice(b"\r\n");
1741 out.extend_from_slice(key);
1742 out.extend_from_slice(b"\r\n");
1743 out
1744 }
1745 }
1746}
1747
1748fn schedule_read_repair(
1766 ctx: &ReadRepairContext,
1767 divergent: &[u32],
1768 winner_bytes: &[u8],
1769 span: &tracing::Span,
1770) {
1771 if !matches!(ctx.req_ty, MsgType::ReqRedisGet) {
1772 return;
1773 }
1774 let Some(action) = decode_winner_for_repair(winner_bytes) else {
1775 return;
1776 };
1777 let bytes = build_repair_bytes(&action, &ctx.key);
1778 let sink = repair_sink();
1779 for &peer_idx in divergent {
1780 let is_local = ctx.target_is_local.get(&peer_idx).copied().unwrap_or(false);
1781 let env = OutboundRequest {
1782 bytes: bytes.clone(),
1783 req_id: ctx.req_id,
1784 responder: sink.clone(),
1785 span: span.clone(),
1786 ty: crate::proto::dnode::DmsgType::ReqForward,
1787 target_peer_idx: Some(peer_idx),
1788 };
1789 let sent = if is_local {
1790 ctx.local_backend
1791 .as_ref()
1792 .is_some_and(|tx| tx.try_send(env).is_ok())
1793 } else {
1794 ctx.peer_backends
1795 .get(&peer_idx)
1796 .is_some_and(|tx| tx.try_send(env).is_ok())
1797 };
1798 if sent {
1799 let _ = &ctx.mbuf_pool;
1800 tracing::debug!(
1801 target: "dynomite::read_repair",
1802 req_id = ctx.req_id,
1803 peer_idx,
1804 bytes = bytes.len(),
1805 "scheduled read-repair write",
1806 );
1807 } else {
1808 tracing::debug!(
1809 target: "dynomite::read_repair",
1810 req_id = ctx.req_id,
1811 peer_idx,
1812 "read-repair drop: backend channel unavailable or full",
1813 );
1814 }
1815 }
1816}
1817
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
    use crate::cluster::pool::{BucketType, PoolConfig};
    use crate::conf::DataStore;
    use crate::hashkit::DynToken;

    // ---- fixtures ---------------------------------------------------------------------

    /// Pool config for local DC `dc1` / rack `rA` with the given read and write consistency.
    fn cfg(read: ConsistencyLevel, write: ConsistencyLevel) -> crate::cluster::PoolConfig {
        crate::cluster::PoolConfig {
            dc: "dc1".into(),
            rack: "rA".into(),
            read_consistency: read,
            write_consistency: write,
            ..crate::cluster::PoolConfig::default()
        }
    }

    /// A single-token peer on host `h`, already moved to `PeerState::Normal`.
    ///
    /// The port is `8101 + idx`, or 8101 when `idx` does not fit in a `u16`.
    fn peer(idx: u32, dc: &str, rack: &str, tok: u32, is_local: bool, is_same: bool) -> Peer {
        let port = 8101 + u16::try_from(idx).unwrap_or(0);
        let mut built = Peer::new(
            idx,
            PeerEndpoint::tcp("h".into(), port),
            rack.into(),
            dc.into(),
            vec![DynToken::from_u32(tok)],
            is_local,
            is_same,
            false,
        );
        built.set_state(PeerState::Normal, 0);
        built
    }

    /// Build a `ServerPool` from `config` and `peers`, preselect remote racks, and share it.
    fn arc_pool(config: PoolConfig, peers: Vec<Peer>) -> Arc<ServerPool> {
        let server_pool = ServerPool::new(config, peers);
        server_pool.preselect_remote_racks();
        Arc::new(server_pool)
    }

    /// Shared pool for `dc1` / `rA` with the given consistency levels.
    fn pool(read: ConsistencyLevel, write: ConsistencyLevel, peers: Vec<Peer>) -> Arc<ServerPool> {
        arc_pool(cfg(read, write), peers)
    }

    /// DC_QUORUM pool whose only peer (local, `dc1`/`rA`) is `Down`.
    fn down_single_peer_pool() -> Arc<ServerPool> {
        let mut only = peer(0, "dc1", "rA", 10, true, true);
        only.set_state(PeerState::Down, 0);
        pool(ConsistencyLevel::DcQuorum, ConsistencyLevel::DcQuorum, vec![only])
    }

    /// Concatenate the readable bytes of every mbuf in `rsp`.
    fn wire_bytes(rsp: &Msg) -> Vec<u8> {
        rsp.mbufs()
            .iter()
            .flat_map(|b| b.readable().to_vec())
            .collect()
    }

    /// Three peers in `dc1`, one per rack (`rA` local, `rB`, `rC`).
    fn three_local_peers() -> Vec<Peer> {
        vec![
            peer(0, "dc1", "rA", 10, true, true),
            peer(1, "dc1", "rB", 20, false, true),
            peer(2, "dc1", "rC", 30, false, true),
        ]
    }

    /// A bucket type named `name` with DC_QUORUM reads/writes and `n_val` 0 (no cap).
    fn quorum_bucket(name: &str) -> BucketType {
        BucketType {
            name: name.into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val: 0,
        }
    }

    /// DC_ONE pool over `three_local_peers` whose only bucket type is `bucket`.
    fn dc_one_bucket_pool(bucket: BucketType, default_bucket_type: Option<&str>) -> Arc<ServerPool> {
        arc_pool(
            PoolConfig {
                dc: "dc1".into(),
                rack: "rA".into(),
                read_consistency: ConsistencyLevel::DcOne,
                write_consistency: ConsistencyLevel::DcOne,
                bucket_types: vec![bucket],
                default_bucket_type: default_bucket_type.map(str::to_string),
                ..PoolConfig::default()
            },
            three_local_peers(),
        )
    }

    /// Target count of a `Replicas` plan; panics with `expected {what}, got {plan:?}`.
    fn replica_count(plan: DispatchPlan, what: &str) -> usize {
        match plan {
            DispatchPlan::Replicas { targets, .. } => targets.len(),
            other => panic!("expected {what}, got {other:?}"),
        }
    }

    // ---- consistency-level planning ---------------------------------------------------

    #[test]
    fn local_node_only_short_circuits() {
        let p = pool(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            vec![peer(0, "dc1", "rA", 10, true, true)],
        );
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        req.set_routing(MsgRouting::LocalNodeOnly);
        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
        assert_eq!(plan, DispatchPlan::LocalDatastore);
    }

    #[test]
    fn dc_one_read_targets_local_rack_when_present() {
        let p = pool(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            vec![
                peer(0, "dc1", "rA", 10, true, true),
                peer(1, "dc1", "rB", 20, false, true),
                peer(2, "dc2", "rA", 30, false, false),
            ],
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"hello");
        // The local peer owns the local rack, so DC_ONE is served by the local datastore.
        assert!(matches!(plan, DispatchPlan::LocalDatastore));
    }

    #[test]
    fn dc_quorum_fans_out_local_dc() {
        let p = pool(
            ConsistencyLevel::DcQuorum,
            ConsistencyLevel::DcQuorum,
            vec![
                peer(0, "dc1", "rA", 10, true, true),
                peer(1, "dc1", "rB", 20, false, true),
                peer(2, "dc2", "rA", 30, false, false),
            ],
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
        let DispatchPlan::Replicas { targets, .. } = plan else {
            panic!("expected replicas");
        };
        assert_eq!(targets.len(), 2);
        for r in &targets {
            assert_eq!(r.dc, "dc1");
        }
    }

    #[test]
    fn dc_each_safe_quorum_fans_out_per_dc() {
        let p = pool(
            ConsistencyLevel::DcEachSafeQuorum,
            ConsistencyLevel::DcEachSafeQuorum,
            vec![
                peer(0, "dc1", "rA", 10, true, true),
                peer(1, "dc2", "rA", 20, false, false),
            ],
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
        let DispatchPlan::Replicas { targets, .. } = plan else {
            panic!("expected replicas");
        };
        assert_eq!(targets.len(), 2);
        let dcs: Vec<&str> = targets.iter().map(|r| r.dc.as_str()).collect();
        assert!(dcs.contains(&"dc1"));
        assert!(dcs.contains(&"dc2"));
    }

    // ---- NoTargets handling -----------------------------------------------------------

    #[test]
    fn no_routable_peers_returns_no_targets() {
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(down_single_peer_pool()).plan(&req, b"k");
        assert_eq!(plan, DispatchPlan::NoTargets);
    }

    #[test]
    fn no_targets_error_response_carries_dynomite_wire_bytes() {
        let disp = ClusterDispatcher::new(down_single_peer_pool());
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
        let (tx, _rx) = mpsc::channel(1);
        let rsp = match disp.dispatch(req, tx) {
            DispatchOutcome::Error(rsp) => rsp,
            other => panic!("expected DispatchOutcome::Error, got {other:?}"),
        };
        assert_eq!(rsp.ty(), MsgType::RspRedisError);
        assert!(rsp.flags().is_error);
        let bytes = wire_bytes(&rsp);
        assert!(
            !bytes.is_empty(),
            "NoTargets must produce on-wire bytes, not a 0-byte hang"
        );
        assert!(bytes.starts_with(b"-Dynomite: "));
        assert!(bytes.ends_with(b"\r\n"));
        assert_eq!(rsp.mlen() as usize, bytes.len());
    }

    #[test]
    fn no_targets_error_response_memcache_wire_bytes() {
        let mut memcache_cfg = cfg(ConsistencyLevel::DcQuorum, ConsistencyLevel::DcQuorum);
        memcache_cfg.data_store = DataStore::Memcache;
        let mut down = peer(0, "dc1", "rA", 10, true, true);
        down.set_state(PeerState::Down, 0);
        let disp = ClusterDispatcher::new(arc_pool(memcache_cfg, vec![down]));
        let mut req = Msg::new(1, MsgType::ReqMcGet, true);
        req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
        let (tx, _rx) = mpsc::channel(1);
        let rsp = match disp.dispatch(req, tx) {
            DispatchOutcome::Error(rsp) => rsp,
            other => panic!("expected DispatchOutcome::Error, got {other:?}"),
        };
        assert_eq!(rsp.ty(), MsgType::RspMcServerError);
        let bytes = wire_bytes(&rsp);
        assert!(
            !bytes.is_empty(),
            "NoTargets must produce on-wire bytes, not a 0-byte hang"
        );
        assert!(bytes.starts_with(b"SERVER_ERROR "));
        assert!(bytes.ends_with(b"\r\n"));
    }

    // ---- bucket types -----------------------------------------------------------------

    #[test]
    fn bucket_type_overrides_pool_consistency() {
        let p = dc_one_bucket_pool(quorum_bucket("hot"), None);
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"hot/key1");
        assert_eq!(replica_count(plan, "DC_QUORUM fan-out"), 3);
    }

    #[test]
    fn slashless_key_falls_back_to_pool_default() {
        let p = dc_one_bucket_pool(quorum_bucket("hot"), None);
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"plain-key");
        assert!(matches!(plan, DispatchPlan::LocalDatastore));
    }

    #[test]
    fn unknown_bucket_uses_default_bucket_type_when_set() {
        let p = dc_one_bucket_pool(quorum_bucket("safe"), Some("safe"));
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        // Both a slash-less key and an unregistered bucket fall back to the default type.
        let keys: [&[u8]; 2] = [b"plain-key", b"unknown-bucket/key"];
        for key in keys {
            let plan = ClusterDispatcher::new(Arc::clone(&p)).plan(&req, key);
            assert_eq!(replica_count(plan, "DC_QUORUM via default bucket"), 3);
        }
    }

    #[test]
    fn unknown_bucket_with_no_default_uses_pool_default() {
        let p = dc_one_bucket_pool(quorum_bucket("safe"), None);
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"unknown-bucket/key");
        assert!(matches!(plan, DispatchPlan::LocalDatastore));
    }

    // ---- n_val capping ----------------------------------------------------------------

    #[test]
    fn n_val_one_caps_replicas_to_first_target() {
        let thin = BucketType {
            n_val: 1,
            ..quorum_bucket("thin")
        };
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(dc_one_bucket_pool(thin, None)).plan(&req, b"thin/key");
        assert_eq!(replica_count(plan, "single-target plan"), 1);
    }

    #[test]
    fn n_val_two_caps_replicas_to_first_two_targets() {
        let medium = BucketType {
            n_val: 2,
            ..quorum_bucket("medium")
        };
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan =
            ClusterDispatcher::new(dc_one_bucket_pool(medium, None)).plan(&req, b"medium/key");
        assert_eq!(replica_count(plan, "two-target plan"), 2);
    }

    #[test]
    fn n_val_zero_does_not_cap() {
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(dc_one_bucket_pool(quorum_bucket("any"), None))
            .plan(&req, b"any/key");
        assert_eq!(replica_count(plan, "uncapped plan"), 3);
    }

    #[test]
    fn n_val_larger_than_replicas_is_a_no_op() {
        let big = BucketType {
            n_val: 7,
            ..quorum_bucket("big")
        };
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(dc_one_bucket_pool(big, None)).plan(&req, b"big/key");
        assert_eq!(replica_count(plan, "uncapped plan"), 3);
    }

    // ---- failure metrics --------------------------------------------------------------

    #[test]
    fn no_targets_records_failure_metric() {
        let metrics = Arc::new(crate::stats::FailureMetrics::new());
        let disp =
            ClusterDispatcher::new(down_single_peer_pool()).with_failure_metrics(metrics.clone());
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        assert_eq!(disp.plan(&req, b"k"), DispatchPlan::NoTargets);
        let snap = metrics.snapshot();
        assert_eq!(snap.no_targets.len(), 1);
        let entry = &snap.no_targets[0];
        assert_eq!(entry.dc, "dc1");
        assert_eq!(entry.rack, "rA");
        assert_eq!(entry.consistency, ConsistencyLevel::DcQuorum);
        assert_eq!(entry.count, 1);
    }

    #[tokio::test]
    async fn closed_backend_channel_records_closed_metric() {
        let p = pool(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            vec![peer(0, "dc1", "rA", 10, true, true)],
        );
        // Drop the receiver up front so every backend send sees a closed channel.
        let (backend_tx, backend_rx) = mpsc::channel::<crate::net::server::OutboundRequest>(4);
        drop(backend_rx);
        let metrics = Arc::new(crate::stats::FailureMetrics::new());
        let disp = ClusterDispatcher::new(p)
            .with_backend(backend_tx)
            .with_failure_metrics(metrics.clone());
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        let mbufs = crate::io::mbuf::MbufPool::default();
        let mut payload = mbufs.get();
        payload.copy_from_slice(b"PING\r\n");
        req.mbufs_mut().push_back(payload);
        let (resp_tx, _resp_rx) = mpsc::channel(1);
        let outcome = disp.dispatch(req, resp_tx);
        assert!(matches!(outcome, DispatchOutcome::Error(_)));
        let snap = metrics.snapshot();
        assert_eq!(snap.backend_send_closed, 1);
        assert_eq!(snap.backend_send_full, 0);
    }

    #[test]
    fn two_peer_pool_with_one_down_records_per_key_no_targets() {
        // Peer 0 is up and peer 1 is down; with their tokens splitting the ring, some keys
        // presumably land on the down peer (NoTargets) and the rest stay routable.
        let up = peer(0, "dc1", "rA", 2_147_483_648, true, true);
        let mut down = peer(1, "dc1", "rA", 0, false, true);
        down.set_state(PeerState::Down, 0);
        let metrics = Arc::new(crate::stats::FailureMetrics::new());
        let disp = ClusterDispatcher::new(arc_pool(
            cfg(ConsistencyLevel::DcQuorum, ConsistencyLevel::DcQuorum),
            vec![up, down],
        ))
        .with_failure_metrics(metrics.clone());

        let (mut planned_no_targets, mut planned_routable) = (0u64, 0u64);
        for i in 0..100u32 {
            let req = Msg::new(u64::from(i), MsgType::ReqRedisGet, true);
            let key = format!("k{i:03}");
            match disp.plan(&req, key.as_bytes()) {
                DispatchPlan::NoTargets => planned_no_targets += 1,
                DispatchPlan::Replicas { .. } | DispatchPlan::LocalDatastore => {
                    planned_routable += 1;
                }
                DispatchPlan::Drop => panic!("unexpected Drop in plan"),
            }
        }
        assert!(planned_no_targets > 0, "expected some NoTargets dispatches");
        assert!(planned_routable > 0, "expected some routable dispatches");

        let snap = metrics.snapshot();
        let counter_total: u64 = snap.no_targets.iter().map(|e| e.count).sum();
        assert_eq!(
            counter_total, planned_no_targets,
            "dispatch_no_targets_total must match observed NoTargets count",
        );
        // Planning alone never touches a backend or peer channel.
        assert_eq!(snap.backend_send_full, 0);
        assert_eq!(snap.backend_send_closed, 0);
        assert!(snap.peer_send_full.is_empty());
        assert!(snap.peer_send_closed.is_empty());
    }
}