1use std::sync::atomic::{AtomicU64, Ordering};
46use std::sync::Arc;
47
48use tokio::sync::mpsc;
49
50use crate::cluster::pool::ServerPool;
51use crate::cluster::snitch::{rack_distance, RackDistance};
52use crate::cluster::vnode;
53use crate::conf::HashType as ConfHashType;
54use crate::hashkit::{self, HashType};
55use crate::io::mbuf::MbufPool;
56use crate::msg::{ConsistencyLevel, Msg, MsgRouting, MsgType};
57use crate::net::dispatcher::{DispatchOutcome, Dispatcher, OutboundEnvelope, ServerSink};
58use crate::net::server::OutboundRequest;
59
/// Process-wide count of keys for which the shadow distribution selected a
/// different replica set than the live distribution.
static SHADOW_DISAGREEMENTS: AtomicU64 = AtomicU64::new(0);

/// Returns how many shadow/live routing disagreements have been observed
/// since start-up or since the last call to
/// [`reset_distribution_shadow_disagreement_total`].
#[must_use]
pub fn distribution_shadow_disagreement_total() -> u64 {
    let observed = SHADOW_DISAGREEMENTS.load(Ordering::Relaxed);
    observed
}

/// Sets the shadow/live disagreement counter back to zero.
pub fn reset_distribution_shadow_disagreement_total() {
    // The previous value is not needed; only the zeroed end state matters.
    let _ = SHADOW_DISAGREEMENTS.swap(0, Ordering::Relaxed);
}

/// Records one more shadow/live disagreement.
fn bump_shadow_disagreement() {
    let _ = SHADOW_DISAGREEMENTS.fetch_add(1, Ordering::Relaxed);
}
97
98fn enter_plan_span(
104 req_id: u64,
105 plan: &DispatchPlan,
106) -> (tracing::Span, tracing::span::EnteredSpan) {
107 let req_span = tracing::Span::current();
108 let kind: &'static str = match plan {
109 DispatchPlan::Drop => "drop",
110 DispatchPlan::NoTargets => "no_targets",
111 DispatchPlan::LocalDatastore => "local_datastore",
112 DispatchPlan::Replicas { .. } => "replicas",
113 };
114 let targets = match plan {
115 DispatchPlan::Replicas { targets, .. } => targets.len(),
116 _ => 0,
117 };
118 let span = tracing::info_span!("dispatch.plan", req_id, plan = kind, targets,).entered();
119 (req_span, span)
120}
121
122fn map_hash(h: ConfHashType) -> HashType {
123 match h {
124 ConfHashType::OneAtATime => HashType::OneAtATime,
125 ConfHashType::Md5 => HashType::Md5,
126 ConfHashType::Crc16 => HashType::Crc16,
127 ConfHashType::Crc32 => HashType::Crc32,
128 ConfHashType::Crc32a => HashType::Crc32a,
129 ConfHashType::Fnv1_64 => HashType::Fnv1_64,
130 ConfHashType::Fnv1a64 => HashType::Fnv1a_64,
131 ConfHashType::Fnv1_32 => HashType::Fnv1_32,
132 ConfHashType::Fnv1a32 => HashType::Fnv1a_32,
133 ConfHashType::Hsieh => HashType::Hsieh,
134 ConfHashType::Murmur => HashType::Murmur,
135 ConfHashType::Jenkins => HashType::Jenkins,
136 ConfHashType::Murmur3 => HashType::Murmur3,
137 ConfHashType::Murmur3X64_64 => HashType::Murmur3X64_64,
138 }
139}
140
/// A single replica chosen by the planner, identified by peer index and
/// annotated with its topology location.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaTarget {
    /// Index of the peer in the server pool's peer table.
    pub peer_idx: u32,
    /// Name of the datacenter the replica was selected from.
    pub dc: String,
    /// Name of the rack (within `dc`) the replica was selected from.
    pub rack: String,
    /// `true` when the peer is this node. Such targets are sent to the local
    /// datastore backend instead of a peer connection.
    pub is_local: bool,
}
153
154#[derive(Clone, Debug, Eq, PartialEq)]
162pub enum DispatchPlan {
163 LocalDatastore,
165 Replicas {
171 targets: Vec<ReplicaTarget>,
173 consistency: ConsistencyLevel,
175 },
176 NoTargets,
179 Drop,
181}
182
183#[derive(Debug, Clone)]
185pub struct ClusterDispatcher {
186 pool: Arc<ServerPool>,
187 backend: Option<mpsc::Sender<OutboundRequest>>,
194 peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
202 mbuf_pool: MbufPool,
207 hint_store: Option<Arc<crate::cluster::hints::HintStore>>,
214 failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
220 command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
231}
232
233impl ClusterDispatcher {
234 #[must_use]
254 pub fn new(pool: Arc<ServerPool>) -> Self {
255 Self {
256 pool,
257 backend: None,
258 peer_backends: std::collections::HashMap::new(),
259 mbuf_pool: MbufPool::default(),
260 hint_store: None,
261 failure_metrics: None,
262 command_extension: None,
263 }
264 }
265
266 #[must_use]
288 pub fn with_mbuf_pool(mut self, pool: MbufPool) -> Self {
289 self.mbuf_pool = pool;
290 self
291 }
292
293 #[must_use]
297 pub fn mbuf_pool(&self) -> &MbufPool {
298 &self.mbuf_pool
299 }
300
301 #[must_use]
310 pub fn with_backend(mut self, backend: mpsc::Sender<OutboundRequest>) -> Self {
311 self.backend = Some(backend);
312 self
313 }
314
315 #[must_use]
327 pub fn with_peer_backend(
328 mut self,
329 peer_idx: u32,
330 sender: mpsc::Sender<OutboundRequest>,
331 ) -> Self {
332 self.peer_backends.insert(peer_idx, sender);
333 self
334 }
335
336 #[must_use]
338 pub fn has_backend(&self) -> bool {
339 self.backend.is_some()
340 }
341
342 #[must_use]
344 pub fn peer_backend_count(&self) -> usize {
345 self.peer_backends.len()
346 }
347
348 #[must_use]
350 pub fn pool(&self) -> &Arc<ServerPool> {
351 &self.pool
352 }
353
354 #[must_use]
387 pub fn with_hint_store(mut self, store: Arc<crate::cluster::hints::HintStore>) -> Self {
388 self.hint_store = Some(store);
389 self
390 }
391
392 #[must_use]
394 pub fn hint_store(&self) -> Option<&Arc<crate::cluster::hints::HintStore>> {
395 self.hint_store.as_ref()
396 }
397
398 #[must_use]
428 pub fn with_failure_metrics(mut self, metrics: Arc<crate::stats::FailureMetrics>) -> Self {
429 self.failure_metrics = Some(metrics);
430 self
431 }
432
433 #[must_use]
435 pub fn failure_metrics(&self) -> Option<&Arc<crate::stats::FailureMetrics>> {
436 self.failure_metrics.as_ref()
437 }
438
439 #[must_use]
482 pub fn with_command_extension(mut self, ext: Arc<dyn crate::embed::CommandExtension>) -> Self {
483 self.command_extension = Some(ext);
484 self
485 }
486
487 #[must_use]
489 pub fn command_extension(&self) -> Option<&Arc<dyn crate::embed::CommandExtension>> {
490 self.command_extension.as_ref()
491 }
492
493 #[must_use]
496 pub fn hinted_handoff_active(&self) -> bool {
497 self.hint_store.is_some() && self.pool.config().enable_hinted_handoff
498 }
499
500 #[must_use]
515 pub fn plan(&self, req: &Msg, key: &[u8]) -> DispatchPlan {
516 let cfg = self.pool.config();
517 let peers = self.pool.peers().read();
518 if peers.is_empty() {
519 self.record_no_targets_metric(cfg, ConsistencyLevel::default());
520 return DispatchPlan::NoTargets;
521 }
522 if matches!(req.routing(), MsgRouting::LocalNodeOnly) {
523 return DispatchPlan::LocalDatastore;
524 }
525 if key.is_empty() {
526 return DispatchPlan::LocalDatastore;
527 }
528 let token = hashkit::hash(map_hash(cfg.hash), key);
529 let key_hash64 = hashkit::hash64(map_hash(cfg.hash), key);
530 let bucket = crate::proto::redis::bucket_name(key);
531 let bucket_type = cfg.resolve_bucket_type(bucket);
532 let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
533 let consistency = match (bucket_type, is_read) {
534 (Some(bt), true) => bt.read_consistency,
535 (Some(bt), false) => bt.write_consistency,
536 (None, true) => cfg.read_consistency,
537 (None, false) => cfg.write_consistency,
538 };
539 let n_val_cap = bucket_type.map_or(0, |bt| bt.n_val);
540 let dcs = self.pool.datacenters().read();
541 let include_down = self.hinted_handoff_active() && !is_read;
550 let routable = collect_routable(
551 &dcs,
552 &peers,
553 &token,
554 key_hash64,
555 cfg.distribution,
556 include_down,
557 );
558 if let Some(shadow) = cfg.distribution_shadow {
559 if shadow != cfg.distribution {
560 let shadow_routable =
561 collect_routable(&dcs, &peers, &token, key_hash64, shadow, include_down);
562 if !plans_agree(&routable, &shadow_routable) {
563 bump_shadow_disagreement();
564 tracing::debug!(
565 target: "dynomite::dispatch::shadow",
566 live = cfg.distribution.as_str(),
567 shadow = shadow.as_str(),
568 "shadow distribution disagreed on key route"
569 );
570 }
571 }
572 }
573 if routable.is_empty() {
574 self.record_no_targets_metric(cfg, consistency);
575 return DispatchPlan::NoTargets;
576 }
577 let (local, remote): (Vec<_>, Vec<_>) = routable
578 .into_iter()
579 .partition(|(dc_idx, _, _)| dcs[*dc_idx].name() == cfg.dc);
580 let plan =
581 plan_with_consistency(cfg, &dcs, &peers, consistency, req.routing(), local, remote);
582 let plan = cap_replicas(plan, n_val_cap);
583 if matches!(plan, DispatchPlan::NoTargets) {
584 self.record_no_targets_metric(cfg, consistency);
585 }
586 plan
587 }
588
589 fn record_no_targets_metric(
592 &self,
593 cfg: &crate::cluster::pool::PoolConfig,
594 consistency: ConsistencyLevel,
595 ) {
596 if let Some(m) = self.failure_metrics.as_ref() {
597 m.record_no_targets(&cfg.dc, &cfg.rack, consistency);
598 }
599 }
600
601 fn peer_dc_label(&self, peer_idx: u32) -> String {
605 let peers = self.pool.peers().read();
606 peers
607 .get(peer_idx as usize)
608 .map_or_else(|| self.pool.config().dc.clone(), |p| p.dc().to_string())
609 }
610}
611
612fn cap_replicas(plan: DispatchPlan, cap: u8) -> DispatchPlan {
618 if cap == 0 {
619 return plan;
620 }
621 let cap = cap as usize;
622 match plan {
623 DispatchPlan::Replicas {
624 mut targets,
625 consistency,
626 } if targets.len() > cap => {
627 targets.truncate(cap);
628 DispatchPlan::Replicas {
629 targets,
630 consistency,
631 }
632 }
633 other => other,
634 }
635}
636
/// Whether two routing results pick the same multiset of peers.
///
/// Only the peer index (the third tuple element) is compared. Datacenter and
/// rack indices, and the order of entries, are ignored.
fn plans_agree(a: &[(usize, usize, u32)], b: &[(usize, usize, u32)]) -> bool {
    fn sorted_peer_ids(route: &[(usize, usize, u32)]) -> Vec<u32> {
        let mut ids: Vec<u32> = route.iter().map(|&(_, _, peer)| peer).collect();
        ids.sort_unstable();
        ids
    }
    a.len() == b.len() && sorted_peer_ids(a) == sorted_peer_ids(b)
}
647
648fn collect_routable(
649 dcs: &[crate::cluster::Datacenter],
650 peers: &[crate::cluster::peer::Peer],
651 token: &crate::hashkit::DynToken,
652 hash64: u64,
653 distribution: crate::conf::Distribution,
654 include_down: bool,
655) -> Vec<(usize, usize, u32)> {
656 let mut routable: Vec<(usize, usize, u32)> = Vec::new();
657 for (dc_idx, dc) in dcs.iter().enumerate() {
658 for (rack_idx, rack) in dc.racks().iter().enumerate() {
659 let candidate = match (distribution, rack.random_slices()) {
660 (crate::conf::Distribution::RandomSlicing, Some(slices)) => {
661 slices.claimant_for(hash64).and_then(|name| {
668 peers.iter().find_map(|p| {
669 if p.dc() == dc.name()
670 && p.rack() == rack.name()
671 && p.endpoint().pname() == name
672 {
673 Some(p.idx())
674 } else {
675 None
676 }
677 })
678 })
679 }
680 _ => vnode::dispatch(rack.continuums(), token),
681 };
682 if let Some(peer_idx) = candidate {
683 if let Some(peer) = peers.get(peer_idx as usize) {
684 let state = peer.state();
685 let accept = state.is_routable()
686 || (include_down && matches!(state, crate::cluster::peer::PeerState::Down));
687 if accept {
688 routable.push((dc_idx, rack_idx, peer_idx));
689 }
690 }
691 }
692 }
693 }
694 routable
695}
696
697fn build_target(
698 dcs: &[crate::cluster::Datacenter],
699 peers: &[crate::cluster::peer::Peer],
700 dc_idx: usize,
701 rack_idx: usize,
702 peer_idx: u32,
703) -> ReplicaTarget {
704 let dc_name = dcs[dc_idx].name().to_string();
705 let rack_name = dcs[dc_idx].racks()[rack_idx].name().to_string();
706 let is_local = peers
707 .get(peer_idx as usize)
708 .is_some_and(crate::cluster::peer::Peer::is_local);
709 ReplicaTarget {
710 peer_idx,
711 dc: dc_name,
712 rack: rack_name,
713 is_local,
714 }
715}
716
717fn plan_with_consistency(
718 cfg: &crate::cluster::pool::PoolConfig,
719 dcs: &[crate::cluster::Datacenter],
720 peers: &[crate::cluster::peer::Peer],
721 consistency: ConsistencyLevel,
722 routing: MsgRouting,
723 local: Vec<(usize, usize, u32)>,
724 remote: Vec<(usize, usize, u32)>,
725) -> DispatchPlan {
726 let want_per_dc_fanout = matches!(consistency, ConsistencyLevel::DcEachSafeQuorum)
727 || matches!(routing, MsgRouting::AllNodesAllRacksAllDcs);
728 let mut targets: Vec<ReplicaTarget> = Vec::new();
729 match consistency {
730 ConsistencyLevel::DcOne => {
731 if local.is_empty() {
732 return DispatchPlan::NoTargets;
733 }
734 let mut best: Option<(RackDistance, (usize, usize, u32))> = None;
735 for (dc_idx, rack_idx, peer_idx) in local {
736 let rack_name = dcs[dc_idx].racks()[rack_idx].name();
737 let d = rack_distance(&cfg.dc, &cfg.rack, &cfg.dc, rack_name);
738 let take = match best {
739 None => true,
740 Some((bd, _)) => d.cost() < bd.cost(),
741 };
742 if take {
743 best = Some((d, (dc_idx, rack_idx, peer_idx)));
744 }
745 }
746 if let Some((_, (dc_idx, rack_idx, peer_idx))) = best {
747 let is_local_node = peers
748 .get(peer_idx as usize)
749 .is_some_and(crate::cluster::peer::Peer::is_local);
750 if is_local_node {
751 return DispatchPlan::LocalDatastore;
752 }
753 targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
754 }
755 }
756 ConsistencyLevel::DcQuorum | ConsistencyLevel::DcSafeQuorum => {
757 if local.is_empty() {
758 return DispatchPlan::NoTargets;
759 }
760 for (dc_idx, rack_idx, peer_idx) in local {
761 targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
762 }
763 }
764 ConsistencyLevel::DcEachSafeQuorum => {
765 if local.is_empty() && remote.is_empty() {
766 return DispatchPlan::NoTargets;
767 }
768 for (dc_idx, rack_idx, peer_idx) in local.iter().chain(remote.iter()) {
769 targets.push(build_target(dcs, peers, *dc_idx, *rack_idx, *peer_idx));
770 }
771 }
772 }
773 if want_per_dc_fanout && !remote.is_empty() {
774 for (dc_idx, rack_idx, peer_idx) in remote {
775 if !targets.iter().any(|t| t.peer_idx == peer_idx) {
776 targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
777 }
778 }
779 }
780 if targets.is_empty() {
781 return DispatchPlan::LocalDatastore;
782 }
783 DispatchPlan::Replicas {
784 targets,
785 consistency,
786 }
787}
788
789impl Dispatcher for ClusterDispatcher {
790 #[allow(
791 clippy::too_many_lines,
792 reason = "single dispatch fn must enumerate every plan; splitting hides the planner-to-effect mapping"
793 )]
794 fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome {
795 if req.flags().quit {
796 return DispatchOutcome::Drop;
797 }
798 if let Some(ext) = self.command_extension.as_ref() {
804 if let Some(outcome) = self.intercept_command(ext.as_ref(), &req) {
805 return outcome;
806 }
807 }
808 let key: Vec<u8> = req
816 .keys()
817 .first()
818 .map(|kp| kp.tag_bytes().to_vec())
819 .unwrap_or_default();
820 let plan = self.plan(&req, &key);
821 let (req_span, _plan_span) = enter_plan_span(req.id(), &plan);
822 match plan {
823 DispatchPlan::Drop => DispatchOutcome::Drop,
824 DispatchPlan::NoTargets => {
825 let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
826 MsgType::RspRedisError
827 } else {
828 MsgType::RspMcServerError
829 };
830 let rsp = crate::msg::response::make_error(
831 &req,
832 err_type,
833 0,
834 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
835 &self.mbuf_pool,
836 );
837 DispatchOutcome::Error(rsp)
838 }
839 DispatchPlan::LocalDatastore => {
840 if let Some(tx) = self.backend.as_ref() {
841 let bytes: Vec<u8> = req
846 .mbufs()
847 .iter()
848 .flat_map(|b| b.readable().to_vec())
849 .collect();
850 if bytes.is_empty() {
851 return DispatchOutcome::Drop;
855 }
856 let env = OutboundRequest {
857 bytes,
858 req_id: req.id(),
859 responder,
860 span: req_span.clone(),
861 ty: crate::proto::dnode::DmsgType::Req,
862 target_peer_idx: None,
863 };
864 if let Err(err) = tx.try_send(env) {
865 if let Some(m) = self.failure_metrics.as_ref() {
868 match err {
869 tokio::sync::mpsc::error::TrySendError::Full(_) => {
870 m.record_backend_send_full();
871 }
872 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
873 m.record_backend_send_closed();
874 }
875 }
876 }
877 let err_type =
878 if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
879 MsgType::RspRedisError
880 } else {
881 MsgType::RspMcServerError
882 };
883 let rsp = crate::msg::response::make_error(
884 &req,
885 err_type,
886 0,
887 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
888 &self.mbuf_pool,
889 );
890 return DispatchOutcome::Error(rsp);
891 }
892 }
893 DispatchOutcome::Pending
894 }
895 DispatchPlan::Replicas {
896 targets,
897 consistency,
898 } => self.dispatch_replicas(&req, &req_span, &targets, consistency, responder),
899 }
900 }
901}
902
903impl ClusterDispatcher {
904 fn dispatch_replicas(
927 &self,
928 req: &Msg,
929 req_span: &tracing::Span,
930 targets: &[ReplicaTarget],
931 consistency: ConsistencyLevel,
932 responder: ServerSink,
933 ) -> DispatchOutcome {
934 if targets.is_empty() {
935 return DispatchOutcome::Drop;
936 }
937 let bytes: Vec<u8> = req
941 .mbufs()
942 .iter()
943 .flat_map(|b| b.readable().to_vec())
944 .collect();
945 if bytes.is_empty() {
946 return DispatchOutcome::Drop;
947 }
948 let peer_states = self.snapshot_peer_states(targets);
952 let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
953 let is_write = !is_read;
954 let handoff_active = self.hinted_handoff_active() && is_write;
955 if targets.len() == 1 {
958 return self.dispatch_replicas_direct(
959 req,
960 req_span,
961 targets,
962 &bytes,
963 &responder,
964 &HandoffCtx {
965 handoff_active,
966 peer_states: &peer_states,
967 },
968 );
969 }
970 let cfg = self.pool.config();
972 let local_dc = cfg.dc.clone();
973 let (intermediate_tx, intermediate_rx) =
978 mpsc::channel::<OutboundEnvelope>(targets.len() + 1);
979 let target_pairs: Vec<(u32, String)> =
982 targets.iter().map(|t| (t.peer_idx, t.dc.clone())).collect();
983 let repair_key: Option<Vec<u8>> = req
986 .keys()
987 .first()
988 .map(|kp| kp.tag_bytes().to_vec())
989 .filter(|k| !k.is_empty());
990 let repair_ctx = repair_key.map(|key| ReadRepairContext {
991 req_id: req.id(),
992 req_ty: req.ty(),
993 key,
994 mbuf_pool: self.mbuf_pool.clone(),
995 peer_backends: self.peer_backends.clone(),
996 local_backend: self.backend.clone(),
997 target_is_local: targets.iter().map(|t| (t.peer_idx, t.is_local)).collect(),
998 });
999 let mut sent = 0usize;
1002 let mut hinted = 0usize;
1003 for target in targets {
1004 let action = Self::choose_target_action(target, handoff_active, &peer_states);
1005 match action {
1006 TargetAction::Send => {
1007 if self.fanout_send(target, req, req_span, &bytes, &intermediate_tx) {
1008 sent += 1;
1009 } else if handoff_active
1010 && self.hint_target(target, &bytes, req, req_span, &intermediate_tx)
1011 {
1012 hinted += 1;
1013 }
1014 }
1015 TargetAction::Hint => {
1016 if self.hint_target(target, &bytes, req, req_span, &intermediate_tx) {
1017 hinted += 1;
1018 }
1019 }
1020 }
1021 }
1022 drop(intermediate_tx);
1027 if sent + hinted == 0 {
1028 return DispatchOutcome::Error(self.no_quorum_error(req));
1029 }
1030 let req_id = req.id();
1031 let req_ty = req.ty();
1032 let mbuf_pool = self.mbuf_pool.clone();
1033 let failure_metrics = self.failure_metrics.clone();
1034 tokio::spawn(coalesce_actor(
1035 req_id,
1036 req_ty,
1037 consistency,
1038 target_pairs,
1039 local_dc,
1040 intermediate_rx,
1041 responder,
1042 mbuf_pool,
1043 repair_ctx,
1044 failure_metrics,
1045 ));
1046 DispatchOutcome::Pending
1047 }
1048
1049 fn snapshot_peer_states(
1054 &self,
1055 targets: &[ReplicaTarget],
1056 ) -> std::collections::HashMap<u32, crate::cluster::peer::PeerState> {
1057 use crate::cluster::peer::PeerState;
1058 let peers = self.pool.peers().read();
1059 let mut out = std::collections::HashMap::with_capacity(targets.len());
1060 for t in targets {
1061 let state = if t.is_local {
1062 PeerState::Normal
1063 } else {
1064 peers
1065 .get(t.peer_idx as usize)
1066 .map_or(PeerState::Unknown, crate::cluster::peer::Peer::state)
1067 };
1068 out.insert(t.peer_idx, state);
1069 }
1070 out
1071 }
1072
1073 fn choose_target_action(
1074 target: &ReplicaTarget,
1075 handoff_active: bool,
1076 peer_states: &std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
1077 ) -> TargetAction {
1078 use crate::cluster::peer::PeerState;
1079 if !handoff_active {
1080 return TargetAction::Send;
1081 }
1082 let state = peer_states
1083 .get(&target.peer_idx)
1084 .copied()
1085 .unwrap_or(PeerState::Unknown);
1086 match state {
1087 PeerState::Down => TargetAction::Hint,
1088 _ => TargetAction::Send,
1089 }
1090 }
1091
1092 fn fanout_send(
1095 &self,
1096 target: &ReplicaTarget,
1097 req: &Msg,
1098 req_span: &tracing::Span,
1099 bytes: &[u8],
1100 intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1101 ) -> bool {
1102 let env = OutboundRequest {
1103 bytes: bytes.to_vec(),
1104 req_id: req.id(),
1105 responder: intermediate_tx.clone(),
1106 span: req_span.clone(),
1107 ty: crate::proto::dnode::DmsgType::Req,
1108 target_peer_idx: Some(target.peer_idx),
1109 };
1110 let send_result = if target.is_local {
1111 self.backend.as_ref().map(|tx| tx.try_send(env))
1112 } else {
1113 self.peer_backends
1114 .get(&target.peer_idx)
1115 .map(|tx| tx.try_send(env))
1116 };
1117 match send_result {
1118 Some(Ok(())) => true,
1119 Some(Err(err)) => {
1120 self.observe_send_error(target, &err);
1121 false
1122 }
1123 None => false,
1124 }
1125 }
1126
1127 fn observe_send_error(
1132 &self,
1133 target: &ReplicaTarget,
1134 err: &tokio::sync::mpsc::error::TrySendError<OutboundRequest>,
1135 ) {
1136 let Some(m) = self.failure_metrics.as_ref() else {
1137 return;
1138 };
1139 if target.is_local {
1140 match err {
1141 tokio::sync::mpsc::error::TrySendError::Full(_) => m.record_backend_send_full(),
1142 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1143 m.record_backend_send_closed();
1144 }
1145 }
1146 } else {
1147 let peer_dc = self.peer_dc_label(target.peer_idx);
1148 match err {
1149 tokio::sync::mpsc::error::TrySendError::Full(_) => {
1150 m.record_peer_send_full(target.peer_idx, &peer_dc);
1151 }
1152 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1153 m.record_peer_send_closed(target.peer_idx, &peer_dc);
1154 }
1155 }
1156 }
1157 }
1158
1159 fn hint_target(
1163 &self,
1164 target: &ReplicaTarget,
1165 bytes: &[u8],
1166 req: &Msg,
1167 req_span: &tracing::Span,
1168 intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1169 ) -> bool {
1170 let Some(store) = self.hint_store.as_ref() else {
1171 return false;
1172 };
1173 let cfg = self.pool.config();
1174 let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1175 match store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1176 Ok(()) => {}
1177 Err(e) => {
1178 tracing::debug!(
1179 target: "dynomite::hints",
1180 peer_idx = target.peer_idx,
1181 error = %e,
1182 "hint enqueue failed"
1183 );
1184 return false;
1185 }
1186 }
1187 let synth = synth_hint_reply(req, &self.mbuf_pool);
1188 let env = OutboundEnvelope {
1189 req_id: req.id(),
1190 rsp: synth,
1191 span: req_span.clone(),
1192 source_peer_idx: Some(target.peer_idx),
1193 };
1194 if intermediate_tx.try_send(env).is_err() {
1195 tracing::debug!(
1200 target: "dynomite::hints",
1201 peer_idx = target.peer_idx,
1202 "hint synth-reply could not be queued; coalescer absent"
1203 );
1204 }
1205 tracing::debug!(
1206 target: "dynomite::hints",
1207 peer_idx = target.peer_idx,
1208 bytes = bytes.len(),
1209 "stored hint for down peer"
1210 );
1211 true
1212 }
1213
1214 fn dispatch_replicas_direct(
1215 &self,
1216 req: &Msg,
1217 req_span: &tracing::Span,
1218 targets: &[ReplicaTarget],
1219 bytes: &[u8],
1220 responder: &ServerSink,
1221 ctx: &HandoffCtx<'_>,
1222 ) -> DispatchOutcome {
1223 debug_assert_eq!(targets.len(), 1);
1224 let target = &targets[0];
1225 if let TargetAction::Hint =
1229 Self::choose_target_action(target, ctx.handoff_active, ctx.peer_states)
1230 {
1231 if self.hint_single_target_direct(target, bytes, req, req_span, responder) {
1232 return DispatchOutcome::Pending;
1233 }
1234 return DispatchOutcome::Error(self.no_quorum_error(req));
1235 }
1236 let env = OutboundRequest {
1237 bytes: bytes.to_vec(),
1238 req_id: req.id(),
1239 responder: responder.clone(),
1240 span: req_span.clone(),
1241 ty: crate::proto::dnode::DmsgType::Req,
1242 target_peer_idx: Some(target.peer_idx),
1243 };
1244 let send_result = if target.is_local {
1245 self.backend.as_ref().map(|tx| tx.try_send(env))
1246 } else {
1247 self.peer_backends
1248 .get(&target.peer_idx)
1249 .map(|tx| tx.try_send(env))
1250 };
1251 let sent = match send_result {
1252 Some(Ok(())) => true,
1253 Some(Err(ref err)) => {
1254 self.observe_send_error(target, err);
1255 false
1256 }
1257 None => false,
1258 };
1259 if sent {
1260 return DispatchOutcome::Pending;
1261 }
1262 if ctx.handoff_active
1263 && self.hint_single_target_direct(target, bytes, req, req_span, responder)
1264 {
1265 return DispatchOutcome::Pending;
1266 }
1267 DispatchOutcome::Error(self.no_quorum_error(req))
1268 }
1269
1270 fn hint_single_target_direct(
1275 &self,
1276 target: &ReplicaTarget,
1277 bytes: &[u8],
1278 req: &Msg,
1279 req_span: &tracing::Span,
1280 responder: &ServerSink,
1281 ) -> bool {
1282 let Some(store) = self.hint_store.as_ref() else {
1283 return false;
1284 };
1285 let cfg = self.pool.config();
1286 let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1287 if let Err(e) = store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1288 tracing::debug!(
1289 target: "dynomite::hints",
1290 peer_idx = target.peer_idx,
1291 error = %e,
1292 "hint enqueue failed (single-target)"
1293 );
1294 return false;
1295 }
1296 let synth = synth_hint_reply(req, &self.mbuf_pool);
1297 let env = OutboundEnvelope {
1298 req_id: req.id(),
1299 rsp: synth,
1300 span: req_span.clone(),
1301 source_peer_idx: Some(target.peer_idx),
1302 };
1303 let _ = responder.try_send(env);
1304 true
1305 }
1306
1307 fn no_quorum_error(&self, req: &Msg) -> Msg {
1308 let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1309 MsgType::RspRedisError
1310 } else {
1311 MsgType::RspMcServerError
1312 };
1313 crate::msg::response::make_error(
1314 req,
1315 err_type,
1316 0,
1317 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1318 &self.mbuf_pool,
1319 )
1320 }
1321
1322 fn intercept_command(
1330 &self,
1331 ext: &dyn crate::embed::CommandExtension,
1332 req: &Msg,
1333 ) -> Option<DispatchOutcome> {
1334 if ext.handles_msg_type(req.ty()) {
1335 return Some(self.run_extension_command(ext, req));
1336 }
1337 if matches!(req.ty(), MsgType::ReqRedisHset) {
1338 return self.intercept_extension_hset(ext, req);
1339 }
1340 None
1341 }
1342
1343 fn run_extension_command(
1350 &self,
1351 ext: &dyn crate::embed::CommandExtension,
1352 req: &Msg,
1353 ) -> DispatchOutcome {
1354 let recovered_kw: Vec<u8>;
1362 let keyword: &[u8] = match req.ty() {
1363 MsgType::ReqRedisFtCreate => b"FT.CREATE",
1364 MsgType::ReqRedisFtSearch => b"FT.SEARCH",
1365 MsgType::ReqRedisFtInfo => b"FT.INFO",
1366 MsgType::ReqRedisFtList => b"FT.LIST",
1367 MsgType::ReqRedisFtDropindex => b"FT.DROPINDEX",
1368 MsgType::ReqRedisFtRegex => b"FT.REGEX",
1369 MsgType::ReqRedisFtUnknown => {
1370 recovered_kw = first_bulk_token(req).unwrap_or_else(|| b"FT.UNKNOWN".to_vec());
1371 recovered_kw.as_slice()
1372 }
1373 _ => return DispatchOutcome::Drop,
1378 };
1379 let mut args: Vec<&[u8]> = Vec::with_capacity(1 + req.keys().len() + req.args().len());
1380 args.push(keyword);
1381 for k in req.keys() {
1382 args.push(k.key());
1383 }
1384 for a in req.args() {
1385 args.push(a.bytes());
1386 }
1387 let bytes = ext.try_dispatch(&args).unwrap_or_else(|| {
1388 let kw = String::from_utf8_lossy(keyword);
1389 format!("-ERR not supported in this build: {kw}\r\n").into_bytes()
1390 });
1391 DispatchOutcome::Inline(synthetic_redis_reply(req, &self.mbuf_pool, &bytes))
1392 }
1393
1394 fn intercept_extension_hset(
1400 &self,
1401 ext: &dyn crate::embed::CommandExtension,
1402 req: &Msg,
1403 ) -> Option<DispatchOutcome> {
1404 let mut args: Vec<&[u8]> = Vec::with_capacity(req.keys().len() + req.args().len());
1405 for k in req.keys() {
1406 args.push(k.key());
1407 }
1408 for a in req.args() {
1409 args.push(a.bytes());
1410 }
1411 match ext.try_intercept_hset(&args) {
1412 crate::embed::HsetOutcome::Absorbed | crate::embed::HsetOutcome::NotIndexed => None,
1413 crate::embed::HsetOutcome::Error(message) => {
1414 let payload = format!("-ERR {message}\r\n");
1415 Some(DispatchOutcome::Error(synthetic_redis_reply(
1416 req,
1417 &self.mbuf_pool,
1418 payload.as_bytes(),
1419 )))
1420 }
1421 }
1422 }
1423}
1424
1425fn synthetic_redis_reply(req: &Msg, pool: &MbufPool, payload: &[u8]) -> Msg {
1431 let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
1432 rsp.set_parent_id(req.id());
1433 let mut written = 0usize;
1434 while written < payload.len() {
1435 let mut buf = pool.get();
1436 let n = buf.recv(&payload[written..]);
1437 debug_assert!(
1438 n > 0,
1439 "MbufPool returned a buffer with zero writable capacity"
1440 );
1441 rsp.mbufs_mut().push_back(buf);
1442 written += n;
1443 }
1444 rsp.recompute_mlen();
1445 rsp
1446}
1447
1448fn first_bulk_token(req: &Msg) -> Option<Vec<u8>> {
1458 let mut wire: Vec<u8> = Vec::new();
1459 for buf in req.mbufs() {
1460 wire.extend_from_slice(buf.readable());
1461 if wire.len() > 256 {
1462 break;
1463 }
1464 }
1465 let mut p = 0usize;
1466 if wire.first() == Some(&b'*') {
1467 let cr = wire.iter().position(|&b| b == b'\r')?;
1468 if wire.get(cr + 1) != Some(&b'\n') {
1469 return None;
1470 }
1471 p = cr + 2;
1472 }
1473 if wire.get(p) != Some(&b'$') {
1474 return None;
1475 }
1476 let header_start = p + 1;
1477 let header_cr = wire[header_start..]
1478 .iter()
1479 .position(|&b| b == b'\r')
1480 .map(|i| header_start + i)?;
1481 if wire.get(header_cr + 1) != Some(&b'\n') {
1482 return None;
1483 }
1484 let len_str = std::str::from_utf8(&wire[header_start..header_cr]).ok()?;
1485 let len: usize = len_str.parse().ok()?;
1486 let body_start = header_cr + 2;
1487 let body_end = body_start.checked_add(len)?;
1488 if wire.len() < body_end + 2 {
1489 return None;
1490 }
1491 Some(wire[body_start..body_end].to_vec())
1492}
1493
1494#[derive(Clone)]
1497struct ReadRepairContext {
1498 req_id: crate::core::types::MsgId,
1499 req_ty: MsgType,
1500 key: Vec<u8>,
1504 mbuf_pool: MbufPool,
1505 peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
1506 local_backend: Option<mpsc::Sender<OutboundRequest>>,
1507 target_is_local: std::collections::HashMap<u32, bool>,
1508}
1509
1510#[allow(
1512 clippy::too_many_arguments,
1513 reason = "actor task captures the entire dispatch context; bundling into a struct adds churn for no callsite gain"
1514)]
1515async fn coalesce_actor(
1516 req_id: crate::core::types::MsgId,
1517 req_ty: MsgType,
1518 consistency: ConsistencyLevel,
1519 targets: Vec<(u32, String)>,
1520 local_dc: String,
1521 mut intermediate_rx: mpsc::Receiver<OutboundEnvelope>,
1522 client_tx: ServerSink,
1523 mbuf_pool: MbufPool,
1524 repair_ctx: Option<ReadRepairContext>,
1525 failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
1526) {
1527 use crate::proto::redis::{CoalesceOutcome, CoalesceTracker};
1528 let mut tracker = CoalesceTracker::new(req_id, consistency, targets, &local_dc);
1529 let mut emitted = false;
1530 while let Some(env) = intermediate_rx.recv().await {
1531 let source = env.source_peer_idx.unwrap_or(u32::MAX);
1532 let span = env.span.clone();
1533 let outcome = tracker.record_reply(source, env.rsp);
1534 match outcome {
1535 CoalesceOutcome::Pending => {}
1536 CoalesceOutcome::Ready {
1537 winner,
1538 divergent_targets,
1539 } => {
1540 if !emitted {
1541 let winner_bytes: Vec<u8> = winner
1542 .mbufs()
1543 .iter()
1544 .flat_map(|b| b.readable().to_vec())
1545 .collect();
1546 let out_env = OutboundEnvelope {
1547 req_id,
1548 rsp: *winner,
1549 span: span.clone(),
1550 source_peer_idx: None,
1551 };
1552 let _ = client_tx.send(out_env).await;
1553 emitted = true;
1554 if !divergent_targets.is_empty() {
1555 if let Some(ctx) = repair_ctx.as_ref() {
1556 schedule_read_repair(ctx, &divergent_targets, &winner_bytes, &span);
1557 }
1558 }
1559 }
1560 }
1561 CoalesceOutcome::Error(reason) => {
1562 if !emitted {
1563 let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet)
1564 {
1565 MsgType::RspRedisError
1566 } else {
1567 MsgType::RspMcServerError
1568 };
1569 let anchor = Msg::new(req_id, req_ty, true);
1570 let rsp = crate::msg::response::make_error(
1571 &anchor,
1572 err_type,
1573 0,
1574 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1575 &mbuf_pool,
1576 );
1577 let _ = client_tx
1578 .send(OutboundEnvelope {
1579 req_id,
1580 rsp,
1581 span: span.clone(),
1582 source_peer_idx: None,
1583 })
1584 .await;
1585 emitted = true;
1586 }
1587 tracing::debug!(target: "dynomite::coalesce", req_id, reason = %reason, "coalesce error");
1588 }
1589 }
1590 }
1591 if !emitted {
1592 if let Some(m) = failure_metrics.as_ref() {
1599 m.record_response_timeout(consistency);
1600 }
1601 let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1602 MsgType::RspRedisError
1603 } else {
1604 MsgType::RspMcServerError
1605 };
1606 let anchor = Msg::new(req_id, req_ty, true);
1607 let rsp = crate::msg::response::make_error(
1608 &anchor,
1609 err_type,
1610 0,
1611 crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1612 &mbuf_pool,
1613 );
1614 let _ = client_tx
1615 .send(OutboundEnvelope {
1616 req_id,
1617 rsp,
1618 span: tracing::Span::none(),
1619 source_peer_idx: None,
1620 })
1621 .await;
1622 }
1623}
1624
1625fn repair_sink() -> ServerSink {
1628 let (tx, mut rx) = mpsc::channel::<OutboundEnvelope>(8);
1629 tokio::spawn(async move {
1630 while rx.recv().await.is_some() {
1631 }
1634 });
1635 tx
1636}
1637
1638fn decode_winner_for_repair(payload: &[u8]) -> Option<RepairAction> {
1647 if payload == b"$-1\r\n" {
1648 return Some(RepairAction::Delete);
1649 }
1650 if !payload.starts_with(b"$") {
1651 return None;
1652 }
1653 let crlf = payload.iter().position(|&b| b == b'\r')?;
1655 if payload.get(crlf + 1).copied() != Some(b'\n') {
1656 return None;
1657 }
1658 let len_str = std::str::from_utf8(&payload[1..crlf]).ok()?;
1659 let len: usize = len_str.parse().ok()?;
1660 let body_start = crlf + 2;
1661 let body_end = body_start.checked_add(len)?;
1662 if payload.len() < body_end + 2 {
1663 return None;
1664 }
1665 if &payload[body_end..body_end + 2] != b"\r\n" {
1666 return None;
1667 }
1668 Some(RepairAction::Write(payload[body_start..body_end].to_vec()))
1669}
1670
/// Borrowed handoff-related state bundled for use during dispatch.
// NOTE(review): the code that reads this struct is outside this chunk; the
// field descriptions below are inferred from names and types — confirm.
struct HandoffCtx<'a> {
    // Presumably whether hinted handoff is enabled for this dispatch — confirm.
    handoff_active: bool,
    // Current `PeerState` per peer, keyed by `u32` (presumably the peer index
    // used elsewhere as `peer_idx`; verify against the caller).
    peer_states: &'a std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
}
1680
/// Per-target decision taken while dispatching a request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TargetAction {
    // Presumably: forward the request to this target as usual.
    Send,
    // Presumably: do not contact the target and handle it via the hint path
    // (see `synth_hint_reply`) — TODO confirm against the dispatch loop.
    Hint,
}
1691
1692fn synth_hint_reply(req: &Msg, pool: &MbufPool) -> Msg {
1703 crate::msg::response::make_simple_redis(req, pool, b"+OK\r\n")
1704}
1705
/// Repair to push to a replica whose read reply diverged from the winner.
enum RepairAction {
    // The winner was a bulk string; carries its body, which
    // `build_repair_bytes` turns into a RESP `SET <key> <value>`.
    Write(Vec<u8>),
    // The winner was the nil bulk string `$-1\r\n`; `build_repair_bytes`
    // turns this into a RESP `DEL <key>`.
    Delete,
}
1715
1716fn build_repair_bytes(action: &RepairAction, key: &[u8]) -> Vec<u8> {
1718 match action {
1719 RepairAction::Write(value) => {
1720 let mut out = Vec::with_capacity(key.len() + value.len() + 32);
1721 out.extend_from_slice(b"*3\r\n$3\r\nSET\r\n$");
1722 out.extend_from_slice(key.len().to_string().as_bytes());
1723 out.extend_from_slice(b"\r\n");
1724 out.extend_from_slice(key);
1725 out.extend_from_slice(b"\r\n$");
1726 out.extend_from_slice(value.len().to_string().as_bytes());
1727 out.extend_from_slice(b"\r\n");
1728 out.extend_from_slice(value);
1729 out.extend_from_slice(b"\r\n");
1730 out
1731 }
1732 RepairAction::Delete => {
1733 let mut out = Vec::with_capacity(key.len() + 24);
1734 out.extend_from_slice(b"*2\r\n$3\r\nDEL\r\n$");
1735 out.extend_from_slice(key.len().to_string().as_bytes());
1736 out.extend_from_slice(b"\r\n");
1737 out.extend_from_slice(key);
1738 out.extend_from_slice(b"\r\n");
1739 out
1740 }
1741 }
1742}
1743
1744fn schedule_read_repair(
1762 ctx: &ReadRepairContext,
1763 divergent: &[u32],
1764 winner_bytes: &[u8],
1765 span: &tracing::Span,
1766) {
1767 if !matches!(ctx.req_ty, MsgType::ReqRedisGet) {
1768 return;
1769 }
1770 let Some(action) = decode_winner_for_repair(winner_bytes) else {
1771 return;
1772 };
1773 let bytes = build_repair_bytes(&action, &ctx.key);
1774 let sink = repair_sink();
1775 for &peer_idx in divergent {
1776 let is_local = ctx.target_is_local.get(&peer_idx).copied().unwrap_or(false);
1777 let env = OutboundRequest {
1778 bytes: bytes.clone(),
1779 req_id: ctx.req_id,
1780 responder: sink.clone(),
1781 span: span.clone(),
1782 ty: crate::proto::dnode::DmsgType::ReqForward,
1783 target_peer_idx: Some(peer_idx),
1784 };
1785 let sent = if is_local {
1786 ctx.local_backend
1787 .as_ref()
1788 .is_some_and(|tx| tx.try_send(env).is_ok())
1789 } else {
1790 ctx.peer_backends
1791 .get(&peer_idx)
1792 .is_some_and(|tx| tx.try_send(env).is_ok())
1793 };
1794 if sent {
1795 let _ = &ctx.mbuf_pool;
1796 tracing::debug!(
1797 target: "dynomite::read_repair",
1798 req_id = ctx.req_id,
1799 peer_idx,
1800 bytes = bytes.len(),
1801 "scheduled read-repair write",
1802 );
1803 } else {
1804 tracing::debug!(
1805 target: "dynomite::read_repair",
1806 req_id = ctx.req_id,
1807 peer_idx,
1808 "read-repair drop: backend channel unavailable or full",
1809 );
1810 }
1811 }
1812}
1813
// Unit tests for `ClusterDispatcher` planning and dispatch: consistency-level
// fan-out, bucket-type overrides and `n_val` caps, NoTargets error bytes, and
// failure-metric accounting.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
    use crate::conf::DataStore;
    use crate::hashkit::DynToken;

    /// Pool config for a node in `dc1`/`rA` with the given read and write
    /// consistency levels; every other field takes its default.
    fn cfg(read: ConsistencyLevel, write: ConsistencyLevel) -> crate::cluster::PoolConfig {
        crate::cluster::PoolConfig {
            read_consistency: read,
            write_consistency: write,
            dc: "dc1".into(),
            rack: "rA".into(),
            ..crate::cluster::PoolConfig::default()
        }
    }

    /// Builds a peer in the `Normal` state that owns the single token `tok`.
    ///
    /// The endpoint is host `h` on port `8101 + idx` (port 8101 if `idx` does
    /// not fit in a `u16`). `is_local` and `is_same` are passed straight to
    /// `Peer::new`; the meaning of the trailing `false` argument is not visible
    /// here.
    fn peer(idx: u32, dc: &str, rack: &str, tok: u32, is_local: bool, is_same: bool) -> Peer {
        let mut p = Peer::new(
            idx,
            PeerEndpoint::tcp("h".into(), 8101 + u16::try_from(idx).unwrap_or(0)),
            rack.into(),
            dc.into(),
            vec![DynToken::from_u32(tok)],
            is_local,
            is_same,
            false,
        );
        p.set_state(PeerState::Normal, 0);
        p
    }

    /// Wraps `peers` in a `ServerPool` built from `cfg(read, write)`, after
    /// running `preselect_remote_racks`.
    fn pool(read: ConsistencyLevel, write: ConsistencyLevel, peers: Vec<Peer>) -> Arc<ServerPool> {
        let pool = ServerPool::new(cfg(read, write), peers);
        pool.preselect_remote_racks();
        Arc::new(pool)
    }

    /// A request routed `LocalNodeOnly` is planned straight to the local
    /// datastore.
    #[test]
    fn local_node_only_short_circuits() {
        let p = pool(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            vec![peer(0, "dc1", "rA", 10, true, true)],
        );
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        req.set_routing(MsgRouting::LocalNodeOnly);
        assert_eq!(
            ClusterDispatcher::new(p).plan(&req, b"k"),
            DispatchPlan::LocalDatastore,
        );
    }

    /// DC_ONE read with the local node present in the local rack: the plan is
    /// the local datastore, not a replica fan-out.
    #[test]
    fn dc_one_read_targets_local_rack_when_present() {
        let p = pool(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            vec![
                peer(0, "dc1", "rA", 10, true, true),
                peer(1, "dc1", "rB", 20, false, true),
                peer(2, "dc2", "rA", 30, false, false),
            ],
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"hello");
        assert!(matches!(plan, DispatchPlan::LocalDatastore));
    }

    /// DC_QUORUM fans out to exactly the two `dc1` peers; the `dc2` peer is
    /// excluded.
    #[test]
    fn dc_quorum_fans_out_local_dc() {
        let p = pool(
            ConsistencyLevel::DcQuorum,
            ConsistencyLevel::DcQuorum,
            vec![
                peer(0, "dc1", "rA", 10, true, true),
                peer(1, "dc1", "rB", 20, false, true),
                peer(2, "dc2", "rA", 30, false, false),
            ],
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
        match plan {
            DispatchPlan::Replicas { targets: rs, .. } => {
                assert_eq!(rs.len(), 2);
                for r in rs {
                    assert_eq!(r.dc, "dc1");
                }
            }
            _ => panic!("expected replicas"),
        }
    }

    /// DC_EACH_SAFE_QUORUM fans out to both datacenters.
    #[test]
    fn dc_each_safe_quorum_fans_out_per_dc() {
        let p = pool(
            ConsistencyLevel::DcEachSafeQuorum,
            ConsistencyLevel::DcEachSafeQuorum,
            vec![
                peer(0, "dc1", "rA", 10, true, true),
                peer(1, "dc2", "rA", 20, false, false),
            ],
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
        match plan {
            DispatchPlan::Replicas { targets: rs, .. } => {
                assert_eq!(rs.len(), 2);
                let dcs: Vec<&str> = rs.iter().map(|r| r.dc.as_str()).collect();
                assert!(dcs.contains(&"dc1"));
                assert!(dcs.contains(&"dc2"));
            }
            _ => panic!("expected replicas"),
        }
    }

    /// With the only peer marked Down, planning yields `NoTargets`.
    #[test]
    fn no_routable_peers_returns_no_targets() {
        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
        p0.set_state(PeerState::Down, 0);
        let p = pool(
            ConsistencyLevel::DcQuorum,
            ConsistencyLevel::DcQuorum,
            vec![p0],
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
        assert_eq!(plan, DispatchPlan::NoTargets);
    }

    /// A NoTargets dispatch for a Redis request returns a Redis error
    /// response. Its wire bytes must be non-empty, start with `-Dynomite: `,
    /// end in CRLF, and `mlen()` must agree with the byte count.
    #[test]
    fn no_targets_error_response_carries_dynomite_wire_bytes() {
        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
        p0.set_state(PeerState::Down, 0);
        let p = pool(
            ConsistencyLevel::DcQuorum,
            ConsistencyLevel::DcQuorum,
            vec![p0],
        );
        let disp = ClusterDispatcher::new(p);
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
        let (tx, _rx) = mpsc::channel(1);
        let outcome = disp.dispatch(req, tx);
        match outcome {
            DispatchOutcome::Error(rsp) => {
                assert_eq!(rsp.ty(), MsgType::RspRedisError);
                assert!(rsp.flags().is_error);
                let bytes: Vec<u8> = rsp
                    .mbufs()
                    .iter()
                    .flat_map(|b| b.readable().to_vec())
                    .collect();
                assert!(
                    !bytes.is_empty(),
                    "NoTargets must produce on-wire bytes, not a 0-byte hang"
                );
                assert!(bytes.starts_with(b"-Dynomite: "));
                assert!(bytes.ends_with(b"\r\n"));
                assert_eq!(rsp.mlen() as usize, bytes.len());
            }
            other => panic!("expected DispatchOutcome::Error, got {other:?}"),
        }
    }

    /// Memcache counterpart of the test above: a NoTargets dispatch yields a
    /// `SERVER_ERROR ...\r\n` response.
    #[test]
    fn no_targets_error_response_memcache_wire_bytes() {
        // Note: this local `cfg` shadows the `cfg` helper for the rest of the test.
        let mut cfg = cfg(ConsistencyLevel::DcQuorum, ConsistencyLevel::DcQuorum);
        cfg.data_store = DataStore::Memcache;
        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
        p0.set_state(PeerState::Down, 0);
        let pool_arc = ServerPool::new(cfg, vec![p0]);
        pool_arc.preselect_remote_racks();
        let disp = ClusterDispatcher::new(Arc::new(pool_arc));
        let mut req = Msg::new(1, MsgType::ReqMcGet, true);
        req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
        let (tx, _rx) = mpsc::channel(1);
        let outcome = disp.dispatch(req, tx);
        match outcome {
            DispatchOutcome::Error(rsp) => {
                assert_eq!(rsp.ty(), MsgType::RspMcServerError);
                let bytes: Vec<u8> = rsp
                    .mbufs()
                    .iter()
                    .flat_map(|b| b.readable().to_vec())
                    .collect();
                assert!(
                    !bytes.is_empty(),
                    "NoTargets must produce on-wire bytes, not a 0-byte hang"
                );
                assert!(bytes.starts_with(b"SERVER_ERROR "));
                assert!(bytes.ends_with(b"\r\n"));
            }
            other => panic!("expected DispatchOutcome::Error, got {other:?}"),
        }
    }

    use crate::cluster::pool::{BucketType, PoolConfig};

    /// Like `pool`, but also installs `bucket_types` and an optional
    /// `default_bucket_type` in the config.
    fn pool_with_bucket_types(
        pool_read: ConsistencyLevel,
        pool_write: ConsistencyLevel,
        bucket_types: Vec<BucketType>,
        default_bucket_type: Option<&str>,
        peers: Vec<Peer>,
    ) -> Arc<ServerPool> {
        let cfg = PoolConfig {
            read_consistency: pool_read,
            write_consistency: pool_write,
            dc: "dc1".into(),
            rack: "rA".into(),
            bucket_types,
            default_bucket_type: default_bucket_type.map(str::to_string),
            ..PoolConfig::default()
        };
        let pool = ServerPool::new(cfg, peers);
        pool.preselect_remote_racks();
        Arc::new(pool)
    }

    /// Three `dc1` peers on distinct racks (rA, rB, rC); peer 0 is the local node.
    fn three_local_peers() -> Vec<Peer> {
        vec![
            peer(0, "dc1", "rA", 10, true, true),
            peer(1, "dc1", "rB", 20, false, true),
            peer(2, "dc1", "rC", 30, false, true),
        ]
    }

    /// A key in bucket `hot` uses that bucket's DC_QUORUM instead of the
    /// pool's DC_ONE, fanning out to all three peers.
    #[test]
    fn bucket_type_overrides_pool_consistency() {
        let bts = vec![BucketType {
            name: "hot".into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val: 0,
        }];
        let p = pool_with_bucket_types(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            bts,
            None,
            three_local_peers(),
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"hot/key1");
        match plan {
            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
            other => panic!("expected DC_QUORUM fan-out, got {other:?}"),
        }
    }

    /// A key with no `/` bucket prefix and no default bucket type keeps the
    /// pool's DC_ONE behavior (local datastore).
    #[test]
    fn slashless_key_falls_back_to_pool_default() {
        let bts = vec![BucketType {
            name: "hot".into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val: 0,
        }];
        let p = pool_with_bucket_types(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            bts,
            None,
            three_local_peers(),
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"plain-key");
        assert!(matches!(plan, DispatchPlan::LocalDatastore));
    }

    /// With `default_bucket_type = "safe"`, both a slashless key and a key in
    /// an unconfigured bucket pick up the default's DC_QUORUM fan-out.
    #[test]
    fn unknown_bucket_uses_default_bucket_type_when_set() {
        let bts = vec![BucketType {
            name: "safe".into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val: 0,
        }];
        let p = pool_with_bucket_types(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            bts,
            Some("safe"),
            three_local_peers(),
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p.clone()).plan(&req, b"plain-key");
        match plan {
            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
            other => panic!("expected DC_QUORUM via default bucket, got {other:?}"),
        }
        let plan = ClusterDispatcher::new(p).plan(&req, b"unknown-bucket/key");
        match plan {
            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
            other => panic!("expected DC_QUORUM via default bucket, got {other:?}"),
        }
    }

    /// An unconfigured bucket with no default bucket type falls back to the
    /// pool's DC_ONE (local datastore).
    #[test]
    fn unknown_bucket_with_no_default_uses_pool_default() {
        let bts = vec![BucketType {
            name: "safe".into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val: 0,
        }];
        let p = pool_with_bucket_types(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            bts,
            None,
            three_local_peers(),
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"unknown-bucket/key");
        assert!(matches!(plan, DispatchPlan::LocalDatastore));
    }

    /// `n_val: 1` caps a three-way DC_QUORUM fan-out to a single target.
    #[test]
    fn n_val_one_caps_replicas_to_first_target() {
        let bts = vec![BucketType {
            name: "thin".into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val: 1,
        }];
        let p = pool_with_bucket_types(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            bts,
            None,
            three_local_peers(),
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"thin/key");
        match plan {
            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 1),
            other => panic!("expected single-target plan, got {other:?}"),
        }
    }

    /// `n_val: 2` caps a three-way DC_QUORUM fan-out to two targets.
    #[test]
    fn n_val_two_caps_replicas_to_first_two_targets() {
        let bts = vec![BucketType {
            name: "medium".into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val: 2,
        }];
        let p = pool_with_bucket_types(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            bts,
            None,
            three_local_peers(),
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"medium/key");
        match plan {
            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 2),
            other => panic!("expected two-target plan, got {other:?}"),
        }
    }

    /// `n_val: 0` means "no cap": all three targets remain.
    #[test]
    fn n_val_zero_does_not_cap() {
        let bts = vec![BucketType {
            name: "any".into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val: 0,
        }];
        let p = pool_with_bucket_types(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            bts,
            None,
            three_local_peers(),
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"any/key");
        match plan {
            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
            other => panic!("expected uncapped plan, got {other:?}"),
        }
    }

    /// An `n_val` larger than the replica count leaves the plan unchanged.
    #[test]
    fn n_val_larger_than_replicas_is_a_no_op() {
        let bts = vec![BucketType {
            name: "big".into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val: 7,
        }];
        let p = pool_with_bucket_types(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            bts,
            None,
            three_local_peers(),
        );
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let plan = ClusterDispatcher::new(p).plan(&req, b"big/key");
        match plan {
            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
            other => panic!("expected uncapped plan, got {other:?}"),
        }
    }

    /// A NoTargets plan records exactly one `no_targets` metric entry, tagged
    /// with the pool's dc/rack and the request's consistency level.
    #[test]
    fn no_targets_records_failure_metric() {
        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
        p0.set_state(PeerState::Down, 0);
        let p = pool(
            ConsistencyLevel::DcQuorum,
            ConsistencyLevel::DcQuorum,
            vec![p0],
        );
        let metrics = Arc::new(crate::stats::FailureMetrics::new());
        let disp = ClusterDispatcher::new(p).with_failure_metrics(metrics.clone());
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        assert_eq!(disp.plan(&req, b"k"), DispatchPlan::NoTargets);
        let snap = metrics.snapshot();
        assert_eq!(snap.no_targets.len(), 1);
        let entry = &snap.no_targets[0];
        assert_eq!(entry.dc, "dc1");
        assert_eq!(entry.rack, "rA");
        assert_eq!(entry.consistency, ConsistencyLevel::DcQuorum);
        assert_eq!(entry.count, 1);
    }

    /// Dispatching to a backend channel whose receiver has been dropped yields
    /// an error outcome. It bumps `backend_send_closed` and leaves
    /// `backend_send_full` at zero.
    #[tokio::test]
    async fn closed_backend_channel_records_closed_metric() {
        let p = pool(
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcOne,
            vec![peer(0, "dc1", "rA", 10, true, true)],
        );
        let (tx, rx) = mpsc::channel::<crate::net::server::OutboundRequest>(4);
        // Close the channel before dispatching.
        drop(rx);
        let metrics = Arc::new(crate::stats::FailureMetrics::new());
        let disp = ClusterDispatcher::new(p)
            .with_backend(tx)
            .with_failure_metrics(metrics.clone());
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        let pool_buf = crate::io::mbuf::MbufPool::default();
        // Give the request some wire bytes (presumably so dispatch has a
        // payload to forward — confirm).
        let mut buf = pool_buf.get();
        buf.copy_from_slice(b"PING\r\n");
        req.mbufs_mut().push_back(buf);
        let (resp_tx, _resp_rx) = mpsc::channel(1);
        let outcome = disp.dispatch(req, resp_tx);
        assert!(matches!(outcome, DispatchOutcome::Error(_)));
        let snap = metrics.snapshot();
        assert_eq!(snap.backend_send_closed, 1);
        assert_eq!(snap.backend_send_full, 0);
    }

    /// Two `dc1`/`rA` peers at tokens 2^31 (local, Normal) and 0 (Down).
    /// Planning 100 keys must produce both NoTargets and routable plans, and
    /// the summed `no_targets` counters must equal the observed NoTargets
    /// count. Only `plan` is called, so every send-failure counter stays empty
    /// or zero.
    #[test]
    fn two_peer_pool_with_one_down_records_per_key_no_targets() {
        let cfg = crate::cluster::PoolConfig {
            dc: "dc1".into(),
            rack: "rA".into(),
            read_consistency: ConsistencyLevel::DcQuorum,
            write_consistency: ConsistencyLevel::DcQuorum,
            ..crate::cluster::PoolConfig::default()
        };
        let p0 = peer(0, "dc1", "rA", 2_147_483_648, true, true);
        let mut p1 = peer(1, "dc1", "rA", 0, false, true);
        p1.set_state(PeerState::Down, 0);
        let pool_arc = ServerPool::new(cfg, vec![p0, p1]);
        pool_arc.preselect_remote_racks();
        let metrics = Arc::new(crate::stats::FailureMetrics::new());
        let disp = ClusterDispatcher::new(Arc::new(pool_arc)).with_failure_metrics(metrics.clone());
        let mut planned_no_targets = 0u64;
        let mut planned_routable = 0u64;
        for i in 0..100u32 {
            let key = format!("k{i:03}");
            let req = Msg::new(u64::from(i), MsgType::ReqRedisGet, true);
            match disp.plan(&req, key.as_bytes()) {
                DispatchPlan::NoTargets => planned_no_targets += 1,
                DispatchPlan::Replicas { .. } | DispatchPlan::LocalDatastore => {
                    planned_routable += 1;
                }
                DispatchPlan::Drop => panic!("unexpected Drop in plan"),
            }
        }
        assert!(planned_no_targets > 0, "expected some NoTargets dispatches");
        assert!(planned_routable > 0, "expected some routable dispatches");
        let snap = metrics.snapshot();
        let counter_total: u64 = snap.no_targets.iter().map(|e| e.count).sum();
        assert_eq!(
            counter_total, planned_no_targets,
            "dispatch_no_targets_total must match observed NoTargets count",
        );
        assert_eq!(snap.backend_send_full, 0);
        assert_eq!(snap.backend_send_closed, 0);
        assert!(snap.peer_send_full.is_empty());
        assert!(snap.peer_send_closed.is_empty());
    }
}