1use std::sync::atomic::{AtomicU64, Ordering};
46use std::sync::Arc;
47
48use tokio::sync::mpsc;
49
50use crate::cluster::pool::ServerPool;
51use crate::cluster::snitch::{rack_distance, RackDistance};
52use crate::cluster::vnode;
53use crate::conf::HashType as ConfHashType;
54use crate::hashkit::{self, HashType};
55use crate::io::mbuf::MbufPool;
56use crate::msg::{ConsistencyLevel, Msg, MsgRouting, MsgType};
57use crate::net::dispatcher::{DispatchOutcome, Dispatcher, OutboundEnvelope, ServerSink};
58use crate::net::server::OutboundRequest;
59
/// Process-wide counter of shadow-distribution disagreements.
///
/// Only ever touched with `Ordering::Relaxed`: it is a statistic, not a
/// synchronisation point.
static SHADOW_DISAGREEMENTS: AtomicU64 = AtomicU64::new(0);

/// Returns how many shadow-distribution disagreements have been recorded
/// since start-up (or since the last reset).
#[must_use]
pub fn distribution_shadow_disagreement_total() -> u64 {
    SHADOW_DISAGREEMENTS.load(Ordering::Relaxed)
}

/// Sets the shadow-disagreement counter back to zero.
pub fn reset_distribution_shadow_disagreement_total() {
    SHADOW_DISAGREEMENTS.store(0, Ordering::Relaxed);
}

/// Records one more shadow-distribution disagreement.
fn bump_shadow_disagreement() {
    SHADOW_DISAGREEMENTS.fetch_add(1, Ordering::Relaxed);
}
97
98fn enter_plan_span(
104 req_id: u64,
105 plan: &DispatchPlan,
106) -> (tracing::Span, tracing::span::EnteredSpan) {
107 let req_span = tracing::Span::current();
108 let kind: &'static str = match plan {
109 DispatchPlan::Drop => "drop",
110 DispatchPlan::NoTargets => "no_targets",
111 DispatchPlan::LocalDatastore => "local_datastore",
112 DispatchPlan::Replicas { .. } => "replicas",
113 };
114 let targets = match plan {
115 DispatchPlan::Replicas { targets, .. } => targets.len(),
116 _ => 0,
117 };
118 let span = tracing::info_span!("dispatch.plan", req_id, plan = kind, targets,).entered();
119 (req_span, span)
120}
121
122fn map_hash(h: ConfHashType) -> HashType {
123 match h {
124 ConfHashType::OneAtATime => HashType::OneAtATime,
125 ConfHashType::Md5 => HashType::Md5,
126 ConfHashType::Crc16 => HashType::Crc16,
127 ConfHashType::Crc32 => HashType::Crc32,
128 ConfHashType::Crc32a => HashType::Crc32a,
129 ConfHashType::Fnv1_64 => HashType::Fnv1_64,
130 ConfHashType::Fnv1a64 => HashType::Fnv1a_64,
131 ConfHashType::Fnv1_32 => HashType::Fnv1_32,
132 ConfHashType::Fnv1a32 => HashType::Fnv1a_32,
133 ConfHashType::Hsieh => HashType::Hsieh,
134 ConfHashType::Murmur => HashType::Murmur,
135 ConfHashType::Jenkins => HashType::Jenkins,
136 ConfHashType::Murmur3 => HashType::Murmur3,
137 ConfHashType::Murmur3X64_64 => HashType::Murmur3X64_64,
138 }
139}
140
/// One replica a request should be delivered to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplicaTarget {
    /// Index of the peer; used to look the peer up in the pool's peer list
    /// and to select its outbound sender.
    pub peer_idx: u32,
    /// Name of the datacenter the target was selected from.
    pub dc: String,
    /// Name of the rack the target was selected from.
    pub rack: String,
    /// Whether the peer reports itself as the local node; local targets are
    /// sent through the local datastore backend instead of a peer backend.
    pub is_local: bool,
}
153
/// Routing decision produced by the planner for a single request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DispatchPlan {
    /// Hand the request to the local datastore backend.
    LocalDatastore,
    /// Deliver the request to one or more replicas.
    Replicas {
        /// Replicas to contact.
        targets: Vec<ReplicaTarget>,
        /// Consistency level the plan was built for.
        consistency: ConsistencyLevel,
    },
    /// No usable replica was found; the dispatcher answers with a
    /// no-quorum error.
    NoTargets,
    /// Discard the request without replying.
    Drop,
}
182
/// Plans and executes request routing across the cluster.
#[derive(Debug, Clone)]
pub struct ClusterDispatcher {
    /// Shared pool: source of the configuration, peer list and datacenter
    /// topology consulted while planning.
    pool: Arc<ServerPool>,
    /// Sender towards the local datastore; `None` until `with_backend` is
    /// called.
    backend: Option<mpsc::Sender<OutboundRequest>>,
    /// Senders towards remote peers, keyed by peer index.
    peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
    /// Buffer pool used when building error and synthetic replies.
    mbuf_pool: MbufPool,
    /// Optional store in which requests for unreachable peers are enqueued
    /// as hints.
    hint_store: Option<Arc<crate::cluster::hints::HintStore>>,
    /// Optional failure counters (no targets, send failures, timeouts).
    failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
    /// Optional vector registry; when present, `FT.*` and `HSET` requests
    /// are offered to it before normal routing.
    vector_registry: Option<Arc<crate::vector::registry::VectorRegistry>>,
}
231
232impl ClusterDispatcher {
233 #[must_use]
253 pub fn new(pool: Arc<ServerPool>) -> Self {
254 Self {
255 pool,
256 backend: None,
257 peer_backends: std::collections::HashMap::new(),
258 mbuf_pool: MbufPool::default(),
259 hint_store: None,
260 failure_metrics: None,
261 vector_registry: None,
262 }
263 }
264
265 #[must_use]
287 pub fn with_mbuf_pool(mut self, pool: MbufPool) -> Self {
288 self.mbuf_pool = pool;
289 self
290 }
291
292 #[must_use]
296 pub fn mbuf_pool(&self) -> &MbufPool {
297 &self.mbuf_pool
298 }
299
300 #[must_use]
309 pub fn with_backend(mut self, backend: mpsc::Sender<OutboundRequest>) -> Self {
310 self.backend = Some(backend);
311 self
312 }
313
314 #[must_use]
326 pub fn with_peer_backend(
327 mut self,
328 peer_idx: u32,
329 sender: mpsc::Sender<OutboundRequest>,
330 ) -> Self {
331 self.peer_backends.insert(peer_idx, sender);
332 self
333 }
334
335 #[must_use]
337 pub fn has_backend(&self) -> bool {
338 self.backend.is_some()
339 }
340
341 #[must_use]
343 pub fn peer_backend_count(&self) -> usize {
344 self.peer_backends.len()
345 }
346
347 #[must_use]
349 pub fn pool(&self) -> &Arc<ServerPool> {
350 &self.pool
351 }
352
353 #[must_use]
386 pub fn with_hint_store(mut self, store: Arc<crate::cluster::hints::HintStore>) -> Self {
387 self.hint_store = Some(store);
388 self
389 }
390
391 #[must_use]
393 pub fn hint_store(&self) -> Option<&Arc<crate::cluster::hints::HintStore>> {
394 self.hint_store.as_ref()
395 }
396
397 #[must_use]
427 pub fn with_failure_metrics(mut self, metrics: Arc<crate::stats::FailureMetrics>) -> Self {
428 self.failure_metrics = Some(metrics);
429 self
430 }
431
432 #[must_use]
434 pub fn failure_metrics(&self) -> Option<&Arc<crate::stats::FailureMetrics>> {
435 self.failure_metrics.as_ref()
436 }
437
438 #[must_use]
477 pub fn with_vector_registry(
478 mut self,
479 registry: Arc<crate::vector::registry::VectorRegistry>,
480 ) -> Self {
481 self.vector_registry = Some(registry);
482 self
483 }
484
485 #[must_use]
487 pub fn vector_registry(&self) -> Option<&Arc<crate::vector::registry::VectorRegistry>> {
488 self.vector_registry.as_ref()
489 }
490
491 #[must_use]
494 pub fn hinted_handoff_active(&self) -> bool {
495 self.hint_store.is_some() && self.pool.config().enable_hinted_handoff
496 }
497
498 #[must_use]
513 pub fn plan(&self, req: &Msg, key: &[u8]) -> DispatchPlan {
514 let cfg = self.pool.config();
515 let peers = self.pool.peers().read();
516 if peers.is_empty() {
517 self.record_no_targets_metric(cfg, ConsistencyLevel::default());
518 return DispatchPlan::NoTargets;
519 }
520 if matches!(req.routing(), MsgRouting::LocalNodeOnly) {
521 return DispatchPlan::LocalDatastore;
522 }
523 if key.is_empty() {
524 return DispatchPlan::LocalDatastore;
525 }
526 let token = hashkit::hash(map_hash(cfg.hash), key);
527 let key_hash64 = hashkit::hash64(map_hash(cfg.hash), key);
528 let bucket = crate::proto::redis::bucket_name(key);
529 let bucket_type = cfg.resolve_bucket_type(bucket);
530 let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
531 let consistency = match (bucket_type, is_read) {
532 (Some(bt), true) => bt.read_consistency,
533 (Some(bt), false) => bt.write_consistency,
534 (None, true) => cfg.read_consistency,
535 (None, false) => cfg.write_consistency,
536 };
537 let n_val_cap = bucket_type.map_or(0, |bt| bt.n_val);
538 let dcs = self.pool.datacenters().read();
539 let include_down = self.hinted_handoff_active() && !is_read;
548 let routable = collect_routable(
549 &dcs,
550 &peers,
551 &token,
552 key_hash64,
553 cfg.distribution,
554 include_down,
555 );
556 if let Some(shadow) = cfg.distribution_shadow {
557 if shadow != cfg.distribution {
558 let shadow_routable =
559 collect_routable(&dcs, &peers, &token, key_hash64, shadow, include_down);
560 if !plans_agree(&routable, &shadow_routable) {
561 bump_shadow_disagreement();
562 tracing::debug!(
563 target: "dynomite::dispatch::shadow",
564 live = cfg.distribution.as_str(),
565 shadow = shadow.as_str(),
566 "shadow distribution disagreed on key route"
567 );
568 }
569 }
570 }
571 if routable.is_empty() {
572 self.record_no_targets_metric(cfg, consistency);
573 return DispatchPlan::NoTargets;
574 }
575 let (local, remote): (Vec<_>, Vec<_>) = routable
576 .into_iter()
577 .partition(|(dc_idx, _, _)| dcs[*dc_idx].name() == cfg.dc);
578 let plan =
579 plan_with_consistency(cfg, &dcs, &peers, consistency, req.routing(), local, remote);
580 let plan = cap_replicas(plan, n_val_cap);
581 if matches!(plan, DispatchPlan::NoTargets) {
582 self.record_no_targets_metric(cfg, consistency);
583 }
584 plan
585 }
586
587 fn record_no_targets_metric(
590 &self,
591 cfg: &crate::cluster::pool::PoolConfig,
592 consistency: ConsistencyLevel,
593 ) {
594 if let Some(m) = self.failure_metrics.as_ref() {
595 m.record_no_targets(&cfg.dc, &cfg.rack, consistency);
596 }
597 }
598
599 fn peer_dc_label(&self, peer_idx: u32) -> String {
603 let peers = self.pool.peers().read();
604 peers
605 .get(peer_idx as usize)
606 .map_or_else(|| self.pool.config().dc.clone(), |p| p.dc().to_string())
607 }
608}
609
610fn cap_replicas(plan: DispatchPlan, cap: u8) -> DispatchPlan {
616 if cap == 0 {
617 return plan;
618 }
619 let cap = cap as usize;
620 match plan {
621 DispatchPlan::Replicas {
622 mut targets,
623 consistency,
624 } if targets.len() > cap => {
625 targets.truncate(cap);
626 DispatchPlan::Replicas {
627 targets,
628 consistency,
629 }
630 }
631 other => other,
632 }
633}
634
/// Returns `true` when both routes select the same peers.
///
/// Only the peer index (third tuple element) is compared; datacenter and
/// rack indices and the order of entries are ignored. Duplicates count, so
/// the two routes must contain each peer the same number of times.
fn plans_agree(a: &[(usize, usize, u32)], b: &[(usize, usize, u32)]) -> bool {
    /// Peer indices of `route`, sorted ascending.
    fn sorted_peers(route: &[(usize, usize, u32)]) -> Vec<u32> {
        let mut peers: Vec<u32> = route.iter().map(|&(_, _, peer)| peer).collect();
        peers.sort_unstable();
        peers
    }

    // Cheap length check first; it also avoids the two allocations.
    a.len() == b.len() && sorted_peers(a) == sorted_peers(b)
}
645
646fn collect_routable(
647 dcs: &[crate::cluster::Datacenter],
648 peers: &[crate::cluster::peer::Peer],
649 token: &crate::hashkit::DynToken,
650 hash64: u64,
651 distribution: crate::conf::Distribution,
652 include_down: bool,
653) -> Vec<(usize, usize, u32)> {
654 let mut routable: Vec<(usize, usize, u32)> = Vec::new();
655 for (dc_idx, dc) in dcs.iter().enumerate() {
656 for (rack_idx, rack) in dc.racks().iter().enumerate() {
657 let candidate = match (distribution, rack.random_slices()) {
658 (crate::conf::Distribution::RandomSlicing, Some(slices)) => {
659 slices.claimant_for(hash64).and_then(|name| {
666 peers.iter().find_map(|p| {
667 if p.dc() == dc.name()
668 && p.rack() == rack.name()
669 && p.endpoint().pname() == name
670 {
671 Some(p.idx())
672 } else {
673 None
674 }
675 })
676 })
677 }
678 _ => vnode::dispatch(rack.continuums(), token),
679 };
680 if let Some(peer_idx) = candidate {
681 if let Some(peer) = peers.get(peer_idx as usize) {
682 let state = peer.state();
683 let accept = state.is_routable()
684 || (include_down && matches!(state, crate::cluster::peer::PeerState::Down));
685 if accept {
686 routable.push((dc_idx, rack_idx, peer_idx));
687 }
688 }
689 }
690 }
691 }
692 routable
693}
694
695fn build_target(
696 dcs: &[crate::cluster::Datacenter],
697 peers: &[crate::cluster::peer::Peer],
698 dc_idx: usize,
699 rack_idx: usize,
700 peer_idx: u32,
701) -> ReplicaTarget {
702 let dc_name = dcs[dc_idx].name().to_string();
703 let rack_name = dcs[dc_idx].racks()[rack_idx].name().to_string();
704 let is_local = peers
705 .get(peer_idx as usize)
706 .is_some_and(crate::cluster::peer::Peer::is_local);
707 ReplicaTarget {
708 peer_idx,
709 dc: dc_name,
710 rack: rack_name,
711 is_local,
712 }
713}
714
715fn plan_with_consistency(
716 cfg: &crate::cluster::pool::PoolConfig,
717 dcs: &[crate::cluster::Datacenter],
718 peers: &[crate::cluster::peer::Peer],
719 consistency: ConsistencyLevel,
720 routing: MsgRouting,
721 local: Vec<(usize, usize, u32)>,
722 remote: Vec<(usize, usize, u32)>,
723) -> DispatchPlan {
724 let want_per_dc_fanout = matches!(consistency, ConsistencyLevel::DcEachSafeQuorum)
725 || matches!(routing, MsgRouting::AllNodesAllRacksAllDcs);
726 let mut targets: Vec<ReplicaTarget> = Vec::new();
727 match consistency {
728 ConsistencyLevel::DcOne => {
729 if local.is_empty() {
730 return DispatchPlan::NoTargets;
731 }
732 let mut best: Option<(RackDistance, (usize, usize, u32))> = None;
733 for (dc_idx, rack_idx, peer_idx) in local {
734 let rack_name = dcs[dc_idx].racks()[rack_idx].name();
735 let d = rack_distance(&cfg.dc, &cfg.rack, &cfg.dc, rack_name);
736 let take = match best {
737 None => true,
738 Some((bd, _)) => d.cost() < bd.cost(),
739 };
740 if take {
741 best = Some((d, (dc_idx, rack_idx, peer_idx)));
742 }
743 }
744 if let Some((_, (dc_idx, rack_idx, peer_idx))) = best {
745 let is_local_node = peers
746 .get(peer_idx as usize)
747 .is_some_and(crate::cluster::peer::Peer::is_local);
748 if is_local_node {
749 return DispatchPlan::LocalDatastore;
750 }
751 targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
752 }
753 }
754 ConsistencyLevel::DcQuorum | ConsistencyLevel::DcSafeQuorum => {
755 if local.is_empty() {
756 return DispatchPlan::NoTargets;
757 }
758 for (dc_idx, rack_idx, peer_idx) in local {
759 targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
760 }
761 }
762 ConsistencyLevel::DcEachSafeQuorum => {
763 if local.is_empty() && remote.is_empty() {
764 return DispatchPlan::NoTargets;
765 }
766 for (dc_idx, rack_idx, peer_idx) in local.iter().chain(remote.iter()) {
767 targets.push(build_target(dcs, peers, *dc_idx, *rack_idx, *peer_idx));
768 }
769 }
770 }
771 if want_per_dc_fanout && !remote.is_empty() {
772 for (dc_idx, rack_idx, peer_idx) in remote {
773 if !targets.iter().any(|t| t.peer_idx == peer_idx) {
774 targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
775 }
776 }
777 }
778 if targets.is_empty() {
779 return DispatchPlan::LocalDatastore;
780 }
781 DispatchPlan::Replicas {
782 targets,
783 consistency,
784 }
785}
786
787impl Dispatcher for ClusterDispatcher {
788 #[allow(
789 clippy::too_many_lines,
790 reason = "single dispatch fn must enumerate every plan; splitting hides the planner-to-effect mapping"
791 )]
792 fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome {
793 if req.flags().quit {
794 return DispatchOutcome::Drop;
795 }
796 if let Some(reg) = self.vector_registry.as_ref() {
802 if let Some(outcome) = self.intercept_vector_command(reg.as_ref(), &req) {
803 return outcome;
804 }
805 }
806 let key: Vec<u8> = req
814 .keys()
815 .first()
816 .map(|kp| kp.tag_bytes().to_vec())
817 .unwrap_or_default();
818 let plan = self.plan(&req, &key);
819 let (req_span, _plan_span) = enter_plan_span(req.id(), &plan);
820 match plan {
821 DispatchPlan::Drop => DispatchOutcome::Drop,
822 DispatchPlan::NoTargets => {
823 let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
824 MsgType::RspRedisError
825 } else {
826 MsgType::RspMcServerError
827 };
828 let rsp = crate::msg::response::make_error(
829 &req,
830 err_type,
831 0,
832 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
833 &self.mbuf_pool,
834 );
835 DispatchOutcome::Error(rsp)
836 }
837 DispatchPlan::LocalDatastore => {
838 if let Some(tx) = self.backend.as_ref() {
839 let bytes: Vec<u8> = req
844 .mbufs()
845 .iter()
846 .flat_map(|b| b.readable().to_vec())
847 .collect();
848 if bytes.is_empty() {
849 return DispatchOutcome::Drop;
853 }
854 let env = OutboundRequest {
855 bytes,
856 req_id: req.id(),
857 responder,
858 span: req_span.clone(),
859 ty: crate::proto::dnode::DmsgType::Req,
860 target_peer_idx: None,
861 };
862 if let Err(err) = tx.try_send(env) {
863 if let Some(m) = self.failure_metrics.as_ref() {
866 match err {
867 tokio::sync::mpsc::error::TrySendError::Full(_) => {
868 m.record_backend_send_full();
869 }
870 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
871 m.record_backend_send_closed();
872 }
873 }
874 }
875 let err_type =
876 if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
877 MsgType::RspRedisError
878 } else {
879 MsgType::RspMcServerError
880 };
881 let rsp = crate::msg::response::make_error(
882 &req,
883 err_type,
884 0,
885 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
886 &self.mbuf_pool,
887 );
888 return DispatchOutcome::Error(rsp);
889 }
890 }
891 DispatchOutcome::Pending
892 }
893 DispatchPlan::Replicas {
894 targets,
895 consistency,
896 } => self.dispatch_replicas(&req, &req_span, &targets, consistency, responder),
897 }
898 }
899}
900
901impl ClusterDispatcher {
    /// Executes a `Replicas` plan.
    ///
    /// An empty target list or an empty request payload results in
    /// `DispatchOutcome::Drop`. A single target is handled by
    /// `dispatch_replicas_direct`, which replies straight to `responder`.
    /// With several targets, every replica reply is funnelled through an
    /// intermediate channel into a spawned `coalesce_actor`, which emits the
    /// reply to `responder`.
    ///
    /// Returns `Pending` once at least one target was sent to or hinted, and
    /// a no-quorum error when none was.
    fn dispatch_replicas(
        &self,
        req: &Msg,
        req_span: &tracing::Span,
        targets: &[ReplicaTarget],
        consistency: ConsistencyLevel,
        responder: ServerSink,
    ) -> DispatchOutcome {
        if targets.is_empty() {
            return DispatchOutcome::Drop;
        }
        // Flatten the request's buffers once; every target gets a copy.
        let bytes: Vec<u8> = req
            .mbufs()
            .iter()
            .flat_map(|b| b.readable().to_vec())
            .collect();
        if bytes.is_empty() {
            return DispatchOutcome::Drop;
        }
        // Peer states are read once so every target is judged against the
        // same snapshot.
        let peer_states = self.snapshot_peer_states(targets);
        // Messages of type `Unknown` are classified as reads.
        let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
        let is_write = !is_read;
        // Hints are only ever stored for writes.
        let handoff_active = self.hinted_handoff_active() && is_write;
        if targets.len() == 1 {
            // Single target: no coalescing needed.
            return self.dispatch_replicas_direct(
                req,
                req_span,
                targets,
                &bytes,
                &responder,
                &HandoffCtx {
                    handoff_active,
                    peer_states: &peer_states,
                },
            );
        }
        let cfg = self.pool.config();
        let local_dc = cfg.dc.clone();
        // One slot per target plus one spare, so the non-blocking
        // `try_send` of a synthetic hint reply has room.
        let (intermediate_tx, intermediate_rx) =
            mpsc::channel::<OutboundEnvelope>(targets.len() + 1);
        let target_pairs: Vec<(u32, String)> =
            targets.iter().map(|t| (t.peer_idx, t.dc.clone())).collect();
        // Read repair needs a key; requests without a (non-empty) first key
        // get no repair context.
        let repair_key: Option<Vec<u8>> = req
            .keys()
            .first()
            .map(|kp| kp.tag_bytes().to_vec())
            .filter(|k| !k.is_empty());
        let repair_ctx = repair_key.map(|key| ReadRepairContext {
            req_id: req.id(),
            req_ty: req.ty(),
            key,
            mbuf_pool: self.mbuf_pool.clone(),
            peer_backends: self.peer_backends.clone(),
            local_backend: self.backend.clone(),
            target_is_local: targets.iter().map(|t| (t.peer_idx, t.is_local)).collect(),
        });
        let mut sent = 0usize;
        let mut hinted = 0usize;
        for target in targets {
            let action = Self::choose_target_action(target, handoff_active, &peer_states);
            match action {
                TargetAction::Send => {
                    if self.fanout_send(target, req, req_span, &bytes, &intermediate_tx) {
                        sent += 1;
                    } else if handoff_active
                        && self.hint_target(target, &bytes, req, req_span, &intermediate_tx)
                    {
                        // The send failed; fall back to storing a hint.
                        hinted += 1;
                    }
                }
                TargetAction::Hint => {
                    if self.hint_target(target, &bytes, req, req_span, &intermediate_tx) {
                        hinted += 1;
                    }
                }
            }
        }
        // Release this function's own sender handle; the clones handed to
        // the outbound requests remain, so the actor's receive loop can end
        // once those are gone.
        drop(intermediate_tx);
        if sent + hinted == 0 {
            return DispatchOutcome::Error(self.no_quorum_error(req));
        }
        let req_id = req.id();
        let req_ty = req.ty();
        let mbuf_pool = self.mbuf_pool.clone();
        let failure_metrics = self.failure_metrics.clone();
        tokio::spawn(coalesce_actor(
            req_id,
            req_ty,
            consistency,
            target_pairs,
            local_dc,
            intermediate_rx,
            responder,
            mbuf_pool,
            repair_ctx,
            failure_metrics,
        ));
        DispatchOutcome::Pending
    }
1046
1047 fn snapshot_peer_states(
1052 &self,
1053 targets: &[ReplicaTarget],
1054 ) -> std::collections::HashMap<u32, crate::cluster::peer::PeerState> {
1055 use crate::cluster::peer::PeerState;
1056 let peers = self.pool.peers().read();
1057 let mut out = std::collections::HashMap::with_capacity(targets.len());
1058 for t in targets {
1059 let state = if t.is_local {
1060 PeerState::Normal
1061 } else {
1062 peers
1063 .get(t.peer_idx as usize)
1064 .map_or(PeerState::Unknown, crate::cluster::peer::Peer::state)
1065 };
1066 out.insert(t.peer_idx, state);
1067 }
1068 out
1069 }
1070
1071 fn choose_target_action(
1072 target: &ReplicaTarget,
1073 handoff_active: bool,
1074 peer_states: &std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
1075 ) -> TargetAction {
1076 use crate::cluster::peer::PeerState;
1077 if !handoff_active {
1078 return TargetAction::Send;
1079 }
1080 let state = peer_states
1081 .get(&target.peer_idx)
1082 .copied()
1083 .unwrap_or(PeerState::Unknown);
1084 match state {
1085 PeerState::Down => TargetAction::Hint,
1086 _ => TargetAction::Send,
1087 }
1088 }
1089
1090 fn fanout_send(
1093 &self,
1094 target: &ReplicaTarget,
1095 req: &Msg,
1096 req_span: &tracing::Span,
1097 bytes: &[u8],
1098 intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1099 ) -> bool {
1100 let env = OutboundRequest {
1101 bytes: bytes.to_vec(),
1102 req_id: req.id(),
1103 responder: intermediate_tx.clone(),
1104 span: req_span.clone(),
1105 ty: crate::proto::dnode::DmsgType::Req,
1106 target_peer_idx: Some(target.peer_idx),
1107 };
1108 let send_result = if target.is_local {
1109 self.backend.as_ref().map(|tx| tx.try_send(env))
1110 } else {
1111 self.peer_backends
1112 .get(&target.peer_idx)
1113 .map(|tx| tx.try_send(env))
1114 };
1115 match send_result {
1116 Some(Ok(())) => true,
1117 Some(Err(err)) => {
1118 self.observe_send_error(target, &err);
1119 false
1120 }
1121 None => false,
1122 }
1123 }
1124
1125 fn observe_send_error(
1130 &self,
1131 target: &ReplicaTarget,
1132 err: &tokio::sync::mpsc::error::TrySendError<OutboundRequest>,
1133 ) {
1134 let Some(m) = self.failure_metrics.as_ref() else {
1135 return;
1136 };
1137 if target.is_local {
1138 match err {
1139 tokio::sync::mpsc::error::TrySendError::Full(_) => m.record_backend_send_full(),
1140 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1141 m.record_backend_send_closed();
1142 }
1143 }
1144 } else {
1145 let peer_dc = self.peer_dc_label(target.peer_idx);
1146 match err {
1147 tokio::sync::mpsc::error::TrySendError::Full(_) => {
1148 m.record_peer_send_full(target.peer_idx, &peer_dc);
1149 }
1150 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1151 m.record_peer_send_closed(target.peer_idx, &peer_dc);
1152 }
1153 }
1154 }
1155 }
1156
1157 fn hint_target(
1161 &self,
1162 target: &ReplicaTarget,
1163 bytes: &[u8],
1164 req: &Msg,
1165 req_span: &tracing::Span,
1166 intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1167 ) -> bool {
1168 let Some(store) = self.hint_store.as_ref() else {
1169 return false;
1170 };
1171 let cfg = self.pool.config();
1172 let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1173 match store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1174 Ok(()) => {}
1175 Err(e) => {
1176 tracing::debug!(
1177 target: "dynomite::hints",
1178 peer_idx = target.peer_idx,
1179 error = %e,
1180 "hint enqueue failed"
1181 );
1182 return false;
1183 }
1184 }
1185 let synth = synth_hint_reply(req, &self.mbuf_pool);
1186 let env = OutboundEnvelope {
1187 req_id: req.id(),
1188 rsp: synth,
1189 span: req_span.clone(),
1190 source_peer_idx: Some(target.peer_idx),
1191 };
1192 if intermediate_tx.try_send(env).is_err() {
1193 tracing::debug!(
1198 target: "dynomite::hints",
1199 peer_idx = target.peer_idx,
1200 "hint synth-reply could not be queued; coalescer absent"
1201 );
1202 }
1203 tracing::debug!(
1204 target: "dynomite::hints",
1205 peer_idx = target.peer_idx,
1206 bytes = bytes.len(),
1207 "stored hint for down peer"
1208 );
1209 true
1210 }
1211
1212 fn dispatch_replicas_direct(
1213 &self,
1214 req: &Msg,
1215 req_span: &tracing::Span,
1216 targets: &[ReplicaTarget],
1217 bytes: &[u8],
1218 responder: &ServerSink,
1219 ctx: &HandoffCtx<'_>,
1220 ) -> DispatchOutcome {
1221 debug_assert_eq!(targets.len(), 1);
1222 let target = &targets[0];
1223 if let TargetAction::Hint =
1227 Self::choose_target_action(target, ctx.handoff_active, ctx.peer_states)
1228 {
1229 if self.hint_single_target_direct(target, bytes, req, req_span, responder) {
1230 return DispatchOutcome::Pending;
1231 }
1232 return DispatchOutcome::Error(self.no_quorum_error(req));
1233 }
1234 let env = OutboundRequest {
1235 bytes: bytes.to_vec(),
1236 req_id: req.id(),
1237 responder: responder.clone(),
1238 span: req_span.clone(),
1239 ty: crate::proto::dnode::DmsgType::Req,
1240 target_peer_idx: Some(target.peer_idx),
1241 };
1242 let send_result = if target.is_local {
1243 self.backend.as_ref().map(|tx| tx.try_send(env))
1244 } else {
1245 self.peer_backends
1246 .get(&target.peer_idx)
1247 .map(|tx| tx.try_send(env))
1248 };
1249 let sent = match send_result {
1250 Some(Ok(())) => true,
1251 Some(Err(ref err)) => {
1252 self.observe_send_error(target, err);
1253 false
1254 }
1255 None => false,
1256 };
1257 if sent {
1258 return DispatchOutcome::Pending;
1259 }
1260 if ctx.handoff_active
1261 && self.hint_single_target_direct(target, bytes, req, req_span, responder)
1262 {
1263 return DispatchOutcome::Pending;
1264 }
1265 DispatchOutcome::Error(self.no_quorum_error(req))
1266 }
1267
1268 fn hint_single_target_direct(
1273 &self,
1274 target: &ReplicaTarget,
1275 bytes: &[u8],
1276 req: &Msg,
1277 req_span: &tracing::Span,
1278 responder: &ServerSink,
1279 ) -> bool {
1280 let Some(store) = self.hint_store.as_ref() else {
1281 return false;
1282 };
1283 let cfg = self.pool.config();
1284 let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1285 if let Err(e) = store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1286 tracing::debug!(
1287 target: "dynomite::hints",
1288 peer_idx = target.peer_idx,
1289 error = %e,
1290 "hint enqueue failed (single-target)"
1291 );
1292 return false;
1293 }
1294 let synth = synth_hint_reply(req, &self.mbuf_pool);
1295 let env = OutboundEnvelope {
1296 req_id: req.id(),
1297 rsp: synth,
1298 span: req_span.clone(),
1299 source_peer_idx: Some(target.peer_idx),
1300 };
1301 let _ = responder.try_send(env);
1302 true
1303 }
1304
1305 fn no_quorum_error(&self, req: &Msg) -> Msg {
1306 let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1307 MsgType::RspRedisError
1308 } else {
1309 MsgType::RspMcServerError
1310 };
1311 crate::msg::response::make_error(
1312 req,
1313 err_type,
1314 0,
1315 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1316 &self.mbuf_pool,
1317 )
1318 }
1319
1320 fn intercept_vector_command(
1328 &self,
1329 registry: &crate::vector::registry::VectorRegistry,
1330 req: &Msg,
1331 ) -> Option<DispatchOutcome> {
1332 match req.ty() {
1333 MsgType::ReqRedisFtCreate
1334 | MsgType::ReqRedisFtSearch
1335 | MsgType::ReqRedisFtInfo
1336 | MsgType::ReqRedisFtList
1337 | MsgType::ReqRedisFtDropindex
1338 | MsgType::ReqRedisFtRegex
1339 | MsgType::ReqRedisFtUnknown => Some(self.run_ft_command(registry, req)),
1340 MsgType::ReqRedisHset => self.intercept_hset(registry, req),
1341 _ => None,
1342 }
1343 }
1344
1345 fn run_ft_command(
1348 &self,
1349 registry: &crate::vector::registry::VectorRegistry,
1350 req: &Msg,
1351 ) -> DispatchOutcome {
1352 let recovered_kw: Vec<u8>;
1360 let keyword: &[u8] = match req.ty() {
1361 MsgType::ReqRedisFtCreate => b"FT.CREATE",
1362 MsgType::ReqRedisFtSearch => b"FT.SEARCH",
1363 MsgType::ReqRedisFtInfo => b"FT.INFO",
1364 MsgType::ReqRedisFtList => b"FT.LIST",
1365 MsgType::ReqRedisFtDropindex => b"FT.DROPINDEX",
1366 MsgType::ReqRedisFtRegex => b"FT.REGEX",
1367 MsgType::ReqRedisFtUnknown => {
1368 recovered_kw = first_bulk_token(req).unwrap_or_else(|| b"FT.UNKNOWN".to_vec());
1369 recovered_kw.as_slice()
1370 }
1371 _ => return DispatchOutcome::Drop,
1376 };
1377 let mut args: Vec<&[u8]> = Vec::with_capacity(1 + req.keys().len() + req.args().len());
1378 args.push(keyword);
1379 for k in req.keys() {
1380 args.push(k.key());
1381 }
1382 for a in req.args() {
1383 args.push(a.bytes());
1384 }
1385 let bytes = crate::proto::redis::ft::dispatch(registry, &args);
1386 DispatchOutcome::Inline(synthetic_redis_reply(req, &self.mbuf_pool, &bytes))
1387 }
1388
1389 fn intercept_hset(
1395 &self,
1396 registry: &crate::vector::registry::VectorRegistry,
1397 req: &Msg,
1398 ) -> Option<DispatchOutcome> {
1399 let mut args: Vec<&[u8]> = Vec::with_capacity(req.keys().len() + req.args().len());
1400 for k in req.keys() {
1401 args.push(k.key());
1402 }
1403 for a in req.args() {
1404 args.push(a.bytes());
1405 }
1406 match crate::proto::redis::ft::maybe_index_hset(registry, &args) {
1407 Ok(_) => None,
1408 Err(e) => {
1409 let payload = format!("-ERR {e}\r\n");
1410 Some(DispatchOutcome::Error(synthetic_redis_reply(
1411 req,
1412 &self.mbuf_pool,
1413 payload.as_bytes(),
1414 )))
1415 }
1416 }
1417 }
1418}
1419
1420fn synthetic_redis_reply(req: &Msg, pool: &MbufPool, payload: &[u8]) -> Msg {
1426 let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
1427 rsp.set_parent_id(req.id());
1428 let mut written = 0usize;
1429 while written < payload.len() {
1430 let mut buf = pool.get();
1431 let n = buf.recv(&payload[written..]);
1432 debug_assert!(
1433 n > 0,
1434 "MbufPool returned a buffer with zero writable capacity"
1435 );
1436 rsp.mbufs_mut().push_back(buf);
1437 written += n;
1438 }
1439 rsp.recompute_mlen();
1440 rsp
1441}
1442
1443fn first_bulk_token(req: &Msg) -> Option<Vec<u8>> {
1453 let mut wire: Vec<u8> = Vec::new();
1454 for buf in req.mbufs() {
1455 wire.extend_from_slice(buf.readable());
1456 if wire.len() > 256 {
1457 break;
1458 }
1459 }
1460 let mut p = 0usize;
1461 if wire.first() == Some(&b'*') {
1462 let cr = wire.iter().position(|&b| b == b'\r')?;
1463 if wire.get(cr + 1) != Some(&b'\n') {
1464 return None;
1465 }
1466 p = cr + 2;
1467 }
1468 if wire.get(p) != Some(&b'$') {
1469 return None;
1470 }
1471 let header_start = p + 1;
1472 let header_cr = wire[header_start..]
1473 .iter()
1474 .position(|&b| b == b'\r')
1475 .map(|i| header_start + i)?;
1476 if wire.get(header_cr + 1) != Some(&b'\n') {
1477 return None;
1478 }
1479 let len_str = std::str::from_utf8(&wire[header_start..header_cr]).ok()?;
1480 let len: usize = len_str.parse().ok()?;
1481 let body_start = header_cr + 2;
1482 let body_end = body_start.checked_add(len)?;
1483 if wire.len() < body_end + 2 {
1484 return None;
1485 }
1486 Some(wire[body_start..body_end].to_vec())
1487}
1488
/// Everything the coalescing actor needs in order to schedule a read repair
/// after it has replied to the client.
#[derive(Clone)]
struct ReadRepairContext {
    /// Id of the originating request.
    req_id: crate::core::types::MsgId,
    /// Type of the originating request.
    req_ty: MsgType,
    /// Tag bytes of the request's first key; never empty, because no context
    /// is built for requests without one.
    key: Vec<u8>,
    /// Buffer pool (a clone of the dispatcher's).
    mbuf_pool: MbufPool,
    /// Senders towards remote peers, keyed by peer index (cloned from the
    /// dispatcher).
    peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
    /// Sender towards the local datastore, if the dispatcher has one.
    local_backend: Option<mpsc::Sender<OutboundRequest>>,
    /// For every target of the request: whether that peer is the local node.
    target_is_local: std::collections::HashMap<u32, bool>,
}
1504
1505#[allow(
1507 clippy::too_many_arguments,
1508 reason = "actor task captures the entire dispatch context; bundling into a struct adds churn for no callsite gain"
1509)]
1510async fn coalesce_actor(
1511 req_id: crate::core::types::MsgId,
1512 req_ty: MsgType,
1513 consistency: ConsistencyLevel,
1514 targets: Vec<(u32, String)>,
1515 local_dc: String,
1516 mut intermediate_rx: mpsc::Receiver<OutboundEnvelope>,
1517 client_tx: ServerSink,
1518 mbuf_pool: MbufPool,
1519 repair_ctx: Option<ReadRepairContext>,
1520 failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
1521) {
1522 use crate::proto::redis::{CoalesceOutcome, CoalesceTracker};
1523 let mut tracker = CoalesceTracker::new(req_id, consistency, targets, &local_dc);
1524 let mut emitted = false;
1525 while let Some(env) = intermediate_rx.recv().await {
1526 let source = env.source_peer_idx.unwrap_or(u32::MAX);
1527 let span = env.span.clone();
1528 let outcome = tracker.record_reply(source, env.rsp);
1529 match outcome {
1530 CoalesceOutcome::Pending => {}
1531 CoalesceOutcome::Ready {
1532 winner,
1533 divergent_targets,
1534 } => {
1535 if !emitted {
1536 let winner_bytes: Vec<u8> = winner
1537 .mbufs()
1538 .iter()
1539 .flat_map(|b| b.readable().to_vec())
1540 .collect();
1541 let out_env = OutboundEnvelope {
1542 req_id,
1543 rsp: *winner,
1544 span: span.clone(),
1545 source_peer_idx: None,
1546 };
1547 let _ = client_tx.send(out_env).await;
1548 emitted = true;
1549 if !divergent_targets.is_empty() {
1550 if let Some(ctx) = repair_ctx.as_ref() {
1551 schedule_read_repair(ctx, &divergent_targets, &winner_bytes, &span);
1552 }
1553 }
1554 }
1555 }
1556 CoalesceOutcome::Error(reason) => {
1557 if !emitted {
1558 let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet)
1559 {
1560 MsgType::RspRedisError
1561 } else {
1562 MsgType::RspMcServerError
1563 };
1564 let anchor = Msg::new(req_id, req_ty, true);
1565 let rsp = crate::msg::response::make_error(
1566 &anchor,
1567 err_type,
1568 0,
1569 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1570 &mbuf_pool,
1571 );
1572 let _ = client_tx
1573 .send(OutboundEnvelope {
1574 req_id,
1575 rsp,
1576 span: span.clone(),
1577 source_peer_idx: None,
1578 })
1579 .await;
1580 emitted = true;
1581 }
1582 tracing::debug!(target: "dynomite::coalesce", req_id, reason = %reason, "coalesce error");
1583 }
1584 }
1585 }
1586 if !emitted {
1587 if let Some(m) = failure_metrics.as_ref() {
1594 m.record_response_timeout(consistency);
1595 }
1596 let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1597 MsgType::RspRedisError
1598 } else {
1599 MsgType::RspMcServerError
1600 };
1601 let anchor = Msg::new(req_id, req_ty, true);
1602 let rsp = crate::msg::response::make_error(
1603 &anchor,
1604 err_type,
1605 0,
1606 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1607 &mbuf_pool,
1608 );
1609 let _ = client_tx
1610 .send(OutboundEnvelope {
1611 req_id,
1612 rsp,
1613 span: tracing::Span::none(),
1614 source_peer_idx: None,
1615 })
1616 .await;
1617 }
1618}
1619
1620fn repair_sink() -> ServerSink {
1623 let (tx, mut rx) = mpsc::channel::<OutboundEnvelope>(8);
1624 tokio::spawn(async move {
1625 while rx.recv().await.is_some() {
1626 }
1629 });
1630 tx
1631}
1632
1633fn decode_winner_for_repair(payload: &[u8]) -> Option<RepairAction> {
1642 if payload == b"$-1\r\n" {
1643 return Some(RepairAction::Delete);
1644 }
1645 if !payload.starts_with(b"$") {
1646 return None;
1647 }
1648 let crlf = payload.iter().position(|&b| b == b'\r')?;
1650 if payload.get(crlf + 1).copied() != Some(b'\n') {
1651 return None;
1652 }
1653 let len_str = std::str::from_utf8(&payload[1..crlf]).ok()?;
1654 let len: usize = len_str.parse().ok()?;
1655 let body_start = crlf + 2;
1656 let body_end = body_start.checked_add(len)?;
1657 if payload.len() < body_end + 2 {
1658 return None;
1659 }
1660 if &payload[body_end..body_end + 2] != b"\r\n" {
1661 return None;
1662 }
1663 Some(RepairAction::Write(payload[body_start..body_end].to_vec()))
1664}
1665
/// Borrowed bundle of a flag and a peer-state map.
///
/// NOTE(review): the code that consumes this struct is outside the reviewed
/// chunk; the field descriptions below are inferred from names and types and
/// should be confirmed against the caller.
struct HandoffCtx<'a> {
    /// Presumably whether hinted handoff is in effect for the current
    /// dispatch — TODO confirm.
    handoff_active: bool,
    /// Borrowed map from a `u32` key (presumably the peer index) to that
    /// peer's `PeerState`.
    peer_states: &'a std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
}
1675
/// Two-way choice made for a dispatch target.
///
/// NOTE(review): the decision logic lives outside the reviewed chunk; the
/// variant descriptions are inferred from names — confirm against the caller.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TargetAction {
    /// Presumably: forward the request to the target.
    Send,
    /// Presumably: record a hint for the target instead of sending now.
    Hint,
}
1686
1687fn synth_hint_reply(req: &Msg, pool: &MbufPool) -> Msg {
1698 crate::msg::response::make_simple_redis(req, pool, b"+OK\r\n")
1699}
1700
/// What a read-repair should do to a divergent replica.
///
/// Produced by `decode_winner_for_repair` and serialised by
/// `build_repair_bytes`. The derives mirror the sibling `TargetAction` enum
/// so values can be compared and printed in logs and tests.
#[derive(Clone, Debug, Eq, PartialEq)]
enum RepairAction {
    /// Overwrite the key with this value (serialised as a `SET`).
    Write(Vec<u8>),
    /// Remove the key (serialised as a `DEL`).
    Delete,
}
1710
1711fn build_repair_bytes(action: &RepairAction, key: &[u8]) -> Vec<u8> {
1713 match action {
1714 RepairAction::Write(value) => {
1715 let mut out = Vec::with_capacity(key.len() + value.len() + 32);
1716 out.extend_from_slice(b"*3\r\n$3\r\nSET\r\n$");
1717 out.extend_from_slice(key.len().to_string().as_bytes());
1718 out.extend_from_slice(b"\r\n");
1719 out.extend_from_slice(key);
1720 out.extend_from_slice(b"\r\n$");
1721 out.extend_from_slice(value.len().to_string().as_bytes());
1722 out.extend_from_slice(b"\r\n");
1723 out.extend_from_slice(value);
1724 out.extend_from_slice(b"\r\n");
1725 out
1726 }
1727 RepairAction::Delete => {
1728 let mut out = Vec::with_capacity(key.len() + 24);
1729 out.extend_from_slice(b"*2\r\n$3\r\nDEL\r\n$");
1730 out.extend_from_slice(key.len().to_string().as_bytes());
1731 out.extend_from_slice(b"\r\n");
1732 out.extend_from_slice(key);
1733 out.extend_from_slice(b"\r\n");
1734 out
1735 }
1736 }
1737}
1738
1739fn schedule_read_repair(
1757 ctx: &ReadRepairContext,
1758 divergent: &[u32],
1759 winner_bytes: &[u8],
1760 span: &tracing::Span,
1761) {
1762 if !matches!(ctx.req_ty, MsgType::ReqRedisGet) {
1763 return;
1764 }
1765 let Some(action) = decode_winner_for_repair(winner_bytes) else {
1766 return;
1767 };
1768 let bytes = build_repair_bytes(&action, &ctx.key);
1769 let sink = repair_sink();
1770 for &peer_idx in divergent {
1771 let is_local = ctx.target_is_local.get(&peer_idx).copied().unwrap_or(false);
1772 let env = OutboundRequest {
1773 bytes: bytes.clone(),
1774 req_id: ctx.req_id,
1775 responder: sink.clone(),
1776 span: span.clone(),
1777 ty: crate::proto::dnode::DmsgType::ReqForward,
1778 target_peer_idx: Some(peer_idx),
1779 };
1780 let sent = if is_local {
1781 ctx.local_backend
1782 .as_ref()
1783 .is_some_and(|tx| tx.try_send(env).is_ok())
1784 } else {
1785 ctx.peer_backends
1786 .get(&peer_idx)
1787 .is_some_and(|tx| tx.try_send(env).is_ok())
1788 };
1789 if sent {
1790 let _ = &ctx.mbuf_pool;
1791 tracing::debug!(
1792 target: "dynomite::read_repair",
1793 req_id = ctx.req_id,
1794 peer_idx,
1795 bytes = bytes.len(),
1796 "scheduled read-repair write",
1797 );
1798 } else {
1799 tracing::debug!(
1800 target: "dynomite::read_repair",
1801 req_id = ctx.req_id,
1802 peer_idx,
1803 "read-repair drop: backend channel unavailable or full",
1804 );
1805 }
1806 }
1807}
1808
1809#[cfg(test)]
1810mod tests {
1811 use super::*;
1812 use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
1813 use crate::conf::DataStore;
1814 use crate::hashkit::DynToken;
1815
1816 fn cfg(read: ConsistencyLevel, write: ConsistencyLevel) -> crate::cluster::PoolConfig {
1817 crate::cluster::PoolConfig {
1818 read_consistency: read,
1819 write_consistency: write,
1820 dc: "dc1".into(),
1821 rack: "rA".into(),
1822 ..crate::cluster::PoolConfig::default()
1823 }
1824 }
1825
1826 fn peer(idx: u32, dc: &str, rack: &str, tok: u32, is_local: bool, is_same: bool) -> Peer {
1827 let mut p = Peer::new(
1828 idx,
1829 PeerEndpoint::tcp("h".into(), 8101 + u16::try_from(idx).unwrap_or(0)),
1830 rack.into(),
1831 dc.into(),
1832 vec![DynToken::from_u32(tok)],
1833 is_local,
1834 is_same,
1835 false,
1836 );
1837 p.set_state(PeerState::Normal, 0);
1838 p
1839 }
1840
1841 fn pool(read: ConsistencyLevel, write: ConsistencyLevel, peers: Vec<Peer>) -> Arc<ServerPool> {
1842 let pool = ServerPool::new(cfg(read, write), peers);
1843 pool.preselect_remote_racks();
1844 Arc::new(pool)
1845 }
1846
1847 #[test]
1848 fn local_node_only_short_circuits() {
1849 let p = pool(
1850 ConsistencyLevel::DcOne,
1851 ConsistencyLevel::DcOne,
1852 vec![peer(0, "dc1", "rA", 10, true, true)],
1853 );
1854 let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
1855 req.set_routing(MsgRouting::LocalNodeOnly);
1856 assert_eq!(
1857 ClusterDispatcher::new(p).plan(&req, b"k"),
1858 DispatchPlan::LocalDatastore,
1859 );
1860 }
1861
1862 #[test]
1863 fn dc_one_read_targets_local_rack_when_present() {
1864 let p = pool(
1865 ConsistencyLevel::DcOne,
1866 ConsistencyLevel::DcOne,
1867 vec![
1868 peer(0, "dc1", "rA", 10, true, true),
1869 peer(1, "dc1", "rB", 20, false, true),
1870 peer(2, "dc2", "rA", 30, false, false),
1871 ],
1872 );
1873 let req = Msg::new(1, MsgType::ReqRedisGet, true);
1874 let plan = ClusterDispatcher::new(p).plan(&req, b"hello");
1876 assert!(matches!(plan, DispatchPlan::LocalDatastore));
1877 }
1878
1879 #[test]
1880 fn dc_quorum_fans_out_local_dc() {
1881 let p = pool(
1882 ConsistencyLevel::DcQuorum,
1883 ConsistencyLevel::DcQuorum,
1884 vec![
1885 peer(0, "dc1", "rA", 10, true, true),
1886 peer(1, "dc1", "rB", 20, false, true),
1887 peer(2, "dc2", "rA", 30, false, false),
1888 ],
1889 );
1890 let req = Msg::new(1, MsgType::ReqRedisGet, true);
1891 let plan = ClusterDispatcher::new(p).plan(&req, b"k");
1892 match plan {
1893 DispatchPlan::Replicas { targets: rs, .. } => {
1894 assert_eq!(rs.len(), 2);
1895 for r in rs {
1896 assert_eq!(r.dc, "dc1");
1897 }
1898 }
1899 _ => panic!("expected replicas"),
1900 }
1901 }
1902
1903 #[test]
1904 fn dc_each_safe_quorum_fans_out_per_dc() {
1905 let p = pool(
1906 ConsistencyLevel::DcEachSafeQuorum,
1907 ConsistencyLevel::DcEachSafeQuorum,
1908 vec![
1909 peer(0, "dc1", "rA", 10, true, true),
1910 peer(1, "dc2", "rA", 20, false, false),
1911 ],
1912 );
1913 let req = Msg::new(1, MsgType::ReqRedisGet, true);
1914 let plan = ClusterDispatcher::new(p).plan(&req, b"k");
1915 match plan {
1916 DispatchPlan::Replicas { targets: rs, .. } => {
1917 assert_eq!(rs.len(), 2);
1918 let dcs: Vec<&str> = rs.iter().map(|r| r.dc.as_str()).collect();
1919 assert!(dcs.contains(&"dc1"));
1920 assert!(dcs.contains(&"dc2"));
1921 }
1922 _ => panic!("expected replicas"),
1923 }
1924 }
1925
1926 #[test]
1927 fn no_routable_peers_returns_no_targets() {
1928 let mut p0 = peer(0, "dc1", "rA", 10, true, true);
1929 p0.set_state(PeerState::Down, 0);
1930 let p = pool(
1931 ConsistencyLevel::DcQuorum,
1932 ConsistencyLevel::DcQuorum,
1933 vec![p0],
1934 );
1935 let req = Msg::new(1, MsgType::ReqRedisGet, true);
1936 let plan = ClusterDispatcher::new(p).plan(&req, b"k");
1937 assert_eq!(plan, DispatchPlan::NoTargets);
1938 }
1939
1940 #[test]
1948 fn no_targets_error_response_carries_dynomite_wire_bytes() {
1949 let mut p0 = peer(0, "dc1", "rA", 10, true, true);
1950 p0.set_state(PeerState::Down, 0);
1951 let p = pool(
1952 ConsistencyLevel::DcQuorum,
1953 ConsistencyLevel::DcQuorum,
1954 vec![p0],
1955 );
1956 let disp = ClusterDispatcher::new(p);
1957 let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
1958 req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
1959 let (tx, _rx) = mpsc::channel(1);
1960 let outcome = disp.dispatch(req, tx);
1961 match outcome {
1962 DispatchOutcome::Error(rsp) => {
1963 assert_eq!(rsp.ty(), MsgType::RspRedisError);
1964 assert!(rsp.flags().is_error);
1965 let bytes: Vec<u8> = rsp
1966 .mbufs()
1967 .iter()
1968 .flat_map(|b| b.readable().to_vec())
1969 .collect();
1970 assert!(
1971 !bytes.is_empty(),
1972 "NoTargets must produce on-wire bytes, not a 0-byte hang"
1973 );
1974 assert!(bytes.starts_with(b"-Dynomite: "));
1975 assert!(bytes.ends_with(b"\r\n"));
1976 assert_eq!(rsp.mlen() as usize, bytes.len());
1977 }
1978 other => panic!("expected DispatchOutcome::Error, got {other:?}"),
1979 }
1980 }
1981
1982 #[test]
1986 fn no_targets_error_response_memcache_wire_bytes() {
1987 let mut cfg = cfg(ConsistencyLevel::DcQuorum, ConsistencyLevel::DcQuorum);
1993 cfg.data_store = DataStore::Memcache;
1994 let mut p0 = peer(0, "dc1", "rA", 10, true, true);
1995 p0.set_state(PeerState::Down, 0);
1996 let pool_arc = ServerPool::new(cfg, vec![p0]);
1997 pool_arc.preselect_remote_racks();
1998 let disp = ClusterDispatcher::new(Arc::new(pool_arc));
1999 let mut req = Msg::new(1, MsgType::ReqMcGet, true);
2000 req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
2001 let (tx, _rx) = mpsc::channel(1);
2002 let outcome = disp.dispatch(req, tx);
2003 match outcome {
2004 DispatchOutcome::Error(rsp) => {
2005 assert_eq!(rsp.ty(), MsgType::RspMcServerError);
2006 let bytes: Vec<u8> = rsp
2007 .mbufs()
2008 .iter()
2009 .flat_map(|b| b.readable().to_vec())
2010 .collect();
2011 assert!(
2012 !bytes.is_empty(),
2013 "NoTargets must produce on-wire bytes, not a 0-byte hang"
2014 );
2015 assert!(bytes.starts_with(b"SERVER_ERROR "));
2016 assert!(bytes.ends_with(b"\r\n"));
2017 }
2018 other => panic!("expected DispatchOutcome::Error, got {other:?}"),
2019 }
2020 }
2021
2022 use crate::cluster::pool::{BucketType, PoolConfig};
2023
2024 fn pool_with_bucket_types(
2025 pool_read: ConsistencyLevel,
2026 pool_write: ConsistencyLevel,
2027 bucket_types: Vec<BucketType>,
2028 default_bucket_type: Option<&str>,
2029 peers: Vec<Peer>,
2030 ) -> Arc<ServerPool> {
2031 let cfg = PoolConfig {
2032 read_consistency: pool_read,
2033 write_consistency: pool_write,
2034 dc: "dc1".into(),
2035 rack: "rA".into(),
2036 bucket_types,
2037 default_bucket_type: default_bucket_type.map(str::to_string),
2038 ..PoolConfig::default()
2039 };
2040 let pool = ServerPool::new(cfg, peers);
2041 pool.preselect_remote_racks();
2042 Arc::new(pool)
2043 }
2044
2045 fn three_local_peers() -> Vec<Peer> {
2046 vec![
2047 peer(0, "dc1", "rA", 10, true, true),
2048 peer(1, "dc1", "rB", 20, false, true),
2049 peer(2, "dc1", "rC", 30, false, true),
2050 ]
2051 }
2052
2053 #[test]
2054 fn bucket_type_overrides_pool_consistency() {
2055 let bts = vec![BucketType {
2057 name: "hot".into(),
2058 read_consistency: ConsistencyLevel::DcQuorum,
2059 write_consistency: ConsistencyLevel::DcQuorum,
2060 n_val: 0,
2061 }];
2062 let p = pool_with_bucket_types(
2063 ConsistencyLevel::DcOne,
2064 ConsistencyLevel::DcOne,
2065 bts,
2066 None,
2067 three_local_peers(),
2068 );
2069 let req = Msg::new(1, MsgType::ReqRedisGet, true);
2070 let plan = ClusterDispatcher::new(p).plan(&req, b"hot/key1");
2071 match plan {
2072 DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2073 other => panic!("expected DC_QUORUM fan-out, got {other:?}"),
2074 }
2075 }
2076
2077 #[test]
2078 fn slashless_key_falls_back_to_pool_default() {
2079 let bts = vec![BucketType {
2080 name: "hot".into(),
2081 read_consistency: ConsistencyLevel::DcQuorum,
2082 write_consistency: ConsistencyLevel::DcQuorum,
2083 n_val: 0,
2084 }];
2085 let p = pool_with_bucket_types(
2086 ConsistencyLevel::DcOne,
2087 ConsistencyLevel::DcOne,
2088 bts,
2089 None,
2090 three_local_peers(),
2091 );
2092 let req = Msg::new(1, MsgType::ReqRedisGet, true);
2093 let plan = ClusterDispatcher::new(p).plan(&req, b"plain-key");
2094 assert!(matches!(plan, DispatchPlan::LocalDatastore));
2097 }
2098
2099 #[test]
2100 fn unknown_bucket_uses_default_bucket_type_when_set() {
2101 let bts = vec![BucketType {
2102 name: "safe".into(),
2103 read_consistency: ConsistencyLevel::DcQuorum,
2104 write_consistency: ConsistencyLevel::DcQuorum,
2105 n_val: 0,
2106 }];
2107 let p = pool_with_bucket_types(
2108 ConsistencyLevel::DcOne,
2109 ConsistencyLevel::DcOne,
2110 bts,
2111 Some("safe"),
2112 three_local_peers(),
2113 );
2114 let req = Msg::new(1, MsgType::ReqRedisGet, true);
2115 let plan = ClusterDispatcher::new(p.clone()).plan(&req, b"plain-key");
2118 match plan {
2119 DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2120 other => panic!("expected DC_QUORUM via default bucket, got {other:?}"),
2121 }
2122 let plan = ClusterDispatcher::new(p).plan(&req, b"unknown-bucket/key");
2125 match plan {
2126 DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2127 other => panic!("expected DC_QUORUM via default bucket, got {other:?}"),
2128 }
2129 }
2130
2131 #[test]
2132 fn unknown_bucket_with_no_default_uses_pool_default() {
2133 let bts = vec![BucketType {
2134 name: "safe".into(),
2135 read_consistency: ConsistencyLevel::DcQuorum,
2136 write_consistency: ConsistencyLevel::DcQuorum,
2137 n_val: 0,
2138 }];
2139 let p = pool_with_bucket_types(
2140 ConsistencyLevel::DcOne,
2141 ConsistencyLevel::DcOne,
2142 bts,
2143 None,
2144 three_local_peers(),
2145 );
2146 let req = Msg::new(1, MsgType::ReqRedisGet, true);
2147 let plan = ClusterDispatcher::new(p).plan(&req, b"unknown-bucket/key");
2148 assert!(matches!(plan, DispatchPlan::LocalDatastore));
2149 }
2150
2151 #[test]
2152 fn n_val_one_caps_replicas_to_first_target() {
2153 let bts = vec![BucketType {
2154 name: "thin".into(),
2155 read_consistency: ConsistencyLevel::DcQuorum,
2156 write_consistency: ConsistencyLevel::DcQuorum,
2157 n_val: 1,
2158 }];
2159 let p = pool_with_bucket_types(
2160 ConsistencyLevel::DcOne,
2161 ConsistencyLevel::DcOne,
2162 bts,
2163 None,
2164 three_local_peers(),
2165 );
2166 let req = Msg::new(1, MsgType::ReqRedisGet, true);
2167 let plan = ClusterDispatcher::new(p).plan(&req, b"thin/key");
2168 match plan {
2169 DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 1),
2170 other => panic!("expected single-target plan, got {other:?}"),
2171 }
2172 }
2173
2174 #[test]
2175 fn n_val_two_caps_replicas_to_first_two_targets() {
2176 let bts = vec![BucketType {
2177 name: "medium".into(),
2178 read_consistency: ConsistencyLevel::DcQuorum,
2179 write_consistency: ConsistencyLevel::DcQuorum,
2180 n_val: 2,
2181 }];
2182 let p = pool_with_bucket_types(
2183 ConsistencyLevel::DcOne,
2184 ConsistencyLevel::DcOne,
2185 bts,
2186 None,
2187 three_local_peers(),
2188 );
2189 let req = Msg::new(1, MsgType::ReqRedisGet, true);
2190 let plan = ClusterDispatcher::new(p).plan(&req, b"medium/key");
2191 match plan {
2192 DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 2),
2193 other => panic!("expected two-target plan, got {other:?}"),
2194 }
2195 }
2196
2197 #[test]
2198 fn n_val_zero_does_not_cap() {
2199 let bts = vec![BucketType {
2200 name: "any".into(),
2201 read_consistency: ConsistencyLevel::DcQuorum,
2202 write_consistency: ConsistencyLevel::DcQuorum,
2203 n_val: 0,
2204 }];
2205 let p = pool_with_bucket_types(
2206 ConsistencyLevel::DcOne,
2207 ConsistencyLevel::DcOne,
2208 bts,
2209 None,
2210 three_local_peers(),
2211 );
2212 let req = Msg::new(1, MsgType::ReqRedisGet, true);
2213 let plan = ClusterDispatcher::new(p).plan(&req, b"any/key");
2214 match plan {
2215 DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2216 other => panic!("expected uncapped plan, got {other:?}"),
2217 }
2218 }
2219
2220 #[test]
2221 fn n_val_larger_than_replicas_is_a_no_op() {
2222 let bts = vec![BucketType {
2223 name: "big".into(),
2224 read_consistency: ConsistencyLevel::DcQuorum,
2225 write_consistency: ConsistencyLevel::DcQuorum,
2226 n_val: 7,
2227 }];
2228 let p = pool_with_bucket_types(
2229 ConsistencyLevel::DcOne,
2230 ConsistencyLevel::DcOne,
2231 bts,
2232 None,
2233 three_local_peers(),
2234 );
2235 let req = Msg::new(1, MsgType::ReqRedisGet, true);
2236 let plan = ClusterDispatcher::new(p).plan(&req, b"big/key");
2237 match plan {
2238 DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2239 other => panic!("expected uncapped plan, got {other:?}"),
2240 }
2241 }
2242
2243 #[test]
2247 fn no_targets_records_failure_metric() {
2248 let mut p0 = peer(0, "dc1", "rA", 10, true, true);
2249 p0.set_state(PeerState::Down, 0);
2250 let p = pool(
2251 ConsistencyLevel::DcQuorum,
2252 ConsistencyLevel::DcQuorum,
2253 vec![p0],
2254 );
2255 let metrics = Arc::new(crate::stats::FailureMetrics::new());
2256 let disp = ClusterDispatcher::new(p).with_failure_metrics(metrics.clone());
2257 let req = Msg::new(1, MsgType::ReqRedisGet, true);
2258 assert_eq!(disp.plan(&req, b"k"), DispatchPlan::NoTargets);
2259 let snap = metrics.snapshot();
2260 assert_eq!(snap.no_targets.len(), 1);
2261 let entry = &snap.no_targets[0];
2262 assert_eq!(entry.dc, "dc1");
2263 assert_eq!(entry.rack, "rA");
2264 assert_eq!(entry.consistency, ConsistencyLevel::DcQuorum);
2265 assert_eq!(entry.count, 1);
2266 }
2267
2268 #[tokio::test]
2274 async fn closed_backend_channel_records_closed_metric() {
2275 let p = pool(
2276 ConsistencyLevel::DcOne,
2277 ConsistencyLevel::DcOne,
2278 vec![peer(0, "dc1", "rA", 10, true, true)],
2279 );
2280 let (tx, rx) = mpsc::channel::<crate::net::server::OutboundRequest>(4);
2281 drop(rx);
2282 let metrics = Arc::new(crate::stats::FailureMetrics::new());
2283 let disp = ClusterDispatcher::new(p)
2284 .with_backend(tx)
2285 .with_failure_metrics(metrics.clone());
2286 let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
2287 let pool_buf = crate::io::mbuf::MbufPool::default();
2291 let mut buf = pool_buf.get();
2292 buf.copy_from_slice(b"PING\r\n");
2293 req.mbufs_mut().push_back(buf);
2294 let (resp_tx, _resp_rx) = mpsc::channel(1);
2295 let outcome = disp.dispatch(req, resp_tx);
2296 assert!(matches!(outcome, DispatchOutcome::Error(_)));
2297 let snap = metrics.snapshot();
2298 assert_eq!(snap.backend_send_closed, 1);
2299 assert_eq!(snap.backend_send_full, 0);
2300 }
2301
2302 #[test]
2307 fn two_peer_pool_with_one_down_records_per_key_no_targets() {
2308 let cfg = crate::cluster::PoolConfig {
2309 dc: "dc1".into(),
2310 rack: "rA".into(),
2311 read_consistency: ConsistencyLevel::DcQuorum,
2312 write_consistency: ConsistencyLevel::DcQuorum,
2313 ..crate::cluster::PoolConfig::default()
2314 };
2315 let p0 = peer(0, "dc1", "rA", 2_147_483_648, true, true);
2323 let mut p1 = peer(1, "dc1", "rA", 0, false, true);
2324 p1.set_state(PeerState::Down, 0);
2325 let pool_arc = ServerPool::new(cfg, vec![p0, p1]);
2326 pool_arc.preselect_remote_racks();
2327 let metrics = Arc::new(crate::stats::FailureMetrics::new());
2328 let disp = ClusterDispatcher::new(Arc::new(pool_arc)).with_failure_metrics(metrics.clone());
2329 let mut planned_no_targets = 0u64;
2330 let mut planned_routable = 0u64;
2331 for i in 0..100u32 {
2332 let key = format!("k{i:03}");
2333 let req = Msg::new(u64::from(i), MsgType::ReqRedisGet, true);
2334 match disp.plan(&req, key.as_bytes()) {
2335 DispatchPlan::NoTargets => planned_no_targets += 1,
2336 DispatchPlan::Replicas { .. } | DispatchPlan::LocalDatastore => {
2337 planned_routable += 1;
2338 }
2339 DispatchPlan::Drop => panic!("unexpected Drop in plan"),
2340 }
2341 }
2342 assert!(planned_no_targets > 0, "expected some NoTargets dispatches");
2343 assert!(planned_routable > 0, "expected some routable dispatches");
2344 let snap = metrics.snapshot();
2345 let counter_total: u64 = snap.no_targets.iter().map(|e| e.count).sum();
2346 assert_eq!(
2347 counter_total, planned_no_targets,
2348 "dispatch_no_targets_total must match observed NoTargets count",
2349 );
2350 assert_eq!(snap.backend_send_full, 0);
2353 assert_eq!(snap.backend_send_closed, 0);
2354 assert!(snap.peer_send_full.is_empty());
2355 assert!(snap.peer_send_closed.is_empty());
2356 }
2357}