Skip to main content

dynomite/cluster/
dispatch.rs

1//! Cluster-aware [`Dispatcher`](crate::net::Dispatcher).
2//!
3//! Routes parsed [`Msg`]s based on the configured consistency level
4//! and the [`crate::cluster::pool::ServerPool`] topology:
5//!
6//! * `DC_ONE` reads pick the rack-local replica via the snitch.
7//! * `DC_ONE` writes fan out to every replica in the local DC.
8//! * `DC_QUORUM` / `DC_SAFE_QUORUM` reads fan out to every replica
9//!   in the local DC.
10//! * `DC_EACH_SAFE_QUORUM` writes fan out per-DC, walking the
11//!   per-DC racks via the preselected rack from
12//!   [`crate::cluster::pool::ServerPool::preselect_remote_racks`].
13//!
14//! The actual outbound delivery happens through the per-peer
15//! [`crate::net::ConnPool`]s; this module produces a
16//! [`DispatchPlan`] (the list of replica peers a request must be
17//! routed to) and exposes the planning logic so it can be tested
18//! independently of the runtime fan-out.
19//!
20//! # Examples
21//!
22//! ```
23//! use dynomite::cluster::dispatch::{ClusterDispatcher, DispatchPlan};
24//! use dynomite::cluster::pool::{PoolConfig, ServerPool};
25//! use dynomite::cluster::peer::{Peer, PeerEndpoint};
26//! use dynomite::hashkit::DynToken;
27//! use dynomite::msg::{Msg, MsgType};
28//! use std::sync::Arc;
29//!
30//! let cfg = PoolConfig {
31//!     dc: "d".into(), rack: "r".into(),
32//!     ..PoolConfig::default()
33//! };
34//! let local = Peer::new(
35//!     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
36//!     vec![DynToken::from_u32(0)], true, true, false,
37//! );
38//! let pool = Arc::new(ServerPool::new(cfg, vec![local]));
39//! let disp = ClusterDispatcher::new(pool);
40//! let req = Msg::new(1, MsgType::ReqRedisGet, true);
41//! let plan = disp.plan(&req, b"foo");
42//! assert!(matches!(plan, DispatchPlan::LocalDatastore));
43//! ```
44
45use std::sync::atomic::{AtomicU64, Ordering};
46use std::sync::Arc;
47
48use tokio::sync::mpsc;
49
50use crate::cluster::pool::ServerPool;
51use crate::cluster::snitch::{rack_distance, RackDistance};
52use crate::cluster::vnode;
53use crate::conf::HashType as ConfHashType;
54use crate::hashkit::{self, HashType};
55use crate::io::mbuf::MbufPool;
56use crate::msg::{ConsistencyLevel, Msg, MsgRouting, MsgType};
57use crate::net::dispatcher::{DispatchOutcome, Dispatcher, OutboundEnvelope, ServerSink};
58use crate::net::server::OutboundRequest;
59
/// Backing storage for the shadow-distribution disagreement
/// counter. Private to this module: it is read through
/// [`distribution_shadow_disagreement_total`], cleared through
/// [`reset_distribution_shadow_disagreement_total`], and
/// incremented through `bump_shadow_disagreement`.
static SHADOW_DISAGREEMENTS: AtomicU64 = AtomicU64::new(0);

/// Current value of the process-global, Prometheus-friendly
/// shadow-distribution disagreement counter.
///
/// The dispatcher increments the counter each time the
/// configured `distribution` and the configured
/// `distribution_shadow` pick different peers for the same
/// key. The value is exposed for the stats endpoint and for the
/// integration test that exercises shadow mode.
///
/// # Examples
///
/// ```
/// use dynomite::cluster::dispatch::distribution_shadow_disagreement_total;
/// let _seen = distribution_shadow_disagreement_total();
/// ```
#[must_use]
pub fn distribution_shadow_disagreement_total() -> u64 {
    SHADOW_DISAGREEMENTS.load(Ordering::Relaxed)
}

/// Zero the shadow-disagreement counter.
///
/// Intended for integration tests that need a clean baseline;
/// production code never calls it.
///
/// # Examples
///
/// ```
/// use dynomite::cluster::dispatch::reset_distribution_shadow_disagreement_total;
/// reset_distribution_shadow_disagreement_total();
/// ```
pub fn reset_distribution_shadow_disagreement_total() {
    SHADOW_DISAGREEMENTS.store(0, Ordering::Relaxed);
}

/// Add one to the shadow-disagreement counter. The previous
/// value returned by the atomic add is not needed.
fn bump_shadow_disagreement() {
    let _previous = SHADOW_DISAGREEMENTS.fetch_add(1, Ordering::Relaxed);
}
97
98/// Build the `dispatch.plan` info span and enter it. Returns the
99/// originating client request span (captured before the plan
100/// span was entered) plus the entered plan-span guard. Factored
101/// out so [`ClusterDispatcher::dispatch`] stays inside the
102/// project's per-function line budget.
103fn enter_plan_span(
104    req_id: u64,
105    plan: &DispatchPlan,
106) -> (tracing::Span, tracing::span::EnteredSpan) {
107    let req_span = tracing::Span::current();
108    let kind: &'static str = match plan {
109        DispatchPlan::Drop => "drop",
110        DispatchPlan::NoTargets => "no_targets",
111        DispatchPlan::LocalDatastore => "local_datastore",
112        DispatchPlan::Replicas { .. } => "replicas",
113    };
114    let targets = match plan {
115        DispatchPlan::Replicas { targets, .. } => targets.len(),
116        _ => 0,
117    };
118    let span = tracing::info_span!("dispatch.plan", req_id, plan = kind, targets,).entered();
119    (req_span, span)
120}
121
122fn map_hash(h: ConfHashType) -> HashType {
123    match h {
124        ConfHashType::OneAtATime => HashType::OneAtATime,
125        ConfHashType::Md5 => HashType::Md5,
126        ConfHashType::Crc16 => HashType::Crc16,
127        ConfHashType::Crc32 => HashType::Crc32,
128        ConfHashType::Crc32a => HashType::Crc32a,
129        ConfHashType::Fnv1_64 => HashType::Fnv1_64,
130        ConfHashType::Fnv1a64 => HashType::Fnv1a_64,
131        ConfHashType::Fnv1_32 => HashType::Fnv1_32,
132        ConfHashType::Fnv1a32 => HashType::Fnv1a_32,
133        ConfHashType::Hsieh => HashType::Hsieh,
134        ConfHashType::Murmur => HashType::Murmur,
135        ConfHashType::Jenkins => HashType::Jenkins,
136        ConfHashType::Murmur3 => HashType::Murmur3,
137        ConfHashType::Murmur3X64_64 => HashType::Murmur3X64_64,
138    }
139}
140
/// One replica target produced by [`ClusterDispatcher::plan`].
///
/// Targets travel inside [`DispatchPlan::Replicas`]. The `dc`
/// and `rack` fields hold owned copies of the names, so a target
/// stays usable after the pool's topology locks are released.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplicaTarget {
    /// Index of the target peer in the pool's peer array.
    pub peer_idx: u32,
    /// Name of the datacenter the target was selected from.
    pub dc: String,
    /// Name of the rack (inside `dc`) the target was selected from.
    pub rack: String,
    /// True when the target is the local node.
    pub is_local: bool,
}
153
/// Dispatch plan produced by the cluster dispatcher.
///
/// `LocalDatastore` is the early-return branch the reference
/// engine takes when the routing tag is `ROUTING_LOCAL_NODE_ONLY`
/// (or when the request is destined for the local node and the
/// topology has only one peer); the per-connection driver then
/// hands the request off to its server-side connection pool.
///
/// In this module's planner, `LocalDatastore` is also returned
/// for requests with an empty key and for `DC_ONE` requests
/// whose nearest local-DC replica is the local node.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DispatchPlan {
    /// Hand the request straight to the local datastore.
    LocalDatastore,
    /// Forward to one or more peer replicas. The carried
    /// consistency level is the one the planner resolved for
    /// this request (after applying any bucket-type override),
    /// so the dispatcher's reply coalescer does not have to
    /// re-resolve it.
    Replicas {
        /// Replica peers the request must be routed to.
        targets: Vec<ReplicaTarget>,
        /// Resolved consistency level.
        consistency: ConsistencyLevel,
    },
    /// Reply with an error: the cluster has no quorum-eligible
    /// targets.
    NoTargets,
    /// Drop the request (`QUIT`-style swallow).
    Drop,
}
182
/// Cluster-aware dispatcher.
///
/// Built with [`ClusterDispatcher::new`], which leaves every
/// optional collaborator unset, and then configured through the
/// consuming `with_*` builder methods. The type is `Clone`;
/// clones share the pool and any wired `Arc` handles.
#[derive(Debug, Clone)]
pub struct ClusterDispatcher {
    /// Shared cluster topology and configuration consulted by
    /// the planner.
    pool: Arc<ServerPool>,
    /// Outbound channel feeding the local datastore driver. When
    /// `None`, `LocalDatastore` plans short-circuit to `Pending`
    /// without forwarding (used by tests that do not need a real
    /// backend). When set, requests for the local node are
    /// encoded onto the wire and shipped to the [`crate::net::ServerConn`]
    /// task that drives the redis / memcache backend.
    backend: Option<mpsc::Sender<OutboundRequest>>,
    /// Per-peer outbound channel for cross-DC fan-out. Keyed by
    /// `Peer::idx`. When a `DispatchPlan::Replicas` plan names a
    /// non-local peer, the dispatcher forwards via the matching
    /// channel to a `DnodeServerConn` task. Peers without a
    /// wired channel are skipped (`Pending`); when no replica
    /// is reachable for the consistency level the dispatcher
    /// falls back to a `DynomiteNoQuorumAchieved` error response.
    peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
    /// Mbuf pool used to render synthetic error payloads.
    /// `MbufPool` already wraps an `Arc`, so cloning the
    /// dispatcher (and the pool with it) shares the same free
    /// list across every cluster handle.
    mbuf_pool: MbufPool,
    /// Optional node-local hint store. When set AND the pool's
    /// `enable_hinted_handoff` flag is true, write requests
    /// targeted at peers in [`crate::cluster::peer::PeerState::Down`]
    /// (or at peers whose outbound channel is closed / full) are
    /// recorded as hints and counted toward the consistency
    /// threshold, instead of being silently skipped.
    hint_store: Option<Arc<crate::cluster::hints::HintStore>>,
    /// Optional failure-cause metrics handle. When wired, every
    /// error-producing branch in the dispatcher increments the
    /// matching counter via the [`crate::stats::FailureMetrics`]
    /// accumulator. When `None`, the dispatcher's behaviour is
    /// unchanged.
    failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
    /// Optional command-dispatch extension. When set, the
    /// dispatcher offers FT.* / HSET requests to the extension
    /// before the routing planner runs; the extension may
    /// short-circuit with a synthesised reply
    /// ([`crate::net::DispatchOutcome::Inline`]), reject the
    /// request with a structured error
    /// ([`crate::net::DispatchOutcome::Error`]), or fall
    /// through to the standard storage path. When `None`, the
    /// dispatcher's behaviour is unchanged. See
    /// [`crate::embed::CommandExtension`] for the trait shape.
    command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
232
233impl ClusterDispatcher {
    /// Wrap a [`ServerPool`] in a dispatcher.
    ///
    /// The returned dispatcher has no local backend channel, no
    /// peer channels, a default [`MbufPool`], and no hint store,
    /// failure metrics, or command extension. Use the `with_*`
    /// builders to wire them.
    ///
    /// # Examples
    ///
    /// ```
    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
    /// # use dynomite::hashkit::DynToken;
    /// # use std::sync::Arc;
    /// # let cfg = PoolConfig::default();
    /// # let local = Peer::new(
    /// #    0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
    /// #    vec![DynToken::from_u32(0)], true, true, false,
    /// # );
    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
    /// let disp = ClusterDispatcher::new(pool);
    /// let _ = disp.pool();
    /// ```
    #[must_use]
    pub fn new(pool: Arc<ServerPool>) -> Self {
        Self {
            pool,
            // Everything below is opt-in via the builder methods.
            backend: None,
            peer_backends: std::collections::HashMap::new(),
            mbuf_pool: MbufPool::default(),
            hint_store: None,
            failure_metrics: None,
            command_extension: None,
        }
    }
265
266    /// Override the dispatcher's mbuf pool. Useful when the
267    /// embedding wants every synthetic error payload to come from
268    /// the same recycled buffers as the rest of the engine.
269    ///
270    /// # Examples
271    ///
272    /// ```
273    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
274    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
275    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
276    /// # use dynomite::hashkit::DynToken;
277    /// # use dynomite::io::mbuf::MbufPool;
278    /// # use std::sync::Arc;
279    /// # let cfg = PoolConfig::default();
280    /// # let local = Peer::new(
281    /// #    0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
282    /// #    vec![DynToken::from_u32(0)], true, true, false,
283    /// # );
284    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
285    /// let _disp = ClusterDispatcher::new(pool).with_mbuf_pool(MbufPool::default());
286    /// ```
287    #[must_use]
288    pub fn with_mbuf_pool(mut self, pool: MbufPool) -> Self {
289        self.mbuf_pool = pool;
290        self
291    }
292
    /// Borrow the dispatcher's mbuf pool. Exposed so embedders
    /// can reuse the same pool when building synthetic responses
    /// outside the dispatcher's own code paths.
    ///
    /// This is the pool supplied to [`Self::with_mbuf_pool`], or
    /// the default pool created by [`Self::new`] when that
    /// builder was never called.
    #[must_use]
    pub fn mbuf_pool(&self) -> &MbufPool {
        &self.mbuf_pool
    }
300
301    /// Attach a backend request channel. Calls to [`Self::dispatch`]
302    /// that produce a [`DispatchPlan::LocalDatastore`] plan will
303    /// forward the request bytes onto this channel for the local
304    /// datastore driver to write to the backend.
305    ///
306    /// The channel sender must be the request side of a
307    /// [`crate::net::ServerConn`] task; multiple senders cloned from
308    /// the same channel are fine.
309    #[must_use]
310    pub fn with_backend(mut self, backend: mpsc::Sender<OutboundRequest>) -> Self {
311        self.backend = Some(backend);
312        self
313    }
314
    /// Attach an outbound channel for a single peer (by
    /// `Peer::idx`). The supplied sender feeds a
    /// [`crate::net::DnodeServerConn`] task that writes
    /// dnode-framed requests to the peer's `dyn_listen` and
    /// routes the response back through the per-request
    /// responder channel.
    ///
    /// Wiring is additive: call this once per non-local peer.
    /// Calling it again with the same `peer_idx` replaces the
    /// previous sender (used by reconnect supervisors that
    /// rebuild channels on restart).
    #[must_use]
    pub fn with_peer_backend(
        mut self,
        peer_idx: u32,
        sender: mpsc::Sender<OutboundRequest>,
    ) -> Self {
        // `HashMap::insert` overwrites any sender already stored
        // under `peer_idx`; the displaced sender is dropped here.
        self.peer_backends.insert(peer_idx, sender);
        self
    }
335
    /// Whether a backend channel is wired, i.e. whether
    /// [`Self::with_backend`] has been called on this dispatcher
    /// (or on the dispatcher it was cloned from).
    #[must_use]
    pub fn has_backend(&self) -> bool {
        self.backend.is_some()
    }
341
    /// Number of peer-backend channels wired. Counts distinct
    /// peer indexes: re-wiring an index through
    /// [`Self::with_peer_backend`] replaces its entry rather
    /// than adding one.
    #[must_use]
    pub fn peer_backend_count(&self) -> usize {
        self.peer_backends.len()
    }
347
    /// Borrow the underlying pool. Returns the `Arc` handle by
    /// reference, so callers that need ownership can clone it
    /// cheaply.
    #[must_use]
    pub fn pool(&self) -> &Arc<ServerPool> {
        &self.pool
    }
353
354    /// Attach a [`crate::cluster::hints::HintStore`].
355    ///
356    /// When set AND the pool's `enable_hinted_handoff` flag is
357    /// `true`, write requests targeted at peers in
358    /// [`crate::cluster::peer::PeerState::Down`] (or at peers
359    /// whose outbound channel is closed / full) are stored as
360    /// hints and counted toward the consistency threshold. The
361    /// background drainer task in `dynomited` is responsible
362    /// for shipping the hints back to the peer once it returns
363    /// to [`crate::cluster::peer::PeerState::Normal`]. Without
364    /// this builder call (or with `enable_hinted_handoff: false`)
365    /// the dispatcher behaviour is unchanged: a Down or
366    /// unreachable target is silently skipped.
367    ///
368    /// # Examples
369    ///
370    /// ```
371    /// # use std::sync::Arc;
372    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
373    /// # use dynomite::cluster::hints::HintStore;
374    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
375    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
376    /// # use dynomite::hashkit::DynToken;
377    /// let cfg = PoolConfig { enable_hinted_handoff: true, ..PoolConfig::default() };
378    /// let local = Peer::new(
379    ///     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
380    ///     vec![DynToken::from_u32(0)], true, true, false,
381    /// );
382    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
383    /// let store = Arc::new(HintStore::new(64 * 1024 * 1024));
384    /// let _disp = ClusterDispatcher::new(pool).with_hint_store(store);
385    /// ```
386    #[must_use]
387    pub fn with_hint_store(mut self, store: Arc<crate::cluster::hints::HintStore>) -> Self {
388        self.hint_store = Some(store);
389        self
390    }
391
    /// Borrow the wired hint store, if any. Returns `None` until
    /// [`Self::with_hint_store`] has been called.
    #[must_use]
    pub fn hint_store(&self) -> Option<&Arc<crate::cluster::hints::HintStore>> {
        self.hint_store.as_ref()
    }
397
398    /// Attach a [`crate::stats::FailureMetrics`] handle.
399    ///
400    /// When wired, each error-producing branch in the
401    /// dispatcher (no-targets, peer-channel-full,
402    /// peer-channel-closed, backend-channel-full,
403    /// backend-channel-closed, response-timeout) increments
404    /// the matching counter so an operator can pull the
405    /// per-cause histogram off the `/stats` and `/metrics`
406    /// endpoints. The default behaviour is unchanged when no
407    /// metrics handle is supplied.
408    ///
409    /// # Examples
410    ///
411    /// ```
412    /// # use std::sync::Arc;
413    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
414    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
415    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
416    /// # use dynomite::hashkit::DynToken;
417    /// # use dynomite::stats::FailureMetrics;
418    /// let cfg = PoolConfig::default();
419    /// let local = Peer::new(
420    ///     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
421    ///     vec![DynToken::from_u32(0)], true, true, false,
422    /// );
423    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
424    /// let metrics = Arc::new(FailureMetrics::new());
425    /// let _disp = ClusterDispatcher::new(pool).with_failure_metrics(metrics);
426    /// ```
427    #[must_use]
428    pub fn with_failure_metrics(mut self, metrics: Arc<crate::stats::FailureMetrics>) -> Self {
429        self.failure_metrics = Some(metrics);
430        self
431    }
432
    /// Borrow the wired failure-metrics handle, if any. Returns
    /// `None` until [`Self::with_failure_metrics`] has been
    /// called.
    #[must_use]
    pub fn failure_metrics(&self) -> Option<&Arc<crate::stats::FailureMetrics>> {
        self.failure_metrics.as_ref()
    }
438
439    /// Attach a [`crate::embed::CommandExtension`].
440    ///
441    /// When wired, parsed FT.* requests and HSET requests are
442    /// offered to the extension before the routing planner
443    /// runs. The extension may produce a synthesised RESP
444    /// reply (returned to the client as
445    /// [`crate::net::DispatchOutcome::Inline`]), surface a
446    /// structured `-ERR ...` reply
447    /// ([`crate::net::DispatchOutcome::Error`]), or fall
448    /// through. Without this builder call the dispatcher's
449    /// behaviour is unchanged: FT.* keywords are forwarded to
450    /// the local datastore (which typically rejects them with
451    /// `-ERR unknown command`) and HSETs proceed unmodified.
452    ///
453    /// The standard RediSearch implementation lives in the
454    /// `dynomite-search` crate; the trait itself is part of
455    /// the engine so embedders can plug their own.
456    ///
457    /// # Examples
458    ///
459    /// ```
460    /// # use std::sync::Arc;
461    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
462    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
463    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
464    /// # use dynomite::embed::{CommandExtension, HsetOutcome};
465    /// # use dynomite::hashkit::DynToken;
466    /// # use dynomite::msg::MsgType;
467    /// #[derive(Debug)]
468    /// struct NoOp;
469    /// impl CommandExtension for NoOp {
470    ///     fn handles_msg_type(&self, _: MsgType) -> bool { false }
471    ///     fn try_dispatch(&self, _: &[&[u8]]) -> Option<Vec<u8>> { None }
472    /// }
473    /// let cfg = PoolConfig::default();
474    /// let local = Peer::new(
475    ///     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
476    ///     vec![DynToken::from_u32(0)], true, true, false,
477    /// );
478    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
479    /// let _disp = ClusterDispatcher::new(pool).with_command_extension(Arc::new(NoOp));
480    /// ```
481    #[must_use]
482    pub fn with_command_extension(mut self, ext: Arc<dyn crate::embed::CommandExtension>) -> Self {
483        self.command_extension = Some(ext);
484        self
485    }
486
    /// Borrow the wired command extension, if any. Returns
    /// `None` until [`Self::with_command_extension`] has been
    /// called.
    #[must_use]
    pub fn command_extension(&self) -> Option<&Arc<dyn crate::embed::CommandExtension>> {
        self.command_extension.as_ref()
    }
492
493    /// True when both the hint store is wired AND the pool has
494    /// `enable_hinted_handoff: true`. Hot-path predicate.
495    #[must_use]
496    pub fn hinted_handoff_active(&self) -> bool {
497        self.hint_store.is_some() && self.pool.config().enable_hinted_handoff
498    }
499
500    /// Compute the routing plan for `req` with the supplied key.
501    ///
502    /// `key` is the primary key of the request (the first key
503    /// returned by [`Msg::keys`] for parsed redis / memcache
504    /// commands, or an empty slice for argument-less commands).
505    ///
506    /// The function never panics; it consults the live peer table
507    /// behind the pool's `RwLock` and returns
508    /// [`DispatchPlan::NoTargets`] when the topology cannot
509    /// satisfy the request.
510    ///
511    /// # Examples
512    ///
513    /// See the module-level example.
514    #[must_use]
515    pub fn plan(&self, req: &Msg, key: &[u8]) -> DispatchPlan {
516        let cfg = self.pool.config();
517        let peers = self.pool.peers().read();
518        if peers.is_empty() {
519            self.record_no_targets_metric(cfg, ConsistencyLevel::default());
520            return DispatchPlan::NoTargets;
521        }
522        if matches!(req.routing(), MsgRouting::LocalNodeOnly) {
523            return DispatchPlan::LocalDatastore;
524        }
525        if key.is_empty() {
526            return DispatchPlan::LocalDatastore;
527        }
528        let token = hashkit::hash(map_hash(cfg.hash), key);
529        let key_hash64 = hashkit::hash64(map_hash(cfg.hash), key);
530        let bucket = crate::proto::redis::bucket_name(key);
531        let bucket_type = cfg.resolve_bucket_type(bucket);
532        let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
533        let consistency = match (bucket_type, is_read) {
534            (Some(bt), true) => bt.read_consistency,
535            (Some(bt), false) => bt.write_consistency,
536            (None, true) => cfg.read_consistency,
537            (None, false) => cfg.write_consistency,
538        };
539        let n_val_cap = bucket_type.map_or(0, |bt| bt.n_val);
540        let dcs = self.pool.datacenters().read();
541        // When hinted handoff is active and the request is a
542        // write, peers in `Down` are kept in the routable set
543        // so the dispatcher can hint them at fan-out time. The
544        // hint counts toward the consistency threshold; the
545        // background drainer ships the hint once the peer
546        // returns to `Normal`. For reads (and for writes when
547        // handoff is off), Down peers are filtered out as
548        // before.
549        let include_down = self.hinted_handoff_active() && !is_read;
550        let routable = collect_routable(
551            &dcs,
552            &peers,
553            &token,
554            key_hash64,
555            cfg.distribution,
556            include_down,
557        );
558        if let Some(shadow) = cfg.distribution_shadow {
559            if shadow != cfg.distribution {
560                let shadow_routable =
561                    collect_routable(&dcs, &peers, &token, key_hash64, shadow, include_down);
562                if !plans_agree(&routable, &shadow_routable) {
563                    bump_shadow_disagreement();
564                    tracing::debug!(
565                        target: "dynomite::dispatch::shadow",
566                        live = cfg.distribution.as_str(),
567                        shadow = shadow.as_str(),
568                        "shadow distribution disagreed on key route"
569                    );
570                }
571            }
572        }
573        if routable.is_empty() {
574            self.record_no_targets_metric(cfg, consistency);
575            return DispatchPlan::NoTargets;
576        }
577        let (local, remote): (Vec<_>, Vec<_>) = routable
578            .into_iter()
579            .partition(|(dc_idx, _, _)| dcs[*dc_idx].name() == cfg.dc);
580        let plan =
581            plan_with_consistency(cfg, &dcs, &peers, consistency, req.routing(), local, remote);
582        let plan = cap_replicas(plan, n_val_cap);
583        if matches!(plan, DispatchPlan::NoTargets) {
584            self.record_no_targets_metric(cfg, consistency);
585        }
586        plan
587    }
588
589    /// Record a `dispatch_no_targets_total` metric tick using
590    /// the local-DC labels, when a metrics handle is wired.
591    fn record_no_targets_metric(
592        &self,
593        cfg: &crate::cluster::pool::PoolConfig,
594        consistency: ConsistencyLevel,
595    ) {
596        if let Some(m) = self.failure_metrics.as_ref() {
597            m.record_no_targets(&cfg.dc, &cfg.rack, consistency);
598        }
599    }
600
601    /// Resolve the destination DC of a peer (for per-peer
602    /// failure metrics). Local peers and unknown indexes both
603    /// fall back to the configured local DC.
604    fn peer_dc_label(&self, peer_idx: u32) -> String {
605        let peers = self.pool.peers().read();
606        peers
607            .get(peer_idx as usize)
608            .map_or_else(|| self.pool.config().dc.clone(), |p| p.dc().to_string())
609    }
610}
611
612/// Apply the bucket-type `n_val` fan-out cap to a freshly
613/// computed plan. Only `DispatchPlan::Replicas` is affected; the
614/// other variants pass through unchanged. `cap == 0` means "no
615/// cap" and is the no-op used for keys without a matching bucket
616/// type.
617fn cap_replicas(plan: DispatchPlan, cap: u8) -> DispatchPlan {
618    if cap == 0 {
619        return plan;
620    }
621    let cap = cap as usize;
622    match plan {
623        DispatchPlan::Replicas {
624            mut targets,
625            consistency,
626        } if targets.len() > cap => {
627            targets.truncate(cap);
628            DispatchPlan::Replicas {
629                targets,
630                consistency,
631            }
632        }
633        other => other,
634    }
635}
636
/// Decide whether two routable sets name the same peers.
///
/// Each entry is `(dc_idx, rack_idx, peer_idx)`; only the peer
/// index takes part in the comparison, and entry order is
/// ignored. Duplicates count, so the two sets must contain each
/// peer index the same number of times.
fn plans_agree(a: &[(usize, usize, u32)], b: &[(usize, usize, u32)]) -> bool {
    // Peer indexes of one routable set, in ascending order.
    let sorted_peers = |set: &[(usize, usize, u32)]| -> Vec<u32> {
        let mut ids: Vec<u32> = set.iter().map(|&(_, _, peer_idx)| peer_idx).collect();
        ids.sort_unstable();
        ids
    };
    // Different sizes can never agree; skip the sort entirely.
    a.len() == b.len() && sorted_peers(a) == sorted_peers(b)
}
647
648fn collect_routable(
649    dcs: &[crate::cluster::Datacenter],
650    peers: &[crate::cluster::peer::Peer],
651    token: &crate::hashkit::DynToken,
652    hash64: u64,
653    distribution: crate::conf::Distribution,
654    include_down: bool,
655) -> Vec<(usize, usize, u32)> {
656    let mut routable: Vec<(usize, usize, u32)> = Vec::new();
657    for (dc_idx, dc) in dcs.iter().enumerate() {
658        for (rack_idx, rack) in dc.racks().iter().enumerate() {
659            let candidate = match (distribution, rack.random_slices()) {
660                (crate::conf::Distribution::RandomSlicing, Some(slices)) => {
661                    // Map the chosen claimant name back onto a
662                    // peer index. The slice table holds peer
663                    // pname strings (host:port) so the
664                    // resolution is a linear scan over the
665                    // rack's peer set, which is small (peers
666                    // per rack, not the whole pool).
667                    slices.claimant_for(hash64).and_then(|name| {
668                        peers.iter().find_map(|p| {
669                            if p.dc() == dc.name()
670                                && p.rack() == rack.name()
671                                && p.endpoint().pname() == name
672                            {
673                                Some(p.idx())
674                            } else {
675                                None
676                            }
677                        })
678                    })
679                }
680                _ => vnode::dispatch(rack.continuums(), token),
681            };
682            if let Some(peer_idx) = candidate {
683                if let Some(peer) = peers.get(peer_idx as usize) {
684                    let state = peer.state();
685                    let accept = state.is_routable()
686                        || (include_down && matches!(state, crate::cluster::peer::PeerState::Down));
687                    if accept {
688                        routable.push((dc_idx, rack_idx, peer_idx));
689                    }
690                }
691            }
692        }
693    }
694    routable
695}
696
697fn build_target(
698    dcs: &[crate::cluster::Datacenter],
699    peers: &[crate::cluster::peer::Peer],
700    dc_idx: usize,
701    rack_idx: usize,
702    peer_idx: u32,
703) -> ReplicaTarget {
704    let dc_name = dcs[dc_idx].name().to_string();
705    let rack_name = dcs[dc_idx].racks()[rack_idx].name().to_string();
706    let is_local = peers
707        .get(peer_idx as usize)
708        .is_some_and(crate::cluster::peer::Peer::is_local);
709    ReplicaTarget {
710        peer_idx,
711        dc: dc_name,
712        rack: rack_name,
713        is_local,
714    }
715}
716
717fn plan_with_consistency(
718    cfg: &crate::cluster::pool::PoolConfig,
719    dcs: &[crate::cluster::Datacenter],
720    peers: &[crate::cluster::peer::Peer],
721    consistency: ConsistencyLevel,
722    routing: MsgRouting,
723    local: Vec<(usize, usize, u32)>,
724    remote: Vec<(usize, usize, u32)>,
725) -> DispatchPlan {
726    let want_per_dc_fanout = matches!(consistency, ConsistencyLevel::DcEachSafeQuorum)
727        || matches!(routing, MsgRouting::AllNodesAllRacksAllDcs);
728    let mut targets: Vec<ReplicaTarget> = Vec::new();
729    match consistency {
730        ConsistencyLevel::DcOne => {
731            if local.is_empty() {
732                return DispatchPlan::NoTargets;
733            }
734            let mut best: Option<(RackDistance, (usize, usize, u32))> = None;
735            for (dc_idx, rack_idx, peer_idx) in local {
736                let rack_name = dcs[dc_idx].racks()[rack_idx].name();
737                let d = rack_distance(&cfg.dc, &cfg.rack, &cfg.dc, rack_name);
738                let take = match best {
739                    None => true,
740                    Some((bd, _)) => d.cost() < bd.cost(),
741                };
742                if take {
743                    best = Some((d, (dc_idx, rack_idx, peer_idx)));
744                }
745            }
746            if let Some((_, (dc_idx, rack_idx, peer_idx))) = best {
747                let is_local_node = peers
748                    .get(peer_idx as usize)
749                    .is_some_and(crate::cluster::peer::Peer::is_local);
750                if is_local_node {
751                    return DispatchPlan::LocalDatastore;
752                }
753                targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
754            }
755        }
756        ConsistencyLevel::DcQuorum | ConsistencyLevel::DcSafeQuorum => {
757            if local.is_empty() {
758                return DispatchPlan::NoTargets;
759            }
760            for (dc_idx, rack_idx, peer_idx) in local {
761                targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
762            }
763        }
764        ConsistencyLevel::DcEachSafeQuorum => {
765            if local.is_empty() && remote.is_empty() {
766                return DispatchPlan::NoTargets;
767            }
768            for (dc_idx, rack_idx, peer_idx) in local.iter().chain(remote.iter()) {
769                targets.push(build_target(dcs, peers, *dc_idx, *rack_idx, *peer_idx));
770            }
771        }
772    }
773    if want_per_dc_fanout && !remote.is_empty() {
774        for (dc_idx, rack_idx, peer_idx) in remote {
775            if !targets.iter().any(|t| t.peer_idx == peer_idx) {
776                targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
777            }
778        }
779    }
780    if targets.is_empty() {
781        return DispatchPlan::LocalDatastore;
782    }
783    DispatchPlan::Replicas {
784        targets,
785        consistency,
786    }
787}
788
789impl Dispatcher for ClusterDispatcher {
790    #[allow(
791        clippy::too_many_lines,
792        reason = "single dispatch fn must enumerate every plan; splitting hides the planner-to-effect mapping"
793    )]
794    fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome {
795        if req.flags().quit {
796            return DispatchOutcome::Drop;
797        }
798        // FT.* / HSET interception. Runs before the routing
799        // planner so vector-index commands never visit the
800        // backend, and HSETs against an indexed prefix get the
801        // vector mirrored into the in-process registry before
802        // the standard storage write fans out.
803        if let Some(ext) = self.command_extension.as_ref() {
804            if let Some(outcome) = self.intercept_command(ext.as_ref(), &req) {
805                return outcome;
806            }
807        }
808        // Inspect the request without consuming it: pull the routing
809        // bytes from the first parsed key. `KeyPos::tag_bytes` returns
810        // the hash-tag-aware sub-range when one was parsed and the full
811        // key otherwise, which is the slice shape `plan` expects.
812        // Requests with no parsed keys (e.g. PING, INFO) fall through
813        // with an empty slice; `plan` handles that by routing to the
814        // local datastore.
815        let key: Vec<u8> = req
816            .keys()
817            .first()
818            .map(|kp| kp.tag_bytes().to_vec())
819            .unwrap_or_default();
820        let plan = self.plan(&req, &key);
821        let (req_span, _plan_span) = enter_plan_span(req.id(), &plan);
822        match plan {
823            DispatchPlan::Drop => DispatchOutcome::Drop,
824            DispatchPlan::NoTargets => {
825                let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
826                    MsgType::RspRedisError
827                } else {
828                    MsgType::RspMcServerError
829                };
830                let rsp = crate::msg::response::make_error(
831                    &req,
832                    err_type,
833                    0,
834                    crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
835                    &self.mbuf_pool,
836                );
837                DispatchOutcome::Error(rsp)
838            }
839            DispatchPlan::LocalDatastore => {
840                if let Some(tx) = self.backend.as_ref() {
841                    // Snapshot the wire bytes from the parsed mbuf
842                    // chain. The chain is the original on-the-wire
843                    // sequence the parser walked, so this is a
844                    // faithful relay rather than a re-encode.
845                    let bytes: Vec<u8> = req
846                        .mbufs()
847                        .iter()
848                        .flat_map(|b| b.readable().to_vec())
849                        .collect();
850                    if bytes.is_empty() {
851                        // Parsed request with no replayable bytes
852                        // (e.g. a synthetic `Msg`) - drop rather
853                        // than enqueue a no-op on the backend.
854                        return DispatchOutcome::Drop;
855                    }
856                    let env = OutboundRequest {
857                        bytes,
858                        req_id: req.id(),
859                        responder,
860                        span: req_span.clone(),
861                        ty: crate::proto::dnode::DmsgType::Req,
862                        target_peer_idx: None,
863                    };
864                    if let Err(err) = tx.try_send(env) {
865                        // Backend channel full or closed: surface
866                        // an error to the client immediately.
867                        if let Some(m) = self.failure_metrics.as_ref() {
868                            match err {
869                                tokio::sync::mpsc::error::TrySendError::Full(_) => {
870                                    m.record_backend_send_full();
871                                }
872                                tokio::sync::mpsc::error::TrySendError::Closed(_) => {
873                                    m.record_backend_send_closed();
874                                }
875                            }
876                        }
877                        let err_type =
878                            if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
879                                MsgType::RspRedisError
880                            } else {
881                                MsgType::RspMcServerError
882                            };
883                        let rsp = crate::msg::response::make_error(
884                            &req,
885                            err_type,
886                            0,
887                            crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
888                            &self.mbuf_pool,
889                        );
890                        return DispatchOutcome::Error(rsp);
891                    }
892                }
893                DispatchOutcome::Pending
894            }
895            DispatchPlan::Replicas {
896                targets,
897                consistency,
898            } => self.dispatch_replicas(&req, &req_span, &targets, consistency, responder),
899        }
900    }
901}
902
903impl ClusterDispatcher {
904    /// Fan a request out across replicas and install the per-
905    /// request reply coalescer.
906    ///
907    /// The single-target case short-circuits to a direct
908    /// forward (no coalescer needed). The multi-target case
909    /// spawns a coalescer task on the ambient tokio runtime; the
910    /// task drains the per-target replies, picks one according
911    /// to the consistency level, and forwards the chosen reply
912    /// to the original `responder`. Divergent replicas are
913    /// scheduled for read-repair writes via the same
914    /// `peer_backends` channels.
915    ///
916    /// When [`hinted_handoff_active`](Self::hinted_handoff_active)
917    /// reports true and the request is a write, targets that
918    /// are currently in [`crate::cluster::peer::PeerState::Down`]
919    /// are recorded in the hint store instead of being sent.
920    /// A synthetic `+OK\r\n` reply is fed to the coalescer on
921    /// the hinted target's behalf so the consistency threshold
922    /// can be met by the surviving replicas plus the hint(s).
923    /// On `try_send` failure (channel closed or full) for a
924    /// non-Down peer, the dispatcher likewise falls back to
925    /// hinting before declaring the target lost.
926    fn dispatch_replicas(
927        &self,
928        req: &Msg,
929        req_span: &tracing::Span,
930        targets: &[ReplicaTarget],
931        consistency: ConsistencyLevel,
932        responder: ServerSink,
933    ) -> DispatchOutcome {
934        if targets.is_empty() {
935            return DispatchOutcome::Drop;
936        }
937        // Snapshot the wire bytes once. Each target gets its
938        // own clone (the ServerConn / DnodeServerConn takes
939        // ownership of `bytes`).
940        let bytes: Vec<u8> = req
941            .mbufs()
942            .iter()
943            .flat_map(|b| b.readable().to_vec())
944            .collect();
945        if bytes.is_empty() {
946            return DispatchOutcome::Drop;
947        }
948        // Snapshot the per-target current state so we do not
949        // re-acquire the peer-table lock inside the per-target
950        // loop. Local peers are always treated as Normal.
951        let peer_states = self.snapshot_peer_states(targets);
952        let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
953        let is_write = !is_read;
954        let handoff_active = self.hinted_handoff_active() && is_write;
955        // Single-target path: no coalescing needed; forward
956        // directly to the original responder.
957        if targets.len() == 1 {
958            return self.dispatch_replicas_direct(
959                req,
960                req_span,
961                targets,
962                &bytes,
963                &responder,
964                &HandoffCtx {
965                    handoff_active,
966                    peer_states: &peer_states,
967                },
968            );
969        }
970        // Multi-target path: install the coalescer.
971        let cfg = self.pool.config();
972        let local_dc = cfg.dc.clone();
973        // Channel each replica's reply lands on. Sized to
974        // `targets.len() + 1`: every target produces at most one
975        // envelope (real or hint-synthesised) and we leave a
976        // spare so a late repair reply never blocks the actor.
977        let (intermediate_tx, intermediate_rx) =
978            mpsc::channel::<OutboundEnvelope>(targets.len() + 1);
979        // Build the tracker's target list and capture the
980        // per-target dispatch state.
981        let target_pairs: Vec<(u32, String)> =
982            targets.iter().map(|t| (t.peer_idx, t.dc.clone())).collect();
983        // Read repair context: the original primary key (single-
984        // key requests only) and the request type.
985        let repair_key: Option<Vec<u8>> = req
986            .keys()
987            .first()
988            .map(|kp| kp.tag_bytes().to_vec())
989            .filter(|k| !k.is_empty());
990        let repair_ctx = repair_key.map(|key| ReadRepairContext {
991            req_id: req.id(),
992            req_ty: req.ty(),
993            key,
994            mbuf_pool: self.mbuf_pool.clone(),
995            peer_backends: self.peer_backends.clone(),
996            local_backend: self.backend.clone(),
997            target_is_local: targets.iter().map(|t| (t.peer_idx, t.is_local)).collect(),
998        });
999        // Fan out: each per-target outbound feeds the coalescer
1000        // channel, NOT the client's responder.
1001        let mut sent = 0usize;
1002        let mut hinted = 0usize;
1003        for target in targets {
1004            let action = Self::choose_target_action(target, handoff_active, &peer_states);
1005            match action {
1006                TargetAction::Send => {
1007                    if self.fanout_send(target, req, req_span, &bytes, &intermediate_tx) {
1008                        sent += 1;
1009                    } else if handoff_active
1010                        && self.hint_target(target, &bytes, req, req_span, &intermediate_tx)
1011                    {
1012                        hinted += 1;
1013                    }
1014                }
1015                TargetAction::Hint => {
1016                    if self.hint_target(target, &bytes, req, req_span, &intermediate_tx) {
1017                        hinted += 1;
1018                    }
1019                }
1020            }
1021        }
1022        // Drop the local clone of the intermediate sender so the
1023        // coalescer task observes RX close once every per-target
1024        // sender has been dropped. (`OutboundRequest` owns one
1025        // sender each; once they finish they drop it.)
1026        drop(intermediate_tx);
1027        if sent + hinted == 0 {
1028            return DispatchOutcome::Error(self.no_quorum_error(req));
1029        }
1030        let req_id = req.id();
1031        let req_ty = req.ty();
1032        let mbuf_pool = self.mbuf_pool.clone();
1033        let failure_metrics = self.failure_metrics.clone();
1034        tokio::spawn(coalesce_actor(
1035            req_id,
1036            req_ty,
1037            consistency,
1038            target_pairs,
1039            local_dc,
1040            intermediate_rx,
1041            responder,
1042            mbuf_pool,
1043            repair_ctx,
1044            failure_metrics,
1045        ));
1046        DispatchOutcome::Pending
1047    }
1048
1049    /// Capture the current `PeerState` for each target so the
1050    /// per-target loop does not re-acquire the read lock on
1051    /// every iteration. Local-node targets are reported as
1052    /// `Normal` regardless of the on-disk peer entry.
1053    fn snapshot_peer_states(
1054        &self,
1055        targets: &[ReplicaTarget],
1056    ) -> std::collections::HashMap<u32, crate::cluster::peer::PeerState> {
1057        use crate::cluster::peer::PeerState;
1058        let peers = self.pool.peers().read();
1059        let mut out = std::collections::HashMap::with_capacity(targets.len());
1060        for t in targets {
1061            let state = if t.is_local {
1062                PeerState::Normal
1063            } else {
1064                peers
1065                    .get(t.peer_idx as usize)
1066                    .map_or(PeerState::Unknown, crate::cluster::peer::Peer::state)
1067            };
1068            out.insert(t.peer_idx, state);
1069        }
1070        out
1071    }
1072
1073    fn choose_target_action(
1074        target: &ReplicaTarget,
1075        handoff_active: bool,
1076        peer_states: &std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
1077    ) -> TargetAction {
1078        use crate::cluster::peer::PeerState;
1079        if !handoff_active {
1080            return TargetAction::Send;
1081        }
1082        let state = peer_states
1083            .get(&target.peer_idx)
1084            .copied()
1085            .unwrap_or(PeerState::Unknown);
1086        match state {
1087            PeerState::Down => TargetAction::Hint,
1088            _ => TargetAction::Send,
1089        }
1090    }
1091
1092    /// Forward one target via its outbound channel. Returns
1093    /// `true` on a successful `try_send`.
1094    fn fanout_send(
1095        &self,
1096        target: &ReplicaTarget,
1097        req: &Msg,
1098        req_span: &tracing::Span,
1099        bytes: &[u8],
1100        intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1101    ) -> bool {
1102        let env = OutboundRequest {
1103            bytes: bytes.to_vec(),
1104            req_id: req.id(),
1105            responder: intermediate_tx.clone(),
1106            span: req_span.clone(),
1107            ty: crate::proto::dnode::DmsgType::Req,
1108            target_peer_idx: Some(target.peer_idx),
1109        };
1110        let send_result = if target.is_local {
1111            self.backend.as_ref().map(|tx| tx.try_send(env))
1112        } else {
1113            self.peer_backends
1114                .get(&target.peer_idx)
1115                .map(|tx| tx.try_send(env))
1116        };
1117        match send_result {
1118            Some(Ok(())) => true,
1119            Some(Err(err)) => {
1120                self.observe_send_error(target, &err);
1121                false
1122            }
1123            None => false,
1124        }
1125    }
1126
1127    /// Convert a `tokio::sync::mpsc::error::TrySendError` into a
1128    /// failure-metrics observation. Local targets bump the
1129    /// backend counters; peer targets bump the per-peer
1130    /// counters labelled with the peer's DC.
1131    fn observe_send_error(
1132        &self,
1133        target: &ReplicaTarget,
1134        err: &tokio::sync::mpsc::error::TrySendError<OutboundRequest>,
1135    ) {
1136        let Some(m) = self.failure_metrics.as_ref() else {
1137            return;
1138        };
1139        if target.is_local {
1140            match err {
1141                tokio::sync::mpsc::error::TrySendError::Full(_) => m.record_backend_send_full(),
1142                tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1143                    m.record_backend_send_closed();
1144                }
1145            }
1146        } else {
1147            let peer_dc = self.peer_dc_label(target.peer_idx);
1148            match err {
1149                tokio::sync::mpsc::error::TrySendError::Full(_) => {
1150                    m.record_peer_send_full(target.peer_idx, &peer_dc);
1151                }
1152                tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1153                    m.record_peer_send_closed(target.peer_idx, &peer_dc);
1154                }
1155            }
1156        }
1157    }
1158
1159    /// Record `bytes` as a hint for `target`'s peer and feed the
1160    /// coalescer a synthetic success reply. Returns `true` when
1161    /// both the enqueue and the synth-push succeeded.
1162    fn hint_target(
1163        &self,
1164        target: &ReplicaTarget,
1165        bytes: &[u8],
1166        req: &Msg,
1167        req_span: &tracing::Span,
1168        intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1169    ) -> bool {
1170        let Some(store) = self.hint_store.as_ref() else {
1171            return false;
1172        };
1173        let cfg = self.pool.config();
1174        let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1175        match store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1176            Ok(()) => {}
1177            Err(e) => {
1178                tracing::debug!(
1179                    target: "dynomite::hints",
1180                    peer_idx = target.peer_idx,
1181                    error = %e,
1182                    "hint enqueue failed"
1183                );
1184                return false;
1185            }
1186        }
1187        let synth = synth_hint_reply(req, &self.mbuf_pool);
1188        let env = OutboundEnvelope {
1189            req_id: req.id(),
1190            rsp: synth,
1191            span: req_span.clone(),
1192            source_peer_idx: Some(target.peer_idx),
1193        };
1194        if intermediate_tx.try_send(env).is_err() {
1195            // The channel is sized for one envelope per target;
1196            // an immediate full means the coalescer task has
1197            // exited. Still report success so the hint sits in
1198            // the store for the drainer.
1199            tracing::debug!(
1200                target: "dynomite::hints",
1201                peer_idx = target.peer_idx,
1202                "hint synth-reply could not be queued; coalescer absent"
1203            );
1204        }
1205        tracing::debug!(
1206            target: "dynomite::hints",
1207            peer_idx = target.peer_idx,
1208            bytes = bytes.len(),
1209            "stored hint for down peer"
1210        );
1211        true
1212    }
1213
1214    fn dispatch_replicas_direct(
1215        &self,
1216        req: &Msg,
1217        req_span: &tracing::Span,
1218        targets: &[ReplicaTarget],
1219        bytes: &[u8],
1220        responder: &ServerSink,
1221        ctx: &HandoffCtx<'_>,
1222    ) -> DispatchOutcome {
1223        debug_assert_eq!(targets.len(), 1);
1224        let target = &targets[0];
1225        // If the target is Down and handoff is active, hint it
1226        // and feed the responder a synth `+OK\r\n` so the
1227        // client sees the write as having succeeded.
1228        if let TargetAction::Hint =
1229            Self::choose_target_action(target, ctx.handoff_active, ctx.peer_states)
1230        {
1231            if self.hint_single_target_direct(target, bytes, req, req_span, responder) {
1232                return DispatchOutcome::Pending;
1233            }
1234            return DispatchOutcome::Error(self.no_quorum_error(req));
1235        }
1236        let env = OutboundRequest {
1237            bytes: bytes.to_vec(),
1238            req_id: req.id(),
1239            responder: responder.clone(),
1240            span: req_span.clone(),
1241            ty: crate::proto::dnode::DmsgType::Req,
1242            target_peer_idx: Some(target.peer_idx),
1243        };
1244        let send_result = if target.is_local {
1245            self.backend.as_ref().map(|tx| tx.try_send(env))
1246        } else {
1247            self.peer_backends
1248                .get(&target.peer_idx)
1249                .map(|tx| tx.try_send(env))
1250        };
1251        let sent = match send_result {
1252            Some(Ok(())) => true,
1253            Some(Err(ref err)) => {
1254                self.observe_send_error(target, err);
1255                false
1256            }
1257            None => false,
1258        };
1259        if sent {
1260            return DispatchOutcome::Pending;
1261        }
1262        if ctx.handoff_active
1263            && self.hint_single_target_direct(target, bytes, req, req_span, responder)
1264        {
1265            return DispatchOutcome::Pending;
1266        }
1267        DispatchOutcome::Error(self.no_quorum_error(req))
1268    }
1269
1270    /// Single-target hint path. Records the hint in the store
1271    /// and pushes a `+OK\r\n` envelope onto the responder. The
1272    /// client sees the write as having succeeded; the drainer
1273    /// will replay the request to the peer when it returns.
1274    fn hint_single_target_direct(
1275        &self,
1276        target: &ReplicaTarget,
1277        bytes: &[u8],
1278        req: &Msg,
1279        req_span: &tracing::Span,
1280        responder: &ServerSink,
1281    ) -> bool {
1282        let Some(store) = self.hint_store.as_ref() else {
1283            return false;
1284        };
1285        let cfg = self.pool.config();
1286        let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1287        if let Err(e) = store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1288            tracing::debug!(
1289                target: "dynomite::hints",
1290                peer_idx = target.peer_idx,
1291                error = %e,
1292                "hint enqueue failed (single-target)"
1293            );
1294            return false;
1295        }
1296        let synth = synth_hint_reply(req, &self.mbuf_pool);
1297        let env = OutboundEnvelope {
1298            req_id: req.id(),
1299            rsp: synth,
1300            span: req_span.clone(),
1301            source_peer_idx: Some(target.peer_idx),
1302        };
1303        let _ = responder.try_send(env);
1304        true
1305    }
1306
1307    fn no_quorum_error(&self, req: &Msg) -> Msg {
1308        let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1309            MsgType::RspRedisError
1310        } else {
1311            MsgType::RspMcServerError
1312        };
1313        crate::msg::response::make_error(
1314            req,
1315            err_type,
1316            0,
1317            crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1318            &self.mbuf_pool,
1319        )
1320    }
1321
1322    /// FT.* / HSET interception. Returns `Some(outcome)` when the
1323    /// command was fully handled (FT.* keyword, or an HSET that
1324    /// references an indexed prefix with a malformed payload);
1325    /// returns `None` to let the caller fall through to the
1326    /// standard dispatch path. The HSET success path returns
1327    /// `None` after the extension records the side-effect so
1328    /// the standard storage write still goes to the backend.
1329    fn intercept_command(
1330        &self,
1331        ext: &dyn crate::embed::CommandExtension,
1332        req: &Msg,
1333    ) -> Option<DispatchOutcome> {
1334        if ext.handles_msg_type(req.ty()) {
1335            return Some(self.run_extension_command(ext, req));
1336        }
1337        if matches!(req.ty(), MsgType::ReqRedisHset) {
1338            return self.intercept_extension_hset(ext, req);
1339        }
1340        None
1341    }
1342
1343    /// Drive an FT.* command through the registered extension
1344    /// and wrap the RESP bytes in a [`DispatchOutcome::Inline`].
1345    /// When the extension declines (`try_dispatch` returns
1346    /// `None`) the outcome is rendered as a `-ERR not
1347    /// supported in this build` reply so the dispatcher does
1348    /// not silently drop the request.
1349    fn run_extension_command(
1350        &self,
1351        ext: &dyn crate::embed::CommandExtension,
1352        req: &Msg,
1353    ) -> DispatchOutcome {
1354        // For typed FT.* variants the keyword is unambiguous; for
1355        // [`MsgType::ReqRedisFtUnknown`] we recover the original
1356        // wire keyword from the request's mbuf chain (the parser
1357        // case-folds for table lookup but the wire bytes are
1358        // preserved verbatim) so the extension can decide
1359        // whether we recognise the keyword and either execute
1360        // it or surface a structured `-ERR ...` reply.
1361        let recovered_kw: Vec<u8>;
1362        let keyword: &[u8] = match req.ty() {
1363            MsgType::ReqRedisFtCreate => b"FT.CREATE",
1364            MsgType::ReqRedisFtSearch => b"FT.SEARCH",
1365            MsgType::ReqRedisFtInfo => b"FT.INFO",
1366            MsgType::ReqRedisFtList => b"FT.LIST",
1367            MsgType::ReqRedisFtDropindex => b"FT.DROPINDEX",
1368            MsgType::ReqRedisFtRegex => b"FT.REGEX",
1369            MsgType::ReqRedisFtSugadd => b"FT.SUGADD",
1370            MsgType::ReqRedisFtSugget => b"FT.SUGGET",
1371            MsgType::ReqRedisFtSugdel => b"FT.SUGDEL",
1372            MsgType::ReqRedisFtSuglen => b"FT.SUGLEN",
1373            MsgType::ReqRedisFtUnknown => {
1374                recovered_kw = first_bulk_token(req).unwrap_or_else(|| b"FT.UNKNOWN".to_vec());
1375                recovered_kw.as_slice()
1376            }
1377            // The dispatcher only enters this branch from the
1378            // FT.* arm in [`Self::intercept_command`], so the
1379            // catch-all is unreachable unless the MsgType set
1380            // drifts out of sync with that arm.
1381            _ => return DispatchOutcome::Drop,
1382        };
1383        let mut args: Vec<&[u8]> = Vec::with_capacity(1 + req.keys().len() + req.args().len());
1384        args.push(keyword);
1385        for k in req.keys() {
1386            args.push(k.key());
1387        }
1388        for a in req.args() {
1389            args.push(a.bytes());
1390        }
1391        let bytes = ext.try_dispatch(&args).unwrap_or_else(|| {
1392            let kw = String::from_utf8_lossy(keyword);
1393            format!("-ERR not supported in this build: {kw}\r\n").into_bytes()
1394        });
1395        DispatchOutcome::Inline(synthetic_redis_reply(req, &self.mbuf_pool, &bytes))
1396    }
1397
1398    /// HSET interception via the registered extension.
1399    /// Returns `Some(Error(...))` when the extension reports a
1400    /// malformed payload against a registered prefix; returns
1401    /// `None` (so the dispatcher falls through to the backend)
1402    /// on success or when no registered prefix matches.
1403    fn intercept_extension_hset(
1404        &self,
1405        ext: &dyn crate::embed::CommandExtension,
1406        req: &Msg,
1407    ) -> Option<DispatchOutcome> {
1408        let mut args: Vec<&[u8]> = Vec::with_capacity(req.keys().len() + req.args().len());
1409        for k in req.keys() {
1410            args.push(k.key());
1411        }
1412        for a in req.args() {
1413            args.push(a.bytes());
1414        }
1415        match ext.try_intercept_hset(&args) {
1416            crate::embed::HsetOutcome::Absorbed | crate::embed::HsetOutcome::NotIndexed => None,
1417            crate::embed::HsetOutcome::Error(message) => {
1418                let payload = format!("-ERR {message}\r\n");
1419                Some(DispatchOutcome::Error(synthetic_redis_reply(
1420                    req,
1421                    &self.mbuf_pool,
1422                    payload.as_bytes(),
1423                )))
1424            }
1425        }
1426    }
1427}
1428
1429/// Wrap an arbitrary RESP byte sequence as a synthetic Redis
1430/// response [`Msg`]. The response inherits the request's id (so
1431/// the FSM can pair it with the originating request), is marked
1432/// `is_request = false`, and the supplied bytes are copied into
1433/// one or more mbufs drawn from `pool`.
1434fn synthetic_redis_reply(req: &Msg, pool: &MbufPool, payload: &[u8]) -> Msg {
1435    let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
1436    rsp.set_parent_id(req.id());
1437    let mut written = 0usize;
1438    while written < payload.len() {
1439        let mut buf = pool.get();
1440        let n = buf.recv(&payload[written..]);
1441        debug_assert!(
1442            n > 0,
1443            "MbufPool returned a buffer with zero writable capacity"
1444        );
1445        rsp.mbufs_mut().push_back(buf);
1446        written += n;
1447    }
1448    rsp.recompute_mlen();
1449    rsp
1450}
1451
1452/// Recover the first bulk-string token from a parsed RESP
1453/// request's mbuf chain. The parser case-folds the keyword for
1454/// the command-table lookup but stores the original wire bytes
1455/// in the mbufs; this helper reads them back so the dispatcher
1456/// can render structured error replies that quote the actual
1457/// keyword the client sent.
1458///
1459/// Returns `None` when the mbuf chain does not begin with a
1460/// well-formed `*N\r\n$M\r\n<token>\r\n` sequence.
1461fn first_bulk_token(req: &Msg) -> Option<Vec<u8>> {
1462    let mut wire: Vec<u8> = Vec::new();
1463    for buf in req.mbufs() {
1464        wire.extend_from_slice(buf.readable());
1465        if wire.len() > 256 {
1466            break;
1467        }
1468    }
1469    let mut p = 0usize;
1470    if wire.first() == Some(&b'*') {
1471        let cr = wire.iter().position(|&b| b == b'\r')?;
1472        if wire.get(cr + 1) != Some(&b'\n') {
1473            return None;
1474        }
1475        p = cr + 2;
1476    }
1477    if wire.get(p) != Some(&b'$') {
1478        return None;
1479    }
1480    let header_start = p + 1;
1481    let header_cr = wire[header_start..]
1482        .iter()
1483        .position(|&b| b == b'\r')
1484        .map(|i| header_start + i)?;
1485    if wire.get(header_cr + 1) != Some(&b'\n') {
1486        return None;
1487    }
1488    let len_str = std::str::from_utf8(&wire[header_start..header_cr]).ok()?;
1489    let len: usize = len_str.parse().ok()?;
1490    let body_start = header_cr + 2;
1491    let body_end = body_start.checked_add(len)?;
1492    if wire.len() < body_end + 2 {
1493        return None;
1494    }
1495    Some(wire[body_start..body_end].to_vec())
1496}
1497
/// Context required to schedule read-repair writes once the
/// coalescer has identified a winner and a divergent set.
///
/// Built by the multi-target fan-out path, and only when the
/// request's first parsed key is non-empty.
#[derive(Clone)]
struct ReadRepairContext {
    /// Id of the originating client request.
    req_id: crate::core::types::MsgId,
    /// Message type of the originating client request.
    req_ty: MsgType,
    /// Original primary key (single-key requests only). The v1
    /// repair scheduler operates over single-key Redis reads;
    /// multi-key fragmentation goes through a separate path.
    key: Vec<u8>,
    /// Clone of the dispatcher's mbuf pool.
    mbuf_pool: MbufPool,
    /// Clone of the dispatcher's per-peer outbound channels, keyed
    /// by peer index.
    peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
    /// Clone of the dispatcher's local backend channel; `None` when
    /// the dispatcher has no backend configured.
    local_backend: Option<mpsc::Sender<OutboundRequest>>,
    /// For every fan-out target, whether that peer index is the
    /// local node.
    target_is_local: std::collections::HashMap<u32, bool>,
}
1513
1514/// Per-fan-out coalescer task body.
1515#[allow(
1516    clippy::too_many_arguments,
1517    reason = "actor task captures the entire dispatch context; bundling into a struct adds churn for no callsite gain"
1518)]
1519async fn coalesce_actor(
1520    req_id: crate::core::types::MsgId,
1521    req_ty: MsgType,
1522    consistency: ConsistencyLevel,
1523    targets: Vec<(u32, String)>,
1524    local_dc: String,
1525    mut intermediate_rx: mpsc::Receiver<OutboundEnvelope>,
1526    client_tx: ServerSink,
1527    mbuf_pool: MbufPool,
1528    repair_ctx: Option<ReadRepairContext>,
1529    failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
1530) {
1531    use crate::proto::redis::{CoalesceOutcome, CoalesceTracker};
1532    let mut tracker = CoalesceTracker::new(req_id, consistency, targets, &local_dc);
1533    let mut emitted = false;
1534    while let Some(env) = intermediate_rx.recv().await {
1535        let source = env.source_peer_idx.unwrap_or(u32::MAX);
1536        let span = env.span.clone();
1537        let outcome = tracker.record_reply(source, env.rsp);
1538        match outcome {
1539            CoalesceOutcome::Pending => {}
1540            CoalesceOutcome::Ready {
1541                winner,
1542                divergent_targets,
1543            } => {
1544                if !emitted {
1545                    let winner_bytes: Vec<u8> = winner
1546                        .mbufs()
1547                        .iter()
1548                        .flat_map(|b| b.readable().to_vec())
1549                        .collect();
1550                    let out_env = OutboundEnvelope {
1551                        req_id,
1552                        rsp: *winner,
1553                        span: span.clone(),
1554                        source_peer_idx: None,
1555                    };
1556                    let _ = client_tx.send(out_env).await;
1557                    emitted = true;
1558                    if !divergent_targets.is_empty() {
1559                        if let Some(ctx) = repair_ctx.as_ref() {
1560                            schedule_read_repair(ctx, &divergent_targets, &winner_bytes, &span);
1561                        }
1562                    }
1563                }
1564            }
1565            CoalesceOutcome::Error(reason) => {
1566                if !emitted {
1567                    let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet)
1568                    {
1569                        MsgType::RspRedisError
1570                    } else {
1571                        MsgType::RspMcServerError
1572                    };
1573                    let anchor = Msg::new(req_id, req_ty, true);
1574                    let rsp = crate::msg::response::make_error(
1575                        &anchor,
1576                        err_type,
1577                        0,
1578                        crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1579                        &mbuf_pool,
1580                    );
1581                    let _ = client_tx
1582                        .send(OutboundEnvelope {
1583                            req_id,
1584                            rsp,
1585                            span: span.clone(),
1586                            source_peer_idx: None,
1587                        })
1588                        .await;
1589                    emitted = true;
1590                }
1591                tracing::debug!(target: "dynomite::coalesce", req_id, reason = %reason, "coalesce error");
1592            }
1593        }
1594    }
1595    if !emitted {
1596        // No reply was emitted and the channel closed (every
1597        // per-target sender dropped without producing a reply).
1598        // From the dispatcher's perspective the request has
1599        // timed out at the coalescer layer; surface a
1600        // quorum-unreachable error so the client does not hang
1601        // and bump the response-timeout counter.
1602        if let Some(m) = failure_metrics.as_ref() {
1603            m.record_response_timeout(consistency);
1604        }
1605        let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1606            MsgType::RspRedisError
1607        } else {
1608            MsgType::RspMcServerError
1609        };
1610        let anchor = Msg::new(req_id, req_ty, true);
1611        let rsp = crate::msg::response::make_error(
1612            &anchor,
1613            err_type,
1614            0,
1615            crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1616            &mbuf_pool,
1617        );
1618        let _ = client_tx
1619            .send(OutboundEnvelope {
1620                req_id,
1621                rsp,
1622                span: tracing::Span::none(),
1623                source_peer_idx: None,
1624            })
1625            .await;
1626    }
1627}
1628
1629/// Build a sink the read-repair task can drop replies into. The
1630/// scheduler is fire-and-forget: every reply is discarded.
1631fn repair_sink() -> ServerSink {
1632    let (tx, mut rx) = mpsc::channel::<OutboundEnvelope>(8);
1633    tokio::spawn(async move {
1634        while rx.recv().await.is_some() {
1635            // Drop the envelope; the original client already
1636            // received its reply on the main responder.
1637        }
1638    });
1639    tx
1640}
1641
1642/// Decode a winning RESP reply into the bytes we want to write
1643/// back to divergent replicas.
1644///
1645/// Returns `Some(bytes)` for a bulk-string winner (we ship a
1646/// `SET key value` to the divergent replica) or for a nil reply
1647/// (we ship a `DEL key`). Returns `None` for any other shape
1648/// (errors, integers, multibulk, ...) since the v1 repair
1649/// scheduler only handles single-bulk Redis GET-style winners.
1650fn decode_winner_for_repair(payload: &[u8]) -> Option<RepairAction> {
1651    if payload == b"$-1\r\n" {
1652        return Some(RepairAction::Delete);
1653    }
1654    if !payload.starts_with(b"$") {
1655        return None;
1656    }
1657    // `$<len>\r\n<value>\r\n`
1658    let crlf = payload.iter().position(|&b| b == b'\r')?;
1659    if payload.get(crlf + 1).copied() != Some(b'\n') {
1660        return None;
1661    }
1662    let len_str = std::str::from_utf8(&payload[1..crlf]).ok()?;
1663    let len: usize = len_str.parse().ok()?;
1664    let body_start = crlf + 2;
1665    let body_end = body_start.checked_add(len)?;
1666    if payload.len() < body_end + 2 {
1667        return None;
1668    }
1669    if &payload[body_end..body_end + 2] != b"\r\n" {
1670        return None;
1671    }
1672    Some(RepairAction::Write(payload[body_start..body_end].to_vec()))
1673}
1674
1675/// Bundle of handoff state passed into
1676/// [`ClusterDispatcher::dispatch_replicas_direct`]. Avoids a
1677/// noisy parameter list while keeping the per-call decisions
1678/// ("is handoff on?", "what state is each target in?") in one
1679/// place.
1680struct HandoffCtx<'a> {
1681    handoff_active: bool,
1682    peer_states: &'a std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
1683}
1684
/// What the dispatcher does with one replica target, as decided
/// by [`ClusterDispatcher::choose_target_action`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TargetAction {
    /// Deliver the request through the per-peer outbound channel.
    Send,
    /// Store a hint for the target and hand the coalescer a
    /// synthetic success reply in its place.
    Hint,
}
1695
1696/// Synthetic reply pushed into the coalescer on behalf of a
1697/// hinted target. Always a `+OK\r\n` Redis status reply: the
1698/// dispatcher decouples the hint from the eventual peer reply
1699/// (the drainer will fire-and-forget the hint when the peer
1700/// returns), so the synth shape only needs to satisfy the
1701/// coalescer for SET-style writes which is the dominant case
1702/// for hinted handoff. Mismatched-shape writes (DEL with one
1703/// hinted target) coalesce to the surviving real reply via the
1704/// plurality / quorum branch and the divergence is harmless
1705/// because read-repair only fires for `MsgType::ReqRedisGet`.
1706fn synth_hint_reply(req: &Msg, pool: &MbufPool) -> Msg {
1707    crate::msg::response::make_simple_redis(req, pool, b"+OK\r\n")
1708}
1709
/// The write a read-repair task issues against a divergent
/// replica.
enum RepairAction {
    /// Overwrite the stale value by shipping `SET key <bytes>`.
    Write(Vec<u8>),
    /// Remove the stale value by shipping `DEL key`; chosen when
    /// the winning reply was a nil bulk.
    Delete,
}
1719
1720/// Build the wire bytes for a Redis repair write.
1721fn build_repair_bytes(action: &RepairAction, key: &[u8]) -> Vec<u8> {
1722    match action {
1723        RepairAction::Write(value) => {
1724            let mut out = Vec::with_capacity(key.len() + value.len() + 32);
1725            out.extend_from_slice(b"*3\r\n$3\r\nSET\r\n$");
1726            out.extend_from_slice(key.len().to_string().as_bytes());
1727            out.extend_from_slice(b"\r\n");
1728            out.extend_from_slice(key);
1729            out.extend_from_slice(b"\r\n$");
1730            out.extend_from_slice(value.len().to_string().as_bytes());
1731            out.extend_from_slice(b"\r\n");
1732            out.extend_from_slice(value);
1733            out.extend_from_slice(b"\r\n");
1734            out
1735        }
1736        RepairAction::Delete => {
1737            let mut out = Vec::with_capacity(key.len() + 24);
1738            out.extend_from_slice(b"*2\r\n$3\r\nDEL\r\n$");
1739            out.extend_from_slice(key.len().to_string().as_bytes());
1740            out.extend_from_slice(b"\r\n");
1741            out.extend_from_slice(key);
1742            out.extend_from_slice(b"\r\n");
1743            out
1744        }
1745    }
1746}
1747
1748/// Schedule fire-and-forget read-repair writes to every
1749/// divergent target. The function only awaits a bounded mpsc
1750/// permit; it never blocks for the repair to complete or for
1751/// the divergent replica to ack.
1752///
1753/// The repair shape is decoded from `winner_bytes`:
1754///
1755/// * Bulk-string winner -> `SET key <value>`.
1756/// * Nil bulk winner -> `DEL key`.
1757/// * Anything else -> skipped (entropy reconciliation will
1758///   handle it later). This v1 limitation is documented in the
1759///   dispatcher tests and in `docs/parity.md`.
1760///
1761/// Repair writes are tagged with `DmsgType::ReqForward` so the
1762/// receiving peer's `dnode_client_loop` rewrites the parsed
1763/// request's routing tag to `LocalNodeOnly`, preventing a
1764/// recursive multi-replica fan-out at the divergent peer.
1765fn schedule_read_repair(
1766    ctx: &ReadRepairContext,
1767    divergent: &[u32],
1768    winner_bytes: &[u8],
1769    span: &tracing::Span,
1770) {
1771    if !matches!(ctx.req_ty, MsgType::ReqRedisGet) {
1772        return;
1773    }
1774    let Some(action) = decode_winner_for_repair(winner_bytes) else {
1775        return;
1776    };
1777    let bytes = build_repair_bytes(&action, &ctx.key);
1778    let sink = repair_sink();
1779    for &peer_idx in divergent {
1780        let is_local = ctx.target_is_local.get(&peer_idx).copied().unwrap_or(false);
1781        let env = OutboundRequest {
1782            bytes: bytes.clone(),
1783            req_id: ctx.req_id,
1784            responder: sink.clone(),
1785            span: span.clone(),
1786            ty: crate::proto::dnode::DmsgType::ReqForward,
1787            target_peer_idx: Some(peer_idx),
1788        };
1789        let sent = if is_local {
1790            ctx.local_backend
1791                .as_ref()
1792                .is_some_and(|tx| tx.try_send(env).is_ok())
1793        } else {
1794            ctx.peer_backends
1795                .get(&peer_idx)
1796                .is_some_and(|tx| tx.try_send(env).is_ok())
1797        };
1798        if sent {
1799            let _ = &ctx.mbuf_pool;
1800            tracing::debug!(
1801                target: "dynomite::read_repair",
1802                req_id = ctx.req_id,
1803                peer_idx,
1804                bytes = bytes.len(),
1805                "scheduled read-repair write",
1806            );
1807        } else {
1808            tracing::debug!(
1809                target: "dynomite::read_repair",
1810                req_id = ctx.req_id,
1811                peer_idx,
1812                "read-repair drop: backend channel unavailable or full",
1813            );
1814        }
1815    }
1816}
1817
1818#[cfg(test)]
1819mod tests {
1820    use super::*;
1821    use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
1822    use crate::conf::DataStore;
1823    use crate::hashkit::DynToken;
1824
1825    fn cfg(read: ConsistencyLevel, write: ConsistencyLevel) -> crate::cluster::PoolConfig {
1826        crate::cluster::PoolConfig {
1827            read_consistency: read,
1828            write_consistency: write,
1829            dc: "dc1".into(),
1830            rack: "rA".into(),
1831            ..crate::cluster::PoolConfig::default()
1832        }
1833    }
1834
1835    fn peer(idx: u32, dc: &str, rack: &str, tok: u32, is_local: bool, is_same: bool) -> Peer {
1836        let mut p = Peer::new(
1837            idx,
1838            PeerEndpoint::tcp("h".into(), 8101 + u16::try_from(idx).unwrap_or(0)),
1839            rack.into(),
1840            dc.into(),
1841            vec![DynToken::from_u32(tok)],
1842            is_local,
1843            is_same,
1844            false,
1845        );
1846        p.set_state(PeerState::Normal, 0);
1847        p
1848    }
1849
1850    fn pool(read: ConsistencyLevel, write: ConsistencyLevel, peers: Vec<Peer>) -> Arc<ServerPool> {
1851        let pool = ServerPool::new(cfg(read, write), peers);
1852        pool.preselect_remote_racks();
1853        Arc::new(pool)
1854    }
1855
1856    #[test]
1857    fn local_node_only_short_circuits() {
1858        let p = pool(
1859            ConsistencyLevel::DcOne,
1860            ConsistencyLevel::DcOne,
1861            vec![peer(0, "dc1", "rA", 10, true, true)],
1862        );
1863        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
1864        req.set_routing(MsgRouting::LocalNodeOnly);
1865        assert_eq!(
1866            ClusterDispatcher::new(p).plan(&req, b"k"),
1867            DispatchPlan::LocalDatastore,
1868        );
1869    }
1870
1871    #[test]
1872    fn dc_one_read_targets_local_rack_when_present() {
1873        let p = pool(
1874            ConsistencyLevel::DcOne,
1875            ConsistencyLevel::DcOne,
1876            vec![
1877                peer(0, "dc1", "rA", 10, true, true),
1878                peer(1, "dc1", "rB", 20, false, true),
1879                peer(2, "dc2", "rA", 30, false, false),
1880            ],
1881        );
1882        let req = Msg::new(1, MsgType::ReqRedisGet, true);
1883        // Any key resolves to peer 0 in rack rA (single-token continuum).
1884        let plan = ClusterDispatcher::new(p).plan(&req, b"hello");
1885        assert!(matches!(plan, DispatchPlan::LocalDatastore));
1886    }
1887
1888    #[test]
1889    fn dc_quorum_fans_out_local_dc() {
1890        let p = pool(
1891            ConsistencyLevel::DcQuorum,
1892            ConsistencyLevel::DcQuorum,
1893            vec![
1894                peer(0, "dc1", "rA", 10, true, true),
1895                peer(1, "dc1", "rB", 20, false, true),
1896                peer(2, "dc2", "rA", 30, false, false),
1897            ],
1898        );
1899        let req = Msg::new(1, MsgType::ReqRedisGet, true);
1900        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
1901        match plan {
1902            DispatchPlan::Replicas { targets: rs, .. } => {
1903                assert_eq!(rs.len(), 2);
1904                for r in rs {
1905                    assert_eq!(r.dc, "dc1");
1906                }
1907            }
1908            _ => panic!("expected replicas"),
1909        }
1910    }
1911
1912    #[test]
1913    fn dc_each_safe_quorum_fans_out_per_dc() {
1914        let p = pool(
1915            ConsistencyLevel::DcEachSafeQuorum,
1916            ConsistencyLevel::DcEachSafeQuorum,
1917            vec![
1918                peer(0, "dc1", "rA", 10, true, true),
1919                peer(1, "dc2", "rA", 20, false, false),
1920            ],
1921        );
1922        let req = Msg::new(1, MsgType::ReqRedisGet, true);
1923        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
1924        match plan {
1925            DispatchPlan::Replicas { targets: rs, .. } => {
1926                assert_eq!(rs.len(), 2);
1927                let dcs: Vec<&str> = rs.iter().map(|r| r.dc.as_str()).collect();
1928                assert!(dcs.contains(&"dc1"));
1929                assert!(dcs.contains(&"dc2"));
1930            }
1931            _ => panic!("expected replicas"),
1932        }
1933    }
1934
1935    #[test]
1936    fn no_routable_peers_returns_no_targets() {
1937        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
1938        p0.set_state(PeerState::Down, 0);
1939        let p = pool(
1940            ConsistencyLevel::DcQuorum,
1941            ConsistencyLevel::DcQuorum,
1942            vec![p0],
1943        );
1944        let req = Msg::new(1, MsgType::ReqRedisGet, true);
1945        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
1946        assert_eq!(plan, DispatchPlan::NoTargets);
1947    }
1948
1949    /// Regression: any code path that returns
1950    /// `DispatchOutcome::Error` used to send 0 wire bytes to the
1951    /// client because [`crate::msg::response::make_error`] never
1952    /// attached the wire-format error string. The client then
1953    /// hung until its read timeout. After the fix, the error
1954    /// response carries a parseable `-Dynomite: ...` reply that
1955    /// the client can render as an error.
1956    #[test]
1957    fn no_targets_error_response_carries_dynomite_wire_bytes() {
1958        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
1959        p0.set_state(PeerState::Down, 0);
1960        let p = pool(
1961            ConsistencyLevel::DcQuorum,
1962            ConsistencyLevel::DcQuorum,
1963            vec![p0],
1964        );
1965        let disp = ClusterDispatcher::new(p);
1966        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
1967        req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
1968        let (tx, _rx) = mpsc::channel(1);
1969        let outcome = disp.dispatch(req, tx);
1970        match outcome {
1971            DispatchOutcome::Error(rsp) => {
1972                assert_eq!(rsp.ty(), MsgType::RspRedisError);
1973                assert!(rsp.flags().is_error);
1974                let bytes: Vec<u8> = rsp
1975                    .mbufs()
1976                    .iter()
1977                    .flat_map(|b| b.readable().to_vec())
1978                    .collect();
1979                assert!(
1980                    !bytes.is_empty(),
1981                    "NoTargets must produce on-wire bytes, not a 0-byte hang"
1982                );
1983                assert!(bytes.starts_with(b"-Dynomite: "));
1984                assert!(bytes.ends_with(b"\r\n"));
1985                assert_eq!(rsp.mlen() as usize, bytes.len());
1986            }
1987            other => panic!("expected DispatchOutcome::Error, got {other:?}"),
1988        }
1989    }
1990
1991    /// Memcache traffic with no quorum-eligible target must
1992    /// surface a `SERVER_ERROR ...\r\n` reply rather than
1993    /// hanging the client.
1994    #[test]
1995    fn no_targets_error_response_memcache_wire_bytes() {
1996        // Build a memcache pool so the dispatcher's err_type
1997        // selection lands on `RspMcServerError` (the dispatcher
1998        // currently keys off the request `MsgType`, so a
1999        // memcache request flows through the memcache wire
2000        // shape).
2001        let mut cfg = cfg(ConsistencyLevel::DcQuorum, ConsistencyLevel::DcQuorum);
2002        cfg.data_store = DataStore::Memcache;
2003        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
2004        p0.set_state(PeerState::Down, 0);
2005        let pool_arc = ServerPool::new(cfg, vec![p0]);
2006        pool_arc.preselect_remote_racks();
2007        let disp = ClusterDispatcher::new(Arc::new(pool_arc));
2008        let mut req = Msg::new(1, MsgType::ReqMcGet, true);
2009        req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
2010        let (tx, _rx) = mpsc::channel(1);
2011        let outcome = disp.dispatch(req, tx);
2012        match outcome {
2013            DispatchOutcome::Error(rsp) => {
2014                assert_eq!(rsp.ty(), MsgType::RspMcServerError);
2015                let bytes: Vec<u8> = rsp
2016                    .mbufs()
2017                    .iter()
2018                    .flat_map(|b| b.readable().to_vec())
2019                    .collect();
2020                assert!(
2021                    !bytes.is_empty(),
2022                    "NoTargets must produce on-wire bytes, not a 0-byte hang"
2023                );
2024                assert!(bytes.starts_with(b"SERVER_ERROR "));
2025                assert!(bytes.ends_with(b"\r\n"));
2026            }
2027            other => panic!("expected DispatchOutcome::Error, got {other:?}"),
2028        }
2029    }
2030
2031    use crate::cluster::pool::{BucketType, PoolConfig};
2032
2033    fn pool_with_bucket_types(
2034        pool_read: ConsistencyLevel,
2035        pool_write: ConsistencyLevel,
2036        bucket_types: Vec<BucketType>,
2037        default_bucket_type: Option<&str>,
2038        peers: Vec<Peer>,
2039    ) -> Arc<ServerPool> {
2040        let cfg = PoolConfig {
2041            read_consistency: pool_read,
2042            write_consistency: pool_write,
2043            dc: "dc1".into(),
2044            rack: "rA".into(),
2045            bucket_types,
2046            default_bucket_type: default_bucket_type.map(str::to_string),
2047            ..PoolConfig::default()
2048        };
2049        let pool = ServerPool::new(cfg, peers);
2050        pool.preselect_remote_racks();
2051        Arc::new(pool)
2052    }
2053
2054    fn three_local_peers() -> Vec<Peer> {
2055        vec![
2056            peer(0, "dc1", "rA", 10, true, true),
2057            peer(1, "dc1", "rB", 20, false, true),
2058            peer(2, "dc1", "rC", 30, false, true),
2059        ]
2060    }
2061
2062    #[test]
2063    fn bucket_type_overrides_pool_consistency() {
2064        // Pool default is DC_ONE, the bucket forces DC_QUORUM.
2065        let bts = vec![BucketType {
2066            name: "hot".into(),
2067            read_consistency: ConsistencyLevel::DcQuorum,
2068            write_consistency: ConsistencyLevel::DcQuorum,
2069            n_val: 0,
2070        }];
2071        let p = pool_with_bucket_types(
2072            ConsistencyLevel::DcOne,
2073            ConsistencyLevel::DcOne,
2074            bts,
2075            None,
2076            three_local_peers(),
2077        );
2078        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2079        let plan = ClusterDispatcher::new(p).plan(&req, b"hot/key1");
2080        match plan {
2081            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2082            other => panic!("expected DC_QUORUM fan-out, got {other:?}"),
2083        }
2084    }
2085
2086    #[test]
2087    fn slashless_key_falls_back_to_pool_default() {
2088        let bts = vec![BucketType {
2089            name: "hot".into(),
2090            read_consistency: ConsistencyLevel::DcQuorum,
2091            write_consistency: ConsistencyLevel::DcQuorum,
2092            n_val: 0,
2093        }];
2094        let p = pool_with_bucket_types(
2095            ConsistencyLevel::DcOne,
2096            ConsistencyLevel::DcOne,
2097            bts,
2098            None,
2099            three_local_peers(),
2100        );
2101        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2102        let plan = ClusterDispatcher::new(p).plan(&req, b"plain-key");
2103        // No slash and no default_bucket_type -> pool DC_ONE.
2104        // The local rack hosts peer 0 so the plan short-circuits.
2105        assert!(matches!(plan, DispatchPlan::LocalDatastore));
2106    }
2107
2108    #[test]
2109    fn unknown_bucket_uses_default_bucket_type_when_set() {
2110        let bts = vec![BucketType {
2111            name: "safe".into(),
2112            read_consistency: ConsistencyLevel::DcQuorum,
2113            write_consistency: ConsistencyLevel::DcQuorum,
2114            n_val: 0,
2115        }];
2116        let p = pool_with_bucket_types(
2117            ConsistencyLevel::DcOne,
2118            ConsistencyLevel::DcOne,
2119            bts,
2120            Some("safe"),
2121            three_local_peers(),
2122        );
2123        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2124        // Slashless key: bucket is None, default_bucket_type=safe applies
2125        // so we get the bucket-type's DC_QUORUM fan-out.
2126        let plan = ClusterDispatcher::new(p.clone()).plan(&req, b"plain-key");
2127        match plan {
2128            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2129            other => panic!("expected DC_QUORUM via default bucket, got {other:?}"),
2130        }
2131        // Slashed key with an unknown bucket prefix also falls
2132        // through to the default bucket type.
2133        let plan = ClusterDispatcher::new(p).plan(&req, b"unknown-bucket/key");
2134        match plan {
2135            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2136            other => panic!("expected DC_QUORUM via default bucket, got {other:?}"),
2137        }
2138    }
2139
2140    #[test]
2141    fn unknown_bucket_with_no_default_uses_pool_default() {
2142        let bts = vec![BucketType {
2143            name: "safe".into(),
2144            read_consistency: ConsistencyLevel::DcQuorum,
2145            write_consistency: ConsistencyLevel::DcQuorum,
2146            n_val: 0,
2147        }];
2148        let p = pool_with_bucket_types(
2149            ConsistencyLevel::DcOne,
2150            ConsistencyLevel::DcOne,
2151            bts,
2152            None,
2153            three_local_peers(),
2154        );
2155        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2156        let plan = ClusterDispatcher::new(p).plan(&req, b"unknown-bucket/key");
2157        assert!(matches!(plan, DispatchPlan::LocalDatastore));
2158    }
2159
2160    #[test]
2161    fn n_val_one_caps_replicas_to_first_target() {
2162        let bts = vec![BucketType {
2163            name: "thin".into(),
2164            read_consistency: ConsistencyLevel::DcQuorum,
2165            write_consistency: ConsistencyLevel::DcQuorum,
2166            n_val: 1,
2167        }];
2168        let p = pool_with_bucket_types(
2169            ConsistencyLevel::DcOne,
2170            ConsistencyLevel::DcOne,
2171            bts,
2172            None,
2173            three_local_peers(),
2174        );
2175        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2176        let plan = ClusterDispatcher::new(p).plan(&req, b"thin/key");
2177        match plan {
2178            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 1),
2179            other => panic!("expected single-target plan, got {other:?}"),
2180        }
2181    }
2182
2183    #[test]
2184    fn n_val_two_caps_replicas_to_first_two_targets() {
2185        let bts = vec![BucketType {
2186            name: "medium".into(),
2187            read_consistency: ConsistencyLevel::DcQuorum,
2188            write_consistency: ConsistencyLevel::DcQuorum,
2189            n_val: 2,
2190        }];
2191        let p = pool_with_bucket_types(
2192            ConsistencyLevel::DcOne,
2193            ConsistencyLevel::DcOne,
2194            bts,
2195            None,
2196            three_local_peers(),
2197        );
2198        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2199        let plan = ClusterDispatcher::new(p).plan(&req, b"medium/key");
2200        match plan {
2201            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 2),
2202            other => panic!("expected two-target plan, got {other:?}"),
2203        }
2204    }
2205
2206    #[test]
2207    fn n_val_zero_does_not_cap() {
2208        let bts = vec![BucketType {
2209            name: "any".into(),
2210            read_consistency: ConsistencyLevel::DcQuorum,
2211            write_consistency: ConsistencyLevel::DcQuorum,
2212            n_val: 0,
2213        }];
2214        let p = pool_with_bucket_types(
2215            ConsistencyLevel::DcOne,
2216            ConsistencyLevel::DcOne,
2217            bts,
2218            None,
2219            three_local_peers(),
2220        );
2221        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2222        let plan = ClusterDispatcher::new(p).plan(&req, b"any/key");
2223        match plan {
2224            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2225            other => panic!("expected uncapped plan, got {other:?}"),
2226        }
2227    }
2228
2229    #[test]
2230    fn n_val_larger_than_replicas_is_a_no_op() {
2231        let bts = vec![BucketType {
2232            name: "big".into(),
2233            read_consistency: ConsistencyLevel::DcQuorum,
2234            write_consistency: ConsistencyLevel::DcQuorum,
2235            n_val: 7,
2236        }];
2237        let p = pool_with_bucket_types(
2238            ConsistencyLevel::DcOne,
2239            ConsistencyLevel::DcOne,
2240            bts,
2241            None,
2242            three_local_peers(),
2243        );
2244        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2245        let plan = ClusterDispatcher::new(p).plan(&req, b"big/key");
2246        match plan {
2247            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2248            other => panic!("expected uncapped plan, got {other:?}"),
2249        }
2250    }
2251
2252    /// Smoke test for the failure-cause counter wiring: a
2253    /// `NoTargets` plan increments the labelled counter
2254    /// exactly once.
2255    #[test]
2256    fn no_targets_records_failure_metric() {
2257        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
2258        p0.set_state(PeerState::Down, 0);
2259        let p = pool(
2260            ConsistencyLevel::DcQuorum,
2261            ConsistencyLevel::DcQuorum,
2262            vec![p0],
2263        );
2264        let metrics = Arc::new(crate::stats::FailureMetrics::new());
2265        let disp = ClusterDispatcher::new(p).with_failure_metrics(metrics.clone());
2266        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2267        assert_eq!(disp.plan(&req, b"k"), DispatchPlan::NoTargets);
2268        let snap = metrics.snapshot();
2269        assert_eq!(snap.no_targets.len(), 1);
2270        let entry = &snap.no_targets[0];
2271        assert_eq!(entry.dc, "dc1");
2272        assert_eq!(entry.rack, "rA");
2273        assert_eq!(entry.consistency, ConsistencyLevel::DcQuorum);
2274        assert_eq!(entry.count, 1);
2275    }
2276
2277    /// A closed mpsc channel returns `Closed` from
2278    /// `try_send`. Wire the dispatcher's local-datastore path
2279    /// to such a channel, fire one request, and assert the
2280    /// `dispatch_backend_send_closed_total` counter ticks by
2281    /// exactly one.
2282    #[tokio::test]
2283    async fn closed_backend_channel_records_closed_metric() {
2284        let p = pool(
2285            ConsistencyLevel::DcOne,
2286            ConsistencyLevel::DcOne,
2287            vec![peer(0, "dc1", "rA", 10, true, true)],
2288        );
2289        let (tx, rx) = mpsc::channel::<crate::net::server::OutboundRequest>(4);
2290        drop(rx);
2291        let metrics = Arc::new(crate::stats::FailureMetrics::new());
2292        let disp = ClusterDispatcher::new(p)
2293            .with_backend(tx)
2294            .with_failure_metrics(metrics.clone());
2295        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
2296        // Attach a non-empty mbuf so the dispatcher actually
2297        // attempts the try_send (empty bytes short-circuit
2298        // to Drop before the channel is touched).
2299        let pool_buf = crate::io::mbuf::MbufPool::default();
2300        let mut buf = pool_buf.get();
2301        buf.copy_from_slice(b"PING\r\n");
2302        req.mbufs_mut().push_back(buf);
2303        let (resp_tx, _resp_rx) = mpsc::channel(1);
2304        let outcome = disp.dispatch(req, resp_tx);
2305        assert!(matches!(outcome, DispatchOutcome::Error(_)));
2306        let snap = metrics.snapshot();
2307        assert_eq!(snap.backend_send_closed, 1);
2308        assert_eq!(snap.backend_send_full, 0);
2309    }
2310
2311    /// Integration-style unit test: build a two-peer pool,
2312    /// mark one peer Down, drive 100 dispatches across the
2313    /// ring, and assert the `dispatch_no_targets_total`
2314    /// counter reflects every observed `NoTargets` plan.
2315    #[test]
2316    fn two_peer_pool_with_one_down_records_per_key_no_targets() {
2317        let cfg = crate::cluster::PoolConfig {
2318            dc: "dc1".into(),
2319            rack: "rA".into(),
2320            read_consistency: ConsistencyLevel::DcQuorum,
2321            write_consistency: ConsistencyLevel::DcQuorum,
2322            ..crate::cluster::PoolConfig::default()
2323        };
2324        // Single-rack two-peer ring: peer 0 owns the upper
2325        // half via wrap-around (token 2_147_483_648), peer 1
2326        // owns the lower half (token 0 plus the boundary up to
2327        // 2_147_483_648). With both peers in rack `rA` the
2328        // continuum has two entries, so each key maps to
2329        // exactly one peer; marking peer 1 Down causes its
2330        // arc to produce `NoTargets`.
2331        let p0 = peer(0, "dc1", "rA", 2_147_483_648, true, true);
2332        let mut p1 = peer(1, "dc1", "rA", 0, false, true);
2333        p1.set_state(PeerState::Down, 0);
2334        let pool_arc = ServerPool::new(cfg, vec![p0, p1]);
2335        pool_arc.preselect_remote_racks();
2336        let metrics = Arc::new(crate::stats::FailureMetrics::new());
2337        let disp = ClusterDispatcher::new(Arc::new(pool_arc)).with_failure_metrics(metrics.clone());
2338        let mut planned_no_targets = 0u64;
2339        let mut planned_routable = 0u64;
2340        for i in 0..100u32 {
2341            let key = format!("k{i:03}");
2342            let req = Msg::new(u64::from(i), MsgType::ReqRedisGet, true);
2343            match disp.plan(&req, key.as_bytes()) {
2344                DispatchPlan::NoTargets => planned_no_targets += 1,
2345                DispatchPlan::Replicas { .. } | DispatchPlan::LocalDatastore => {
2346                    planned_routable += 1;
2347                }
2348                DispatchPlan::Drop => panic!("unexpected Drop in plan"),
2349            }
2350        }
2351        assert!(planned_no_targets > 0, "expected some NoTargets dispatches");
2352        assert!(planned_routable > 0, "expected some routable dispatches");
2353        let snap = metrics.snapshot();
2354        let counter_total: u64 = snap.no_targets.iter().map(|e| e.count).sum();
2355        assert_eq!(
2356            counter_total, planned_no_targets,
2357            "dispatch_no_targets_total must match observed NoTargets count",
2358        );
2359        // No `Closed`/`Full` channel errors expected: we did
2360        // not wire any backends.
2361        assert_eq!(snap.backend_send_full, 0);
2362        assert_eq!(snap.backend_send_closed, 0);
2363        assert!(snap.peer_send_full.is_empty());
2364        assert!(snap.peer_send_closed.is_empty());
2365    }
2366}