use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::cluster::pool::ServerPool;
use crate::cluster::snitch::{rack_distance, RackDistance};
use crate::cluster::vnode;
use crate::conf::HashType as ConfHashType;
use crate::hashkit::{self, HashType};
use crate::io::mbuf::MbufPool;
use crate::msg::{ConsistencyLevel, Msg, MsgRouting, MsgType};
use crate::net::dispatcher::{DispatchOutcome, Dispatcher, OutboundEnvelope, ServerSink};
use crate::net::server::OutboundRequest;
/// Process-wide count of keys for which the live and shadow distributions
/// picked different peers.
static SHADOW_DISAGREEMENTS: AtomicU64 = AtomicU64::new(0);

/// Returns how many shadow-distribution disagreements have been counted
/// since process start or the most recent reset.
#[must_use]
pub fn distribution_shadow_disagreement_total() -> u64 {
    SHADOW_DISAGREEMENTS.load(Ordering::Relaxed)
}

/// Sets the shadow-distribution disagreement counter back to zero.
pub fn reset_distribution_shadow_disagreement_total() {
    SHADOW_DISAGREEMENTS.store(0, Ordering::Relaxed);
}

/// Adds one to the shadow-distribution disagreement counter.
fn bump_shadow_disagreement() {
    SHADOW_DISAGREEMENTS.fetch_add(1, Ordering::Relaxed);
}
/// Opens and enters a `dispatch.plan` tracing span describing `plan`.
///
/// Returns the span that was current *before* the plan span was entered,
/// together with the entered guard for the plan span itself.
fn enter_plan_span(
    req_id: u64,
    plan: &DispatchPlan,
) -> (tracing::Span, tracing::span::EnteredSpan) {
    // Capture the caller's span first: after the plan span is entered,
    // `Span::current()` would no longer refer to it.
    let parent = tracing::Span::current();
    // Plan label and replica count; only `Replicas` carries targets.
    let (kind, targets): (&'static str, usize) = match plan {
        DispatchPlan::Drop => ("drop", 0),
        DispatchPlan::NoTargets => ("no_targets", 0),
        DispatchPlan::LocalDatastore => ("local_datastore", 0),
        DispatchPlan::Replicas { targets, .. } => ("replicas", targets.len()),
    };
    let guard = tracing::info_span!("dispatch.plan", req_id, plan = kind, targets).entered();
    (parent, guard)
}
/// Translates the configuration-level hash selector into the `hashkit`
/// hash selector.
///
/// The mapping is one-to-one; the match is exhaustive so adding a variant to
/// the configuration enum forces this table to be updated.
fn map_hash(h: ConfHashType) -> HashType {
match h {
ConfHashType::OneAtATime => HashType::OneAtATime,
ConfHashType::Md5 => HashType::Md5,
ConfHashType::Crc16 => HashType::Crc16,
ConfHashType::Crc32 => HashType::Crc32,
ConfHashType::Crc32a => HashType::Crc32a,
ConfHashType::Fnv1_64 => HashType::Fnv1_64,
// The two enums spell the FNV-1a variants differently (`Fnv1a64` vs `Fnv1a_64`).
ConfHashType::Fnv1a64 => HashType::Fnv1a_64,
ConfHashType::Fnv1_32 => HashType::Fnv1_32,
ConfHashType::Fnv1a32 => HashType::Fnv1a_32,
ConfHashType::Hsieh => HashType::Hsieh,
ConfHashType::Murmur => HashType::Murmur,
ConfHashType::Jenkins => HashType::Jenkins,
ConfHashType::Murmur3 => HashType::Murmur3,
ConfHashType::Murmur3X64_64 => HashType::Murmur3X64_64,
}
}
/// One peer selected by the planner as a destination for a request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplicaTarget {
/// Peer index; used to look the peer up in the pool's peer list and in the
/// dispatcher's per-peer backend map.
pub peer_idx: u32,
/// Name of the datacenter the target was selected from.
pub dc: String,
/// Name of the rack the target was selected from.
pub rack: String,
/// `true` when the selected peer reports itself as the local node; such
/// targets are sent through the local backend channel rather than a peer channel.
pub is_local: bool,
}
/// Routing decision produced by `ClusterDispatcher::plan` for a single request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DispatchPlan {
/// Serve the request from the local backend only.
LocalDatastore,
/// Send the request to the listed peers and coalesce their replies under
/// the given consistency level.
Replicas {
/// Peers to contact.
targets: Vec<ReplicaTarget>,
/// Consistency level the request was planned under.
consistency: ConsistencyLevel,
},
/// No peer could be selected; the dispatcher answers with a no-quorum error.
NoTargets,
/// Discard the request without replying.
Drop,
}
/// Plans where a request should go (local backend and/or peer replicas) and
/// hands it to the matching outbound channels.
///
/// Built with [`ClusterDispatcher::new`] and the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ClusterDispatcher {
/// Shared cluster state: configuration, peers, datacenters.
pool: Arc<ServerPool>,
/// Channel to the local backend; `None` until `with_backend` is called.
backend: Option<mpsc::Sender<OutboundRequest>>,
/// Outbound channels to remote peers, keyed by peer index.
peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
/// Buffer pool used when building error and synthetic replies.
mbuf_pool: MbufPool,
/// Store for hinted-handoff writes; hinted handoff is only active when
/// this is set and the pool configuration enables it.
hint_store: Option<Arc<crate::cluster::hints::HintStore>>,
/// Optional sink for failure counters (no targets, send full/closed, timeouts).
failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
/// Optional extension consulted before planning; it may answer a command inline.
command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
// Construction, accessors and the routing planner.
impl ClusterDispatcher {
/// Creates a dispatcher over `pool` with no backends, a default mbuf pool,
/// and no hint store, failure metrics or command extension.
#[must_use]
pub fn new(pool: Arc<ServerPool>) -> Self {
Self {
pool,
backend: None,
peer_backends: std::collections::HashMap::new(),
mbuf_pool: MbufPool::default(),
hint_store: None,
failure_metrics: None,
command_extension: None,
}
}
/// Replaces the buffer pool used for replies built by the dispatcher.
#[must_use]
pub fn with_mbuf_pool(mut self, pool: MbufPool) -> Self {
self.mbuf_pool = pool;
self
}
/// Returns the buffer pool used for replies built by the dispatcher.
#[must_use]
pub fn mbuf_pool(&self) -> &MbufPool {
&self.mbuf_pool
}
/// Sets the channel used to reach the local backend.
#[must_use]
pub fn with_backend(mut self, backend: mpsc::Sender<OutboundRequest>) -> Self {
self.backend = Some(backend);
self
}
/// Registers the outbound channel for the peer with index `peer_idx`,
/// replacing any channel previously registered for that index.
#[must_use]
pub fn with_peer_backend(
mut self,
peer_idx: u32,
sender: mpsc::Sender<OutboundRequest>,
) -> Self {
self.peer_backends.insert(peer_idx, sender);
self
}
/// Returns `true` when a local backend channel has been configured.
#[must_use]
pub fn has_backend(&self) -> bool {
self.backend.is_some()
}
/// Returns the number of registered peer channels.
#[must_use]
pub fn peer_backend_count(&self) -> usize {
self.peer_backends.len()
}
/// Returns the shared server pool.
#[must_use]
pub fn pool(&self) -> &Arc<ServerPool> {
&self.pool
}
/// Sets the hint store used for hinted handoff.
#[must_use]
pub fn with_hint_store(mut self, store: Arc<crate::cluster::hints::HintStore>) -> Self {
self.hint_store = Some(store);
self
}
/// Returns the hint store, if one was configured.
#[must_use]
pub fn hint_store(&self) -> Option<&Arc<crate::cluster::hints::HintStore>> {
self.hint_store.as_ref()
}
/// Sets the failure-metrics sink.
#[must_use]
pub fn with_failure_metrics(mut self, metrics: Arc<crate::stats::FailureMetrics>) -> Self {
self.failure_metrics = Some(metrics);
self
}
/// Returns the failure-metrics sink, if one was configured.
#[must_use]
pub fn failure_metrics(&self) -> Option<&Arc<crate::stats::FailureMetrics>> {
self.failure_metrics.as_ref()
}
/// Sets the command extension consulted before planning.
#[must_use]
pub fn with_command_extension(mut self, ext: Arc<dyn crate::embed::CommandExtension>) -> Self {
self.command_extension = Some(ext);
self
}
/// Returns the command extension, if one was configured.
#[must_use]
pub fn command_extension(&self) -> Option<&Arc<dyn crate::embed::CommandExtension>> {
self.command_extension.as_ref()
}
/// Returns `true` when a hint store is present *and* the pool
/// configuration has `enable_hinted_handoff` set.
#[must_use]
pub fn hinted_handoff_active(&self) -> bool {
self.hint_store.is_some() && self.pool.config().enable_hinted_handoff
}
/// Computes the routing plan for `req`, whose routing key is `key`.
///
/// Decision order:
/// 1. no peers at all -> `NoTargets` (metric recorded);
/// 2. `LocalNodeOnly` routing or an empty key -> `LocalDatastore`;
/// 3. otherwise one candidate peer per rack is collected, split into
///    local-DC and remote-DC candidates, shaped by the effective
///    consistency level, and finally capped by the bucket type's `n_val`.
///
/// A `NoTargets` result always records the no-targets failure metric when
/// a metrics sink is configured.
#[must_use]
pub fn plan(&self, req: &Msg, key: &[u8]) -> DispatchPlan {
let cfg = self.pool.config();
// Read guard on the peer list; held for the remainder of the function.
let peers = self.pool.peers().read();
if peers.is_empty() {
self.record_no_targets_metric(cfg, ConsistencyLevel::default());
return DispatchPlan::NoTargets;
}
if matches!(req.routing(), MsgRouting::LocalNodeOnly) {
return DispatchPlan::LocalDatastore;
}
// Keyless requests cannot be hashed to a replica; serve them locally.
if key.is_empty() {
return DispatchPlan::LocalDatastore;
}
// Both the token and the 64-bit hash are derived from the configured hash;
// which of the two is used depends on the distribution (see `collect_routable`).
let token = hashkit::hash(map_hash(cfg.hash), key);
let key_hash64 = hashkit::hash64(map_hash(cfg.hash), key);
let bucket = crate::proto::redis::bucket_name(key);
let bucket_type = cfg.resolve_bucket_type(bucket);
// Requests of unknown type are treated as reads.
let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
// A resolved bucket type overrides the pool-wide consistency levels.
let consistency = match (bucket_type, is_read) {
(Some(bt), true) => bt.read_consistency,
(Some(bt), false) => bt.write_consistency,
(None, true) => cfg.read_consistency,
(None, false) => cfg.write_consistency,
};
// 0 means "no cap" (see `cap_replicas`).
let n_val_cap = bucket_type.map_or(0, |bt| bt.n_val);
let dcs = self.pool.datacenters().read();
// Writes may target Down peers when hinted handoff can store them as hints.
let include_down = self.hinted_handoff_active() && !is_read;
let routable = collect_routable(
&dcs,
&peers,
&token,
key_hash64,
cfg.distribution,
include_down,
);
// Shadow mode: evaluate an alternative distribution purely for comparison.
// The result only feeds the disagreement counter and a debug log.
if let Some(shadow) = cfg.distribution_shadow {
if shadow != cfg.distribution {
let shadow_routable =
collect_routable(&dcs, &peers, &token, key_hash64, shadow, include_down);
if !plans_agree(&routable, &shadow_routable) {
bump_shadow_disagreement();
tracing::debug!(
target: "dynomite::dispatch::shadow",
live = cfg.distribution.as_str(),
shadow = shadow.as_str(),
"shadow distribution disagreed on key route"
);
}
}
}
if routable.is_empty() {
self.record_no_targets_metric(cfg, consistency);
return DispatchPlan::NoTargets;
}
// Split candidates into those in this node's datacenter and all others.
let (local, remote): (Vec<_>, Vec<_>) = routable
.into_iter()
.partition(|(dc_idx, _, _)| dcs[*dc_idx].name() == cfg.dc);
let plan =
plan_with_consistency(cfg, &dcs, &peers, consistency, req.routing(), local, remote);
let plan = cap_replicas(plan, n_val_cap);
if matches!(plan, DispatchPlan::NoTargets) {
self.record_no_targets_metric(cfg, consistency);
}
plan
}
/// Records a "no targets" failure for this node's dc/rack under
/// `consistency`; does nothing when no metrics sink is configured.
fn record_no_targets_metric(
&self,
cfg: &crate::cluster::pool::PoolConfig,
consistency: ConsistencyLevel,
) {
if let Some(m) = self.failure_metrics.as_ref() {
m.record_no_targets(&cfg.dc, &cfg.rack, consistency);
}
}
/// Returns the datacenter name of the peer at `peer_idx`, falling back to
/// this node's configured datacenter when the index is out of range.
fn peer_dc_label(&self, peer_idx: u32) -> String {
let peers = self.pool.peers().read();
peers
.get(peer_idx as usize)
.map_or_else(|| self.pool.config().dc.clone(), |p| p.dc().to_string())
}
}
/// Limits a `Replicas` plan to at most `cap` targets, keeping the leading
/// ones. A `cap` of zero disables the limit; non-`Replicas` plans are
/// returned unchanged.
fn cap_replicas(plan: DispatchPlan, cap: u8) -> DispatchPlan {
    let limit = usize::from(cap);
    match plan {
        DispatchPlan::Replicas {
            mut targets,
            consistency,
        } if limit != 0 => {
            // `truncate` is a no-op when the list is already short enough.
            targets.truncate(limit);
            DispatchPlan::Replicas {
                targets,
                consistency,
            }
        }
        unchanged => unchanged,
    }
}
/// Reports whether two candidate lists select the same multiset of peers.
///
/// Only the peer index (third tuple element) is compared; datacenter and
/// rack indices as well as ordering are ignored.
fn plans_agree(a: &[(usize, usize, u32)], b: &[(usize, usize, u32)]) -> bool {
    /// Extracts the peer indices of `plan` in ascending order.
    fn sorted_peers(plan: &[(usize, usize, u32)]) -> Vec<u32> {
        let mut ids: Vec<u32> = plan.iter().map(|&(_, _, peer)| peer).collect();
        ids.sort_unstable();
        ids
    }

    // Cheap length check first so mismatched lists skip the sort entirely.
    a.len() == b.len() && sorted_peers(a) == sorted_peers(b)
}
/// Picks at most one candidate peer per rack for the given key and returns
/// `(dc_idx, rack_idx, peer_idx)` triples for the acceptable ones.
///
/// With `RandomSlicing` and a rack that has random slices, the owner is
/// looked up by `hash64` and resolved to a peer by dc / rack / endpoint
/// name; every other combination goes through `vnode::dispatch` on the
/// rack's continuums with `token`. A candidate is kept when its state is
/// routable, or when `include_down` is set and the peer is `Down`.
fn collect_routable(
    dcs: &[crate::cluster::Datacenter],
    peers: &[crate::cluster::peer::Peer],
    token: &crate::hashkit::DynToken,
    hash64: u64,
    distribution: crate::conf::Distribution,
    include_down: bool,
) -> Vec<(usize, usize, u32)> {
    let mut selected: Vec<(usize, usize, u32)> = Vec::new();
    for (dc_idx, dc) in dcs.iter().enumerate() {
        for (rack_idx, rack) in dc.racks().iter().enumerate() {
            let owner = match (distribution, rack.random_slices()) {
                (crate::conf::Distribution::RandomSlicing, Some(slices)) => {
                    slices.claimant_for(hash64).and_then(|name| {
                        peers
                            .iter()
                            .find(|p| {
                                p.dc() == dc.name()
                                    && p.rack() == rack.name()
                                    && p.endpoint().pname() == name
                            })
                            .map(|p| p.idx())
                    })
                }
                _ => vnode::dispatch(rack.continuums(), token),
            };
            // No owner for this rack, or an owner index outside the peer list.
            let Some(peer_idx) = owner else {
                continue;
            };
            let Some(peer) = peers.get(peer_idx as usize) else {
                continue;
            };
            let state = peer.state();
            let down_but_allowed =
                include_down && matches!(state, crate::cluster::peer::PeerState::Down);
            if state.is_routable() || down_but_allowed {
                selected.push((dc_idx, rack_idx, peer_idx));
            }
        }
    }
    selected
}
/// Builds the `ReplicaTarget` for the peer at `peer_idx`, labelling it with
/// the names of the datacenter and rack it was selected from.
///
/// Indexes `dcs` and the datacenter's racks directly, so both indices must
/// be in range. A `peer_idx` outside `peers` yields `is_local == false`.
fn build_target(
    dcs: &[crate::cluster::Datacenter],
    peers: &[crate::cluster::peer::Peer],
    dc_idx: usize,
    rack_idx: usize,
    peer_idx: u32,
) -> ReplicaTarget {
    let datacenter = &dcs[dc_idx];
    let rack = &datacenter.racks()[rack_idx];
    ReplicaTarget {
        peer_idx,
        dc: datacenter.name().to_string(),
        rack: rack.name().to_string(),
        is_local: peers
            .get(peer_idx as usize)
            .is_some_and(crate::cluster::peer::Peer::is_local),
    }
}
/// Shapes the per-rack candidates into a `DispatchPlan` according to the
/// consistency level and routing mode.
///
/// `local` holds candidates in this node's datacenter, `remote` those in
/// every other datacenter; each entry is `(dc_idx, rack_idx, peer_idx)`.
///
/// * `DcOne`: the single local-DC candidate with the lowest rack-distance
///   cost; if that peer is the local node the plan is `LocalDatastore`.
/// * `DcQuorum` / `DcSafeQuorum`: every local-DC candidate.
/// * `DcEachSafeQuorum`: every local and remote candidate.
///
/// Remote candidates are additionally appended (de-duplicated by peer
/// index) when the consistency is `DcEachSafeQuorum` or the routing is
/// `AllNodesAllRacksAllDcs`.
fn plan_with_consistency(
cfg: &crate::cluster::pool::PoolConfig,
dcs: &[crate::cluster::Datacenter],
peers: &[crate::cluster::peer::Peer],
consistency: ConsistencyLevel,
routing: MsgRouting,
local: Vec<(usize, usize, u32)>,
remote: Vec<(usize, usize, u32)>,
) -> DispatchPlan {
let want_per_dc_fanout = matches!(consistency, ConsistencyLevel::DcEachSafeQuorum)
|| matches!(routing, MsgRouting::AllNodesAllRacksAllDcs);
let mut targets: Vec<ReplicaTarget> = Vec::new();
match consistency {
ConsistencyLevel::DcOne => {
if local.is_empty() {
return DispatchPlan::NoTargets;
}
// Track the cheapest candidate seen so far.
let mut best: Option<(RackDistance, (usize, usize, u32))> = None;
for (dc_idx, rack_idx, peer_idx) in local {
let rack_name = dcs[dc_idx].racks()[rack_idx].name();
let d = rack_distance(&cfg.dc, &cfg.rack, &cfg.dc, rack_name);
// Strict `<`: on equal cost the earlier candidate is kept.
let take = match best {
None => true,
Some((bd, _)) => d.cost() < bd.cost(),
};
if take {
best = Some((d, (dc_idx, rack_idx, peer_idx)));
}
}
if let Some((_, (dc_idx, rack_idx, peer_idx))) = best {
let is_local_node = peers
.get(peer_idx as usize)
.is_some_and(crate::cluster::peer::Peer::is_local);
// NOTE(review): this early return also skips the remote fan-out below,
// even when routing is `AllNodesAllRacksAllDcs` — confirm that is intended.
if is_local_node {
return DispatchPlan::LocalDatastore;
}
targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
}
}
ConsistencyLevel::DcQuorum | ConsistencyLevel::DcSafeQuorum => {
if local.is_empty() {
return DispatchPlan::NoTargets;
}
for (dc_idx, rack_idx, peer_idx) in local {
targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
}
}
ConsistencyLevel::DcEachSafeQuorum => {
if local.is_empty() && remote.is_empty() {
return DispatchPlan::NoTargets;
}
for (dc_idx, rack_idx, peer_idx) in local.iter().chain(remote.iter()) {
targets.push(build_target(dcs, peers, *dc_idx, *rack_idx, *peer_idx));
}
}
}
// Append remote candidates not already targeted. For `DcEachSafeQuorum` the
// remotes were pushed above, so this pass only de-duplicates.
if want_per_dc_fanout && !remote.is_empty() {
for (dc_idx, rack_idx, peer_idx) in remote {
if !targets.iter().any(|t| t.peer_idx == peer_idx) {
targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
}
}
}
// Defensive fallback: with nothing selected, serve from the local datastore.
if targets.is_empty() {
return DispatchPlan::LocalDatastore;
}
DispatchPlan::Replicas {
targets,
consistency,
}
}
impl Dispatcher for ClusterDispatcher {
#[allow(
clippy::too_many_lines,
reason = "single dispatch fn must enumerate every plan; splitting hides the planner-to-effect mapping"
)]
fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome {
if req.flags().quit {
return DispatchOutcome::Drop;
}
if let Some(ext) = self.command_extension.as_ref() {
if let Some(outcome) = self.intercept_command(ext.as_ref(), &req) {
return outcome;
}
}
let key: Vec<u8> = req
.keys()
.first()
.map(|kp| kp.tag_bytes().to_vec())
.unwrap_or_default();
let plan = self.plan(&req, &key);
let (req_span, _plan_span) = enter_plan_span(req.id(), &plan);
match plan {
DispatchPlan::Drop => DispatchOutcome::Drop,
DispatchPlan::NoTargets => {
let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
MsgType::RspRedisError
} else {
MsgType::RspMcServerError
};
let rsp = crate::msg::response::make_error(
&req,
err_type,
0,
crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
&self.mbuf_pool,
);
DispatchOutcome::Error(rsp)
}
DispatchPlan::LocalDatastore => {
if let Some(tx) = self.backend.as_ref() {
let bytes: Vec<u8> = req
.mbufs()
.iter()
.flat_map(|b| b.readable().to_vec())
.collect();
if bytes.is_empty() {
return DispatchOutcome::Drop;
}
let env = OutboundRequest {
bytes,
req_id: req.id(),
responder,
span: req_span.clone(),
ty: crate::proto::dnode::DmsgType::Req,
target_peer_idx: None,
};
if let Err(err) = tx.try_send(env) {
if let Some(m) = self.failure_metrics.as_ref() {
match err {
tokio::sync::mpsc::error::TrySendError::Full(_) => {
m.record_backend_send_full();
}
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
m.record_backend_send_closed();
}
}
}
let err_type =
if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
MsgType::RspRedisError
} else {
MsgType::RspMcServerError
};
let rsp = crate::msg::response::make_error(
&req,
err_type,
0,
crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
&self.mbuf_pool,
);
return DispatchOutcome::Error(rsp);
}
}
DispatchOutcome::Pending
}
DispatchPlan::Replicas {
targets,
consistency,
} => self.dispatch_replicas(&req, &req_span, &targets, consistency, responder),
}
}
}
impl ClusterDispatcher {
/// Executes a `Replicas` plan.
///
/// A single target is sent to directly, with replies going straight to
/// `responder`. For several targets every reply (or synthesized hint
/// acknowledgement) is funnelled through an intermediate channel into a
/// spawned `coalesce_actor`, which emits one reply to `responder`.
///
/// Returns `Drop` when there are no targets or the request has no bytes,
/// a no-quorum `Error` when nothing could be sent or hinted, and `Pending`
/// otherwise.
fn dispatch_replicas(
    &self,
    req: &Msg,
    req_span: &tracing::Span,
    targets: &[ReplicaTarget],
    consistency: ConsistencyLevel,
    responder: ServerSink,
) -> DispatchOutcome {
    if targets.is_empty() {
        return DispatchOutcome::Drop;
    }
    // Flatten the request's buffers straight into one Vec instead of
    // allocating a temporary Vec per buffer.
    let mut bytes: Vec<u8> = Vec::new();
    for buf in req.mbufs() {
        bytes.extend_from_slice(buf.readable());
    }
    if bytes.is_empty() {
        return DispatchOutcome::Drop;
    }
    let peer_states = self.snapshot_peer_states(targets);
    // Requests of unknown type are treated as reads; hints are only
    // stored for writes.
    let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
    let is_write = !is_read;
    let handoff_active = self.hinted_handoff_active() && is_write;
    if targets.len() == 1 {
        return self.dispatch_replicas_direct(
            req,
            req_span,
            targets,
            &bytes,
            &responder,
            &HandoffCtx {
                handoff_active,
                peer_states: &peer_states,
            },
        );
    }
    let cfg = self.pool.config();
    let local_dc = cfg.dc.clone();
    // One slot per target plus one spare so `try_send` of replies and
    // synthesized hint acknowledgements does not hit a full queue.
    let (intermediate_tx, intermediate_rx) =
        mpsc::channel::<OutboundEnvelope>(targets.len() + 1);
    let target_pairs: Vec<(u32, String)> =
        targets.iter().map(|t| (t.peer_idx, t.dc.clone())).collect();
    // `schedule_read_repair` only ever acts on `ReqRedisGet`, so skip
    // building the context (which clones every peer sender) for any other
    // request type. Keep this gate in sync with that function.
    let repair_ctx = if matches!(req.ty(), MsgType::ReqRedisGet) {
        req.keys()
            .first()
            .map(|kp| kp.tag_bytes().to_vec())
            .filter(|k| !k.is_empty())
            .map(|key| ReadRepairContext {
                req_id: req.id(),
                req_ty: req.ty(),
                key,
                mbuf_pool: self.mbuf_pool.clone(),
                peer_backends: self.peer_backends.clone(),
                local_backend: self.backend.clone(),
                target_is_local: targets.iter().map(|t| (t.peer_idx, t.is_local)).collect(),
            })
    } else {
        None
    };
    let mut sent = 0usize;
    let mut hinted = 0usize;
    for target in targets {
        let action = Self::choose_target_action(target, handoff_active, &peer_states);
        match action {
            TargetAction::Send => {
                if self.fanout_send(target, req, req_span, &bytes, &intermediate_tx) {
                    sent += 1;
                } else if handoff_active
                    && self.hint_target(target, &bytes, req, req_span, &intermediate_tx)
                {
                    // The send failed but the write was stored as a hint.
                    hinted += 1;
                }
            }
            TargetAction::Hint => {
                if self.hint_target(target, &bytes, req, req_span, &intermediate_tx) {
                    hinted += 1;
                }
            }
        }
    }
    // Drop our sender so the coalescer's receive loop ends once every
    // in-flight request has released its clone.
    drop(intermediate_tx);
    if sent + hinted == 0 {
        return DispatchOutcome::Error(self.no_quorum_error(req));
    }
    let req_id = req.id();
    let req_ty = req.ty();
    let mbuf_pool = self.mbuf_pool.clone();
    let failure_metrics = self.failure_metrics.clone();
    tokio::spawn(coalesce_actor(
        req_id,
        req_ty,
        consistency,
        target_pairs,
        local_dc,
        intermediate_rx,
        responder,
        mbuf_pool,
        repair_ctx,
        failure_metrics,
    ));
    DispatchOutcome::Pending
}
/// Captures the current state of every target under a single read lock.
///
/// Local targets are always recorded as `Normal`; a target whose index is
/// not in the peer list is recorded as `Unknown`.
fn snapshot_peer_states(
    &self,
    targets: &[ReplicaTarget],
) -> std::collections::HashMap<u32, crate::cluster::peer::PeerState> {
    use crate::cluster::peer::PeerState;
    let peers = self.pool.peers().read();
    let snapshot: std::collections::HashMap<u32, PeerState> = targets
        .iter()
        .map(|target| {
            let state = if target.is_local {
                PeerState::Normal
            } else {
                match peers.get(target.peer_idx as usize) {
                    Some(peer) => peer.state(),
                    None => PeerState::Unknown,
                }
            };
            (target.peer_idx, state)
        })
        .collect();
    snapshot
}
/// Decides whether a target gets the request sent to it or stored as a hint.
///
/// A hint is chosen only when hinted handoff is active and the snapshot
/// records the peer as `Down`; every other case (including a peer missing
/// from the snapshot) is a send.
fn choose_target_action(
    target: &ReplicaTarget,
    handoff_active: bool,
    peer_states: &std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
) -> TargetAction {
    use crate::cluster::peer::PeerState;
    let known_down = matches!(peer_states.get(&target.peer_idx), Some(PeerState::Down));
    if handoff_active && known_down {
        TargetAction::Hint
    } else {
        TargetAction::Send
    }
}
/// Queues a copy of the request for one target of a multi-target fan-out.
///
/// Replies are directed at `intermediate_tx` (the coalescer's inbox).
/// Returns `true` only when the request was accepted by the target's
/// channel; a missing channel or a failed `try_send` returns `false`, and
/// a failed `try_send` is also reported to the failure metrics.
fn fanout_send(
    &self,
    target: &ReplicaTarget,
    req: &Msg,
    req_span: &tracing::Span,
    bytes: &[u8],
    intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
) -> bool {
    let env = OutboundRequest {
        bytes: bytes.to_vec(),
        req_id: req.id(),
        responder: intermediate_tx.clone(),
        span: req_span.clone(),
        ty: crate::proto::dnode::DmsgType::Req,
        target_peer_idx: Some(target.peer_idx),
    };
    // Local targets use the backend channel, remote ones their peer channel.
    let channel = if target.is_local {
        self.backend.as_ref()
    } else {
        self.peer_backends.get(&target.peer_idx)
    };
    let Some(tx) = channel else {
        return false;
    };
    match tx.try_send(env) {
        Ok(()) => true,
        Err(err) => {
            self.observe_send_error(target, &err);
            false
        }
    }
}
fn observe_send_error(
&self,
target: &ReplicaTarget,
err: &tokio::sync::mpsc::error::TrySendError<OutboundRequest>,
) {
let Some(m) = self.failure_metrics.as_ref() else {
return;
};
if target.is_local {
match err {
tokio::sync::mpsc::error::TrySendError::Full(_) => m.record_backend_send_full(),
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
m.record_backend_send_closed();
}
}
} else {
let peer_dc = self.peer_dc_label(target.peer_idx);
match err {
tokio::sync::mpsc::error::TrySendError::Full(_) => {
m.record_peer_send_full(target.peer_idx, &peer_dc);
}
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
m.record_peer_send_closed(target.peer_idx, &peer_dc);
}
}
}
}
/// Stores the request as a hint for `target` and feeds a synthesized
/// reply into the coalescer on the target's behalf.
///
/// Returns `false` when no hint store is configured or the enqueue fails.
/// Once the hint is stored the result is `true`, even if the synthesized
/// reply could not be queued (that case is only logged).
fn hint_target(
    &self,
    target: &ReplicaTarget,
    bytes: &[u8],
    req: &Msg,
    req_span: &tracing::Span,
    intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
) -> bool {
    let Some(store) = self.hint_store.as_ref() else {
        return false;
    };
    // A configured TTL of 0 is raised to one second.
    let ttl_secs = self.pool.config().hint_ttl_seconds.max(1);
    let ttl = std::time::Duration::from_secs(ttl_secs);
    if let Err(e) = store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
        tracing::debug!(
            target: "dynomite::hints",
            peer_idx = target.peer_idx,
            error = %e,
            "hint enqueue failed"
        );
        return false;
    }
    let synth = synth_hint_reply(req, &self.mbuf_pool);
    let ack = OutboundEnvelope {
        req_id: req.id(),
        rsp: synth,
        span: req_span.clone(),
        source_peer_idx: Some(target.peer_idx),
    };
    let ack_queued = intermediate_tx.try_send(ack).is_ok();
    if !ack_queued {
        tracing::debug!(
            target: "dynomite::hints",
            peer_idx = target.peer_idx,
            "hint synth-reply could not be queued; coalescer absent"
        );
    }
    tracing::debug!(
        target: "dynomite::hints",
        peer_idx = target.peer_idx,
        bytes = bytes.len(),
        "stored hint for down peer"
    );
    true
}
/// Handles a plan with exactly one target without spawning a coalescer:
/// the target's reply goes straight to `responder`.
///
/// A target chosen for hinting is hinted immediately. Otherwise the request
/// is sent, and if the send fails while hinted handoff is active it falls
/// back to a hint. Returns `Pending` on a successful send or hint, and a
/// no-quorum `Error` otherwise.
fn dispatch_replicas_direct(
    &self,
    req: &Msg,
    req_span: &tracing::Span,
    targets: &[ReplicaTarget],
    bytes: &[u8],
    responder: &ServerSink,
    ctx: &HandoffCtx<'_>,
) -> DispatchOutcome {
    debug_assert_eq!(targets.len(), 1);
    let target = &targets[0];
    let action = Self::choose_target_action(target, ctx.handoff_active, ctx.peer_states);
    if matches!(action, TargetAction::Hint) {
        return if self.hint_single_target_direct(target, bytes, req, req_span, responder) {
            DispatchOutcome::Pending
        } else {
            DispatchOutcome::Error(self.no_quorum_error(req))
        };
    }
    let env = OutboundRequest {
        bytes: bytes.to_vec(),
        req_id: req.id(),
        responder: responder.clone(),
        span: req_span.clone(),
        ty: crate::proto::dnode::DmsgType::Req,
        target_peer_idx: Some(target.peer_idx),
    };
    // Local targets use the backend channel, remote ones their peer channel.
    let channel = if target.is_local {
        self.backend.as_ref()
    } else {
        self.peer_backends.get(&target.peer_idx)
    };
    let delivered = match channel.map(|tx| tx.try_send(env)) {
        Some(Ok(())) => true,
        Some(Err(err)) => {
            self.observe_send_error(target, &err);
            false
        }
        None => false,
    };
    // Only attempt the hint fallback when the send did not go through.
    let recovered = delivered
        || (ctx.handoff_active
            && self.hint_single_target_direct(target, bytes, req, req_span, responder));
    if recovered {
        DispatchOutcome::Pending
    } else {
        DispatchOutcome::Error(self.no_quorum_error(req))
    }
}
/// Single-target variant of hinting: stores the request as a hint and
/// sends the synthesized reply directly to `responder`.
///
/// Returns `false` when no hint store is configured or the enqueue fails.
/// A failure to queue the synthesized reply is ignored.
fn hint_single_target_direct(
    &self,
    target: &ReplicaTarget,
    bytes: &[u8],
    req: &Msg,
    req_span: &tracing::Span,
    responder: &ServerSink,
) -> bool {
    let Some(store) = self.hint_store.as_ref() else {
        return false;
    };
    // A configured TTL of 0 is raised to one second.
    let ttl_secs = self.pool.config().hint_ttl_seconds.max(1);
    let ttl = std::time::Duration::from_secs(ttl_secs);
    match store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
        Ok(()) => {}
        Err(e) => {
            tracing::debug!(
                target: "dynomite::hints",
                peer_idx = target.peer_idx,
                error = %e,
                "hint enqueue failed (single-target)"
            );
            return false;
        }
    }
    let synth = synth_hint_reply(req, &self.mbuf_pool);
    let ack = OutboundEnvelope {
        req_id: req.id(),
        rsp: synth,
        span: req_span.clone(),
        source_peer_idx: Some(target.peer_idx),
    };
    // Best effort: the hint is already stored, so a lost reply is not an error here.
    let _ = responder.try_send(ack);
    true
}
/// Builds the `DynomiteNoQuorumAchieved` error reply for `req`.
///
/// Redis GET/SET requests get a Redis error reply; every other request
/// type gets a memcache server-error reply.
fn no_quorum_error(&self, req: &Msg) -> Msg {
    let err_type = match req.ty() {
        MsgType::ReqRedisGet | MsgType::ReqRedisSet => MsgType::RspRedisError,
        _ => MsgType::RspMcServerError,
    };
    let code = crate::msg::DynErrorCode::DynomiteNoQuorumAchieved;
    crate::msg::response::make_error(req, err_type, 0, code, &self.mbuf_pool)
}
/// Gives the command extension a chance to handle `req` before routing.
///
/// Returns `Some(outcome)` when the extension claims the message type, or
/// when it rejects an HSET; returns `None` when normal routing should
/// proceed.
fn intercept_command(
    &self,
    ext: &dyn crate::embed::CommandExtension,
    req: &Msg,
) -> Option<DispatchOutcome> {
    if ext.handles_msg_type(req.ty()) {
        Some(self.run_extension_command(ext, req))
    } else if matches!(req.ty(), MsgType::ReqRedisHset) {
        // HSET is not owned by the extension but it may still veto it.
        self.intercept_extension_hset(ext, req)
    } else {
        None
    }
}
/// Runs an `FT.*` command through the extension and wraps its raw reply
/// bytes in an inline response.
///
/// The argument vector passed to the extension is the command keyword
/// followed by the request's keys and then its arguments. For
/// `ReqRedisFtUnknown` the keyword is recovered from the wire bytes, with
/// `FT.UNKNOWN` as the fallback. Message types outside the `FT.*` family
/// yield `Drop`. When the extension declines the command, a
/// "not supported" error reply is produced instead.
fn run_extension_command(
    &self,
    ext: &dyn crate::embed::CommandExtension,
    req: &Msg,
) -> DispatchOutcome {
    // Declared up front so a recovered keyword outlives the borrow below.
    let wire_keyword: Vec<u8>;
    let keyword: &[u8] = match req.ty() {
        MsgType::ReqRedisFtCreate => b"FT.CREATE",
        MsgType::ReqRedisFtSearch => b"FT.SEARCH",
        MsgType::ReqRedisFtInfo => b"FT.INFO",
        MsgType::ReqRedisFtList => b"FT.LIST",
        MsgType::ReqRedisFtDropindex => b"FT.DROPINDEX",
        MsgType::ReqRedisFtRegex => b"FT.REGEX",
        MsgType::ReqRedisFtUnknown => {
            wire_keyword = first_bulk_token(req).unwrap_or_else(|| b"FT.UNKNOWN".to_vec());
            wire_keyword.as_slice()
        }
        _ => return DispatchOutcome::Drop,
    };
    let argc = 1 + req.keys().len() + req.args().len();
    let mut argv: Vec<&[u8]> = Vec::with_capacity(argc);
    argv.push(keyword);
    for key in req.keys() {
        argv.push(key.key());
    }
    for arg in req.args() {
        argv.push(arg.bytes());
    }
    let reply = match ext.try_dispatch(&argv) {
        Some(bytes) => bytes,
        None => {
            let kw = String::from_utf8_lossy(keyword);
            format!("-ERR not supported in this build: {kw}\r\n").into_bytes()
        }
    };
    DispatchOutcome::Inline(synthetic_redis_reply(req, &self.mbuf_pool, &reply))
}
fn intercept_extension_hset(
&self,
ext: &dyn crate::embed::CommandExtension,
req: &Msg,
) -> Option<DispatchOutcome> {
let mut args: Vec<&[u8]> = Vec::with_capacity(req.keys().len() + req.args().len());
for k in req.keys() {
args.push(k.key());
}
for a in req.args() {
args.push(a.bytes());
}
match ext.try_intercept_hset(&args) {
crate::embed::HsetOutcome::Absorbed | crate::embed::HsetOutcome::NotIndexed => None,
crate::embed::HsetOutcome::Error(message) => {
let payload = format!("-ERR {message}\r\n");
Some(DispatchOutcome::Error(synthetic_redis_reply(
req,
&self.mbuf_pool,
payload.as_bytes(),
)))
}
}
}
}
fn synthetic_redis_reply(req: &Msg, pool: &MbufPool, payload: &[u8]) -> Msg {
let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
rsp.set_parent_id(req.id());
let mut written = 0usize;
while written < payload.len() {
let mut buf = pool.get();
let n = buf.recv(&payload[written..]);
debug_assert!(
n > 0,
"MbufPool returned a buffer with zero writable capacity"
);
rsp.mbufs_mut().push_back(buf);
written += n;
}
rsp.recompute_mlen();
rsp
}
/// Recovers the first bulk string (normally the command keyword) from the
/// request's raw wire bytes.
///
/// Buffers are concatenated until more than 256 bytes are available. An
/// optional leading `*<n>\r\n` array header is skipped, then a
/// `$<len>\r\n<body>` bulk string is parsed. Returns `None` when the bytes
/// do not have that shape or when fewer than `len + 2` bytes follow the
/// bulk header.
///
/// All offset arithmetic on the client-supplied `len` is checked, so an
/// absurdly large length yields `None` instead of overflowing.
fn first_bulk_token(req: &Msg) -> Option<Vec<u8>> {
    let mut wire: Vec<u8> = Vec::new();
    for buf in req.mbufs() {
        wire.extend_from_slice(buf.readable());
        // The keyword sits at the very start; a short prefix is enough.
        if wire.len() > 256 {
            break;
        }
    }
    let mut p = 0usize;
    if wire.first() == Some(&b'*') {
        // Skip the array header line.
        let cr = wire.iter().position(|&b| b == b'\r')?;
        if wire.get(cr + 1) != Some(&b'\n') {
            return None;
        }
        p = cr + 2;
    }
    if wire.get(p) != Some(&b'$') {
        return None;
    }
    let header_start = p + 1;
    let header_cr = wire[header_start..]
        .iter()
        .position(|&b| b == b'\r')
        .map(|i| header_start + i)?;
    if wire.get(header_cr + 1) != Some(&b'\n') {
        return None;
    }
    let len_str = std::str::from_utf8(&wire[header_start..header_cr]).ok()?;
    let len: usize = len_str.parse().ok()?;
    let body_start = header_cr + 2;
    let body_end = body_start.checked_add(len)?;
    // The body must be followed by two more bytes (its terminator).
    let frame_end = body_end.checked_add(2)?;
    if wire.len() < frame_end {
        return None;
    }
    Some(wire[body_start..body_end].to_vec())
}
/// Everything the coalescer needs to issue read-repair writes after the
/// dispatching call has returned.
#[derive(Clone)]
struct ReadRepairContext {
/// Id of the originating request; reused for the repair requests.
req_id: crate::core::types::MsgId,
/// Type of the originating request; repairs are only scheduled for `ReqRedisGet`.
req_ty: MsgType,
/// Key whose value is rewritten (or deleted) on divergent replicas.
key: Vec<u8>,
/// Buffer pool captured from the dispatcher.
mbuf_pool: MbufPool,
/// Snapshot of the dispatcher's peer channels, keyed by peer index.
peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
/// Snapshot of the dispatcher's local backend channel, if any.
local_backend: Option<mpsc::Sender<OutboundRequest>>,
/// For each target peer index, whether that target is the local node.
target_is_local: std::collections::HashMap<u32, bool>,
}
#[allow(
clippy::too_many_arguments,
reason = "actor task captures the entire dispatch context; bundling into a struct adds churn for no callsite gain"
)]
async fn coalesce_actor(
req_id: crate::core::types::MsgId,
req_ty: MsgType,
consistency: ConsistencyLevel,
targets: Vec<(u32, String)>,
local_dc: String,
mut intermediate_rx: mpsc::Receiver<OutboundEnvelope>,
client_tx: ServerSink,
mbuf_pool: MbufPool,
repair_ctx: Option<ReadRepairContext>,
failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
) {
use crate::proto::redis::{CoalesceOutcome, CoalesceTracker};
let mut tracker = CoalesceTracker::new(req_id, consistency, targets, &local_dc);
let mut emitted = false;
while let Some(env) = intermediate_rx.recv().await {
let source = env.source_peer_idx.unwrap_or(u32::MAX);
let span = env.span.clone();
let outcome = tracker.record_reply(source, env.rsp);
match outcome {
CoalesceOutcome::Pending => {}
CoalesceOutcome::Ready {
winner,
divergent_targets,
} => {
if !emitted {
let winner_bytes: Vec<u8> = winner
.mbufs()
.iter()
.flat_map(|b| b.readable().to_vec())
.collect();
let out_env = OutboundEnvelope {
req_id,
rsp: *winner,
span: span.clone(),
source_peer_idx: None,
};
let _ = client_tx.send(out_env).await;
emitted = true;
if !divergent_targets.is_empty() {
if let Some(ctx) = repair_ctx.as_ref() {
schedule_read_repair(ctx, &divergent_targets, &winner_bytes, &span);
}
}
}
}
CoalesceOutcome::Error(reason) => {
if !emitted {
let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet)
{
MsgType::RspRedisError
} else {
MsgType::RspMcServerError
};
let anchor = Msg::new(req_id, req_ty, true);
let rsp = crate::msg::response::make_error(
&anchor,
err_type,
0,
crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
&mbuf_pool,
);
let _ = client_tx
.send(OutboundEnvelope {
req_id,
rsp,
span: span.clone(),
source_peer_idx: None,
})
.await;
emitted = true;
}
tracing::debug!(target: "dynomite::coalesce", req_id, reason = %reason, "coalesce error");
}
}
}
if !emitted {
if let Some(m) = failure_metrics.as_ref() {
m.record_response_timeout(consistency);
}
let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
MsgType::RspRedisError
} else {
MsgType::RspMcServerError
};
let anchor = Msg::new(req_id, req_ty, true);
let rsp = crate::msg::response::make_error(
&anchor,
err_type,
0,
crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
&mbuf_pool,
);
let _ = client_tx
.send(OutboundEnvelope {
req_id,
rsp,
span: tracing::Span::none(),
source_peer_idx: None,
})
.await;
}
}
/// Creates a sink that accepts and discards replies.
///
/// Read-repair writes need a responder but nobody consumes their replies,
/// so a background task drains the channel until every sender is dropped.
fn repair_sink() -> ServerSink {
    let (sink, mut replies) = mpsc::channel::<OutboundEnvelope>(8);
    tokio::spawn(async move {
        while let Some(_discarded) = replies.recv().await {}
    });
    sink
}
/// Interprets the winning GET reply as the repair to apply to divergent
/// replicas.
///
/// * `$-1\r\n` (null bulk) -> `RepairAction::Delete`.
/// * `$<len>\r\n<body>\r\n` -> `RepairAction::Write(body)`.
/// * Anything else, including a truncated body or a missing terminator,
///   -> `None` (no repair).
///
/// Offset arithmetic on the declared length is checked, so an oversized
/// length yields `None` instead of overflowing.
fn decode_winner_for_repair(payload: &[u8]) -> Option<RepairAction> {
    if payload == b"$-1\r\n" {
        return Some(RepairAction::Delete);
    }
    if !payload.starts_with(b"$") {
        return None;
    }
    let crlf = payload.iter().position(|&b| b == b'\r')?;
    if payload.get(crlf + 1).copied() != Some(b'\n') {
        return None;
    }
    let len_str = std::str::from_utf8(&payload[1..crlf]).ok()?;
    let len: usize = len_str.parse().ok()?;
    let body_start = crlf + 2;
    let body_end = body_start.checked_add(len)?;
    let frame_end = body_end.checked_add(2)?;
    // `get` returns `None` when the payload is shorter than the frame.
    if payload.get(body_end..frame_end)? != b"\r\n" {
        return None;
    }
    Some(RepairAction::Write(payload[body_start..body_end].to_vec()))
}
/// Hinted-handoff inputs bundled for the single-target dispatch path.
struct HandoffCtx<'a> {
/// Whether hints may be stored for this request.
handoff_active: bool,
/// Peer states captured for the request's targets, keyed by peer index.
peer_states: &'a std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
}
/// What to do with a single replica target.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TargetAction {
/// Send the request to the target's channel.
Send,
/// Store the request as a hint instead of sending it.
Hint,
}
/// Builds the `+OK` reply that stands in for a replica whose write was
/// stored as a hint rather than delivered.
fn synth_hint_reply(req: &Msg, pool: &MbufPool) -> Msg {
crate::msg::response::make_simple_redis(req, pool, b"+OK\r\n")
}
/// Repair to apply to a divergent replica, derived from the winning reply.
enum RepairAction {
/// Overwrite the key with this value (encoded as a `SET`).
Write(Vec<u8>),
/// Remove the key (encoded as a `DEL`).
Delete,
}
/// Encodes a repair as a RESP command: `SET key value` for
/// `RepairAction::Write`, `DEL key` for `RepairAction::Delete`.
fn build_repair_bytes(action: &RepairAction, key: &[u8]) -> Vec<u8> {
    /// Appends `<len>\r\n<payload>` to `out`; the caller supplies the
    /// leading `$` and the trailing CRLF.
    fn push_len_prefixed(out: &mut Vec<u8>, payload: &[u8]) {
        out.extend_from_slice(payload.len().to_string().as_bytes());
        out.extend_from_slice(b"\r\n");
        out.extend_from_slice(payload);
    }

    match action {
        RepairAction::Write(value) => {
            let mut cmd = Vec::with_capacity(key.len() + value.len() + 32);
            cmd.extend_from_slice(b"*3\r\n$3\r\nSET\r\n$");
            push_len_prefixed(&mut cmd, key);
            cmd.extend_from_slice(b"\r\n$");
            push_len_prefixed(&mut cmd, value);
            cmd.extend_from_slice(b"\r\n");
            cmd
        }
        RepairAction::Delete => {
            let mut cmd = Vec::with_capacity(key.len() + 24);
            cmd.extend_from_slice(b"*2\r\n$3\r\nDEL\r\n$");
            push_len_prefixed(&mut cmd, key);
            cmd.extend_from_slice(b"\r\n");
            cmd
        }
    }
}
/// Best-effort read repair: pushes the winning value to every divergent
/// replica.
///
/// Only `ReqRedisGet` requests whose winning reply decodes to a repair
/// action are repaired. Each repair is queued with `try_send` on the
/// replica's channel as a `ReqForward` message; replies are discarded via
/// a draining sink. A missing, full or closed channel just drops that
/// replica's repair.
fn schedule_read_repair(
    ctx: &ReadRepairContext,
    divergent: &[u32],
    winner_bytes: &[u8],
    span: &tracing::Span,
) {
    if !matches!(ctx.req_ty, MsgType::ReqRedisGet) {
        return;
    }
    let Some(action) = decode_winner_for_repair(winner_bytes) else {
        return;
    };
    let repair_cmd = build_repair_bytes(&action, &ctx.key);
    let sink = repair_sink();
    for &peer_idx in divergent {
        // Targets missing from the map are treated as remote.
        let is_local = ctx.target_is_local.get(&peer_idx).copied().unwrap_or(false);
        let env = OutboundRequest {
            bytes: repair_cmd.clone(),
            req_id: ctx.req_id,
            responder: sink.clone(),
            span: span.clone(),
            ty: crate::proto::dnode::DmsgType::ReqForward,
            target_peer_idx: Some(peer_idx),
        };
        let channel = if is_local {
            ctx.local_backend.as_ref()
        } else {
            ctx.peer_backends.get(&peer_idx)
        };
        let queued = channel.is_some_and(|tx| tx.try_send(env).is_ok());
        if !queued {
            tracing::debug!(
                target: "dynomite::read_repair",
                req_id = ctx.req_id,
                peer_idx,
                "read-repair drop: backend channel unavailable or full",
            );
            continue;
        }
        let _ = &ctx.mbuf_pool;
        tracing::debug!(
            target: "dynomite::read_repair",
            req_id = ctx.req_id,
            peer_idx,
            bytes = repair_cmd.len(),
            "scheduled read-repair write",
        );
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
use crate::conf::DataStore;
use crate::hashkit::DynToken;
fn cfg(read: ConsistencyLevel, write: ConsistencyLevel) -> crate::cluster::PoolConfig {
crate::cluster::PoolConfig {
read_consistency: read,
write_consistency: write,
dc: "dc1".into(),
rack: "rA".into(),
..crate::cluster::PoolConfig::default()
}
}
fn peer(idx: u32, dc: &str, rack: &str, tok: u32, is_local: bool, is_same: bool) -> Peer {
let mut p = Peer::new(
idx,
PeerEndpoint::tcp("h".into(), 8101 + u16::try_from(idx).unwrap_or(0)),
rack.into(),
dc.into(),
vec![DynToken::from_u32(tok)],
is_local,
is_same,
false,
);
p.set_state(PeerState::Normal, 0);
p
}
fn pool(read: ConsistencyLevel, write: ConsistencyLevel, peers: Vec<Peer>) -> Arc<ServerPool> {
let pool = ServerPool::new(cfg(read, write), peers);
pool.preselect_remote_racks();
Arc::new(pool)
}
/// A request routed as `MsgRouting::LocalNodeOnly` must be planned as
/// `DispatchPlan::LocalDatastore`.
#[test]
fn local_node_only_short_circuits() {
    let members = vec![peer(0, "dc1", "rA", 10, true, true)];
    let shared = pool(ConsistencyLevel::DcOne, ConsistencyLevel::DcOne, members);

    let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
    req.set_routing(MsgRouting::LocalNodeOnly);

    let planned = ClusterDispatcher::new(shared).plan(&req, b"k");
    assert_eq!(planned, DispatchPlan::LocalDatastore);
}
/// With `DcOne` consistency and a local peer in the pool, a read for key
/// `hello` is planned as `DispatchPlan::LocalDatastore`.
#[test]
fn dc_one_read_targets_local_rack_when_present() {
    let members = vec![
        peer(0, "dc1", "rA", 10, true, true),
        peer(1, "dc1", "rB", 20, false, true),
        peer(2, "dc2", "rA", 30, false, false),
    ];
    let shared = pool(ConsistencyLevel::DcOne, ConsistencyLevel::DcOne, members);
    let dispatcher_req = Msg::new(1, MsgType::ReqRedisGet, true);

    let plan = ClusterDispatcher::new(shared).plan(&dispatcher_req, b"hello");
    assert!(matches!(plan, DispatchPlan::LocalDatastore));
}
/// A `DcQuorum` read over two `dc1` peers and one `dc2` peer must plan
/// exactly two replica targets, every one of them in `dc1`.
#[test]
fn dc_quorum_fans_out_local_dc() {
    let members = vec![
        peer(0, "dc1", "rA", 10, true, true),
        peer(1, "dc1", "rB", 20, false, true),
        peer(2, "dc2", "rA", 30, false, false),
    ];
    let shared = pool(
        ConsistencyLevel::DcQuorum,
        ConsistencyLevel::DcQuorum,
        members,
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let planned = ClusterDispatcher::new(shared).plan(&req, b"k");
    let DispatchPlan::Replicas { targets, .. } = planned else {
        panic!("expected replicas");
    };

    assert_eq!(targets.len(), 2);
    for target in targets {
        assert_eq!(target.dc, "dc1");
    }
}
/// A `DcEachSafeQuorum` read over one peer in `dc1` and one in `dc2` must
/// plan two replica targets that cover both datacenters.
#[test]
fn dc_each_safe_quorum_fans_out_per_dc() {
    let members = vec![
        peer(0, "dc1", "rA", 10, true, true),
        peer(1, "dc2", "rA", 20, false, false),
    ];
    let shared = pool(
        ConsistencyLevel::DcEachSafeQuorum,
        ConsistencyLevel::DcEachSafeQuorum,
        members,
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let planned = ClusterDispatcher::new(shared).plan(&req, b"k");
    let DispatchPlan::Replicas { targets, .. } = planned else {
        panic!("expected replicas");
    };

    assert_eq!(targets.len(), 2);
    let dcs: Vec<&str> = targets.iter().map(|target| target.dc.as_str()).collect();
    assert!(dcs.contains(&"dc1"));
    assert!(dcs.contains(&"dc2"));
}
/// When the only peer in the pool is in `PeerState::Down`, a `DcQuorum`
/// read is planned as `DispatchPlan::NoTargets`.
#[test]
fn no_routable_peers_returns_no_targets() {
    let mut down = peer(0, "dc1", "rA", 10, true, true);
    down.set_state(PeerState::Down, 0);

    let shared = pool(
        ConsistencyLevel::DcQuorum,
        ConsistencyLevel::DcQuorum,
        vec![down],
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let planned = ClusterDispatcher::new(shared).plan(&req, b"k");
    assert_eq!(planned, DispatchPlan::NoTargets);
}
/// Dispatching a Redis GET against a pool whose only peer is down must
/// return `DispatchOutcome::Error` carrying a real error payload: type
/// `RspRedisError`, the error flag set, non-empty bytes that start with
/// `-Dynomite: ` and end with CRLF, and an `mlen` equal to the byte count.
#[test]
fn no_targets_error_response_carries_dynomite_wire_bytes() {
    let mut down = peer(0, "dc1", "rA", 10, true, true);
    down.set_state(PeerState::Down, 0);
    let shared = pool(
        ConsistencyLevel::DcQuorum,
        ConsistencyLevel::DcQuorum,
        vec![down],
    );
    let disp = ClusterDispatcher::new(shared);

    let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
    req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
    // The receiver stays bound for the whole test so the channel is not closed.
    let (tx, _rx) = mpsc::channel(1);

    let outcome = disp.dispatch(req, tx);
    let DispatchOutcome::Error(rsp) = &outcome else {
        panic!("expected DispatchOutcome::Error, got {outcome:?}");
    };

    assert_eq!(rsp.ty(), MsgType::RspRedisError);
    assert!(rsp.flags().is_error);

    // Concatenate the readable region of every mbuf into one wire image.
    let mut bytes: Vec<u8> = Vec::new();
    for chunk in rsp.mbufs().iter() {
        bytes.extend(chunk.readable().to_vec());
    }

    assert!(
        !bytes.is_empty(),
        "NoTargets must produce on-wire bytes, not a 0-byte hang"
    );
    assert!(bytes.starts_with(b"-Dynomite: "));
    assert!(bytes.ends_with(b"\r\n"));
    assert_eq!(rsp.mlen() as usize, bytes.len());
}
/// Memcache counterpart of the no-targets error check: with
/// `DataStore::Memcache` and the only peer down, dispatching a `ReqMcGet`
/// must return an `RspMcServerError` whose bytes are non-empty, start with
/// `SERVER_ERROR ` and end with CRLF.
#[test]
fn no_targets_error_response_memcache_wire_bytes() {
    let mut mc_cfg = cfg(ConsistencyLevel::DcQuorum, ConsistencyLevel::DcQuorum);
    mc_cfg.data_store = DataStore::Memcache;

    let mut down = peer(0, "dc1", "rA", 10, true, true);
    down.set_state(PeerState::Down, 0);

    let server_pool = ServerPool::new(mc_cfg, vec![down]);
    server_pool.preselect_remote_racks();
    let disp = ClusterDispatcher::new(Arc::new(server_pool));

    let mut req = Msg::new(1, MsgType::ReqMcGet, true);
    req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
    // The receiver stays bound for the whole test so the channel is not closed.
    let (tx, _rx) = mpsc::channel(1);

    let outcome = disp.dispatch(req, tx);
    let DispatchOutcome::Error(rsp) = &outcome else {
        panic!("expected DispatchOutcome::Error, got {outcome:?}");
    };

    assert_eq!(rsp.ty(), MsgType::RspMcServerError);

    // Concatenate the readable region of every mbuf into one wire image.
    let mut bytes: Vec<u8> = Vec::new();
    for chunk in rsp.mbufs().iter() {
        bytes.extend(chunk.readable().to_vec());
    }

    assert!(
        !bytes.is_empty(),
        "NoTargets must produce on-wire bytes, not a 0-byte hang"
    );
    assert!(bytes.starts_with(b"SERVER_ERROR "));
    assert!(bytes.ends_with(b"\r\n"));
}
use crate::cluster::pool::{BucketType, PoolConfig};
/// Builds a shared `ServerPool` in `dc1`/`rA` with the given pool-level
/// consistency levels, bucket-type table and optional default bucket-type
/// name. All remaining `PoolConfig` fields use their defaults, and
/// `preselect_remote_racks` is called before the pool is returned.
fn pool_with_bucket_types(
    pool_read: ConsistencyLevel,
    pool_write: ConsistencyLevel,
    bucket_types: Vec<BucketType>,
    default_bucket_type: Option<&str>,
    peers: Vec<Peer>,
) -> Arc<ServerPool> {
    let default_name = default_bucket_type.map(str::to_owned);
    let config = PoolConfig {
        dc: "dc1".into(),
        rack: "rA".into(),
        read_consistency: pool_read,
        write_consistency: pool_write,
        default_bucket_type: default_name,
        bucket_types,
        ..PoolConfig::default()
    };
    let shared = Arc::new(ServerPool::new(config, peers));
    shared.preselect_remote_racks();
    shared
}
/// Returns three `dc1` peers on racks `rA`, `rB` and `rC` with tokens 10, 20
/// and 30. Only the first is flagged local; all three pass `is_same = true`.
fn three_local_peers() -> Vec<Peer> {
    // (index, rack, token, is_local)
    let layout = [
        (0, "rA", 10, true),
        (1, "rB", 20, false),
        (2, "rC", 30, false),
    ];
    layout
        .iter()
        .map(|&(idx, rack, tok, is_local)| peer(idx, "dc1", rack, tok, is_local, true))
        .collect()
}
/// On a `DcOne` pool that declares a `hot` bucket type at `DcQuorum`, a read
/// for `hot/key1` must plan three replica targets instead of the pool default.
// NOTE(review): presumably the bucket name is the key prefix before `/` — confirm in `plan`.
#[test]
fn bucket_type_overrides_pool_consistency() {
    let hot = BucketType {
        name: "hot".into(),
        read_consistency: ConsistencyLevel::DcQuorum,
        write_consistency: ConsistencyLevel::DcQuorum,
        n_val: 0,
    };
    let shared = pool_with_bucket_types(
        ConsistencyLevel::DcOne,
        ConsistencyLevel::DcOne,
        vec![hot],
        None,
        three_local_peers(),
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let plan = ClusterDispatcher::new(shared).plan(&req, b"hot/key1");
    let DispatchPlan::Replicas { targets, .. } = &plan else {
        panic!("expected DC_QUORUM fan-out, got {plan:?}");
    };
    assert_eq!(targets.len(), 3);
}
/// With a `hot` bucket type declared but no default bucket type, a read for
/// the key `plain-key` (no `/`) on a `DcOne` pool is planned as
/// `DispatchPlan::LocalDatastore`.
#[test]
fn slashless_key_falls_back_to_pool_default() {
    let hot = BucketType {
        name: "hot".into(),
        read_consistency: ConsistencyLevel::DcQuorum,
        write_consistency: ConsistencyLevel::DcQuorum,
        n_val: 0,
    };
    let shared = pool_with_bucket_types(
        ConsistencyLevel::DcOne,
        ConsistencyLevel::DcOne,
        vec![hot],
        None,
        three_local_peers(),
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let plan = ClusterDispatcher::new(shared).plan(&req, b"plain-key");
    assert!(matches!(plan, DispatchPlan::LocalDatastore));
}
/// When `default_bucket_type` is `safe` (`DcQuorum`), both a key without a
/// `/` and a key whose prefix names an undeclared bucket must plan three
/// replica targets on a `DcOne` pool.
#[test]
fn unknown_bucket_uses_default_bucket_type_when_set() {
    let safe = BucketType {
        name: "safe".into(),
        read_consistency: ConsistencyLevel::DcQuorum,
        write_consistency: ConsistencyLevel::DcQuorum,
        n_val: 0,
    };
    let shared = pool_with_bucket_types(
        ConsistencyLevel::DcOne,
        ConsistencyLevel::DcOne,
        vec![safe],
        Some("safe"),
        three_local_peers(),
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    // Each key gets its own dispatcher over the same shared pool.
    let keys: [&[u8]; 2] = [b"plain-key", b"unknown-bucket/key"];
    for key in keys {
        let plan = ClusterDispatcher::new(Arc::clone(&shared)).plan(&req, key);
        let DispatchPlan::Replicas { targets, .. } = &plan else {
            panic!("expected DC_QUORUM via default bucket, got {plan:?}");
        };
        assert_eq!(targets.len(), 3);
    }
}
/// Without a default bucket type, a read for `unknown-bucket/key` (no bucket
/// type of that name is declared) on a `DcOne` pool is planned as
/// `DispatchPlan::LocalDatastore`.
#[test]
fn unknown_bucket_with_no_default_uses_pool_default() {
    let safe = BucketType {
        name: "safe".into(),
        read_consistency: ConsistencyLevel::DcQuorum,
        write_consistency: ConsistencyLevel::DcQuorum,
        n_val: 0,
    };
    let shared = pool_with_bucket_types(
        ConsistencyLevel::DcOne,
        ConsistencyLevel::DcOne,
        vec![safe],
        None,
        three_local_peers(),
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let plan = ClusterDispatcher::new(shared).plan(&req, b"unknown-bucket/key");
    assert!(matches!(plan, DispatchPlan::LocalDatastore));
}
/// A `DcQuorum` bucket type with `n_val: 1` over three local-DC peers must
/// plan exactly one replica target.
#[test]
fn n_val_one_caps_replicas_to_first_target() {
    let thin = BucketType {
        name: "thin".into(),
        read_consistency: ConsistencyLevel::DcQuorum,
        write_consistency: ConsistencyLevel::DcQuorum,
        n_val: 1,
    };
    let shared = pool_with_bucket_types(
        ConsistencyLevel::DcOne,
        ConsistencyLevel::DcOne,
        vec![thin],
        None,
        three_local_peers(),
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let plan = ClusterDispatcher::new(shared).plan(&req, b"thin/key");
    let DispatchPlan::Replicas { targets, .. } = &plan else {
        panic!("expected single-target plan, got {plan:?}");
    };
    assert_eq!(targets.len(), 1);
}
/// A `DcQuorum` bucket type with `n_val: 2` over three local-DC peers must
/// plan exactly two replica targets.
#[test]
fn n_val_two_caps_replicas_to_first_two_targets() {
    let medium = BucketType {
        name: "medium".into(),
        read_consistency: ConsistencyLevel::DcQuorum,
        write_consistency: ConsistencyLevel::DcQuorum,
        n_val: 2,
    };
    let shared = pool_with_bucket_types(
        ConsistencyLevel::DcOne,
        ConsistencyLevel::DcOne,
        vec![medium],
        None,
        three_local_peers(),
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let plan = ClusterDispatcher::new(shared).plan(&req, b"medium/key");
    let DispatchPlan::Replicas { targets, .. } = &plan else {
        panic!("expected two-target plan, got {plan:?}");
    };
    assert_eq!(targets.len(), 2);
}
/// A `DcQuorum` bucket type with `n_val: 0` over three local-DC peers must
/// still plan all three replica targets.
#[test]
fn n_val_zero_does_not_cap() {
    let any = BucketType {
        name: "any".into(),
        read_consistency: ConsistencyLevel::DcQuorum,
        write_consistency: ConsistencyLevel::DcQuorum,
        n_val: 0,
    };
    let shared = pool_with_bucket_types(
        ConsistencyLevel::DcOne,
        ConsistencyLevel::DcOne,
        vec![any],
        None,
        three_local_peers(),
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let plan = ClusterDispatcher::new(shared).plan(&req, b"any/key");
    let DispatchPlan::Replicas { targets, .. } = &plan else {
        panic!("expected uncapped plan, got {plan:?}");
    };
    assert_eq!(targets.len(), 3);
}
/// A `DcQuorum` bucket type with `n_val: 7` over only three local-DC peers
/// must plan all three replica targets.
#[test]
fn n_val_larger_than_replicas_is_a_no_op() {
    let big = BucketType {
        name: "big".into(),
        read_consistency: ConsistencyLevel::DcQuorum,
        write_consistency: ConsistencyLevel::DcQuorum,
        n_val: 7,
    };
    let shared = pool_with_bucket_types(
        ConsistencyLevel::DcOne,
        ConsistencyLevel::DcOne,
        vec![big],
        None,
        three_local_peers(),
    );
    let req = Msg::new(1, MsgType::ReqRedisGet, true);

    let plan = ClusterDispatcher::new(shared).plan(&req, b"big/key");
    let DispatchPlan::Replicas { targets, .. } = &plan else {
        panic!("expected uncapped plan, got {plan:?}");
    };
    assert_eq!(targets.len(), 3);
}
/// Planning against a pool whose only peer is down must return `NoTargets`
/// and leave exactly one `no_targets` entry in the attached failure metrics,
/// labelled `dc1` / `rA` / `DcQuorum` with a count of 1.
#[test]
fn no_targets_records_failure_metric() {
    let mut down = peer(0, "dc1", "rA", 10, true, true);
    down.set_state(PeerState::Down, 0);
    let shared = pool(
        ConsistencyLevel::DcQuorum,
        ConsistencyLevel::DcQuorum,
        vec![down],
    );

    let metrics = Arc::new(crate::stats::FailureMetrics::new());
    let disp = ClusterDispatcher::new(shared).with_failure_metrics(Arc::clone(&metrics));

    let req = Msg::new(1, MsgType::ReqRedisGet, true);
    let planned = disp.plan(&req, b"k");
    assert_eq!(planned, DispatchPlan::NoTargets);

    let snapshot = metrics.snapshot();
    assert_eq!(snapshot.no_targets.len(), 1);

    let recorded = &snapshot.no_targets[0];
    assert_eq!(recorded.dc, "dc1");
    assert_eq!(recorded.rack, "rA");
    assert_eq!(recorded.consistency, ConsistencyLevel::DcQuorum);
    assert_eq!(recorded.count, 1);
}
/// Dispatching to a backend channel whose receiver has already been dropped
/// must return `DispatchOutcome::Error` and record one `backend_send_closed`
/// with zero `backend_send_full`.
#[tokio::test]
async fn closed_backend_channel_records_closed_metric() {
    let shared = pool(
        ConsistencyLevel::DcOne,
        ConsistencyLevel::DcOne,
        vec![peer(0, "dc1", "rA", 10, true, true)],
    );

    // Drop the receiving half up front so every send sees a closed channel.
    let (backend_tx, backend_rx) = mpsc::channel::<crate::net::server::OutboundRequest>(4);
    drop(backend_rx);

    let metrics = Arc::new(crate::stats::FailureMetrics::new());
    let disp = ClusterDispatcher::new(shared)
        .with_backend(backend_tx)
        .with_failure_metrics(Arc::clone(&metrics));

    // Give the request a payload: one mbuf holding `PING\r\n`.
    let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
    let mbuf_source = crate::io::mbuf::MbufPool::default();
    let mut payload = mbuf_source.get();
    payload.copy_from_slice(b"PING\r\n");
    req.mbufs_mut().push_back(payload);

    let (resp_tx, _resp_rx) = mpsc::channel(1);
    let outcome = disp.dispatch(req, resp_tx);
    assert!(matches!(outcome, DispatchOutcome::Error(_)));

    let snapshot = metrics.snapshot();
    assert_eq!(snapshot.backend_send_closed, 1);
    assert_eq!(snapshot.backend_send_full, 0);
}
/// Plans 100 keys (`k000`..`k099`) against a two-peer `DcQuorum` pool in
/// which one peer is down. Some keys must plan as `NoTargets` and some as
/// routable, the summed `no_targets` metric must equal the number of
/// `NoTargets` plans observed, and no send-failure counters may be touched.
#[test]
fn two_peer_pool_with_one_down_records_per_key_no_targets() {
    // `cfg` yields dc1 / rA with the given consistency and defaults elsewhere.
    let quorum_cfg = cfg(ConsistencyLevel::DcQuorum, ConsistencyLevel::DcQuorum);

    let up = peer(0, "dc1", "rA", 2_147_483_648, true, true);
    let mut down = peer(1, "dc1", "rA", 0, false, true);
    down.set_state(PeerState::Down, 0);

    let server_pool = ServerPool::new(quorum_cfg, vec![up, down]);
    server_pool.preselect_remote_racks();

    let metrics = Arc::new(crate::stats::FailureMetrics::new());
    let disp =
        ClusterDispatcher::new(Arc::new(server_pool)).with_failure_metrics(Arc::clone(&metrics));

    let mut no_targets_seen = 0u64;
    let mut routable_seen = 0u64;
    for i in 0..100u32 {
        let key = format!("k{i:03}");
        let req = Msg::new(u64::from(i), MsgType::ReqRedisGet, true);
        let planned = disp.plan(&req, key.as_bytes());
        // Pick the tally this plan belongs to; `Drop` is never expected here.
        let tally = match planned {
            DispatchPlan::Drop => panic!("unexpected Drop in plan"),
            DispatchPlan::NoTargets => &mut no_targets_seen,
            DispatchPlan::Replicas { .. } | DispatchPlan::LocalDatastore => &mut routable_seen,
        };
        *tally += 1;
    }
    assert!(no_targets_seen > 0, "expected some NoTargets dispatches");
    assert!(routable_seen > 0, "expected some routable dispatches");

    let snap = metrics.snapshot();
    let recorded_total: u64 = snap.no_targets.iter().map(|entry| entry.count).sum();
    assert_eq!(
        recorded_total, no_targets_seen,
        "dispatch_no_targets_total must match observed NoTargets count",
    );
    assert_eq!(snap.backend_send_full, 0);
    assert_eq!(snap.backend_send_closed, 0);
    assert!(snap.peer_send_full.is_empty());
    assert!(snap.peer_send_closed.is_empty());
}
}