Skip to main content

dynomite/cluster/
dispatch.rs

1//! Cluster-aware [`Dispatcher`](crate::net::Dispatcher).
2//!
3//! Routes parsed [`Msg`]s based on the configured consistency level
4//! and the [`crate::cluster::pool::ServerPool`] topology:
5//!
6//! * `DC_ONE` reads pick the rack-local replica via the snitch.
7//! * `DC_ONE` writes fan out to every replica in the local DC.
8//! * `DC_QUORUM` / `DC_SAFE_QUORUM` reads fan out to every replica
9//!   in the local DC.
10//! * `DC_EACH_SAFE_QUORUM` writes fan out per-DC, walking the
11//!   per-DC racks via the preselected rack from
12//!   [`crate::cluster::pool::ServerPool::preselect_remote_racks`].
13//!
14//! The actual outbound delivery happens through the per-peer
15//! [`crate::net::ConnPool`]s; this module produces a
16//! [`DispatchPlan`] (the list of replica peers a request must be
17//! routed to) and exposes the planning logic so it can be tested
18//! independently of the runtime fan-out.
19//!
20//! # Examples
21//!
22//! ```
23//! use dynomite::cluster::dispatch::{ClusterDispatcher, DispatchPlan};
24//! use dynomite::cluster::pool::{PoolConfig, ServerPool};
25//! use dynomite::cluster::peer::{Peer, PeerEndpoint};
26//! use dynomite::hashkit::DynToken;
27//! use dynomite::msg::{Msg, MsgType};
28//! use std::sync::Arc;
29//!
30//! let cfg = PoolConfig {
31//!     dc: "d".into(), rack: "r".into(),
32//!     ..PoolConfig::default()
33//! };
34//! let local = Peer::new(
35//!     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
36//!     vec![DynToken::from_u32(0)], true, true, false,
37//! );
38//! let pool = Arc::new(ServerPool::new(cfg, vec![local]));
39//! let disp = ClusterDispatcher::new(pool);
40//! let req = Msg::new(1, MsgType::ReqRedisGet, true);
41//! let plan = disp.plan(&req, b"foo");
42//! assert!(matches!(plan, DispatchPlan::LocalDatastore));
43//! ```
44
45use std::sync::atomic::{AtomicU64, Ordering};
46use std::sync::Arc;
47
48use tokio::sync::mpsc;
49
50use crate::cluster::pool::ServerPool;
51use crate::cluster::snitch::{rack_distance, RackDistance};
52use crate::cluster::vnode;
53use crate::conf::HashType as ConfHashType;
54use crate::hashkit::{self, HashType};
55use crate::io::mbuf::MbufPool;
56use crate::msg::{ConsistencyLevel, Msg, MsgRouting, MsgType};
57use crate::net::dispatcher::{DispatchOutcome, Dispatcher, OutboundEnvelope, ServerSink};
58use crate::net::server::OutboundRequest;
59
60/// Process-global Prometheus-friendly counter of
61/// shadow-distribution disagreements. The dispatcher bumps this
62/// counter every time the configured `distribution` and the
63/// configured `distribution_shadow` choose different peers for
64/// the same key. Exposed for both the stats endpoint and the
65/// integration test that exercises shadow mode.
66///
67/// # Examples
68///
69/// ```
70/// use dynomite::cluster::dispatch::distribution_shadow_disagreement_total;
71/// let _seen = distribution_shadow_disagreement_total();
72/// ```
73#[must_use]
74pub fn distribution_shadow_disagreement_total() -> u64 {
75    SHADOW_DISAGREEMENTS.load(Ordering::Relaxed)
76}
77
78/// Reset the shadow-disagreement counter. Used by integration
79/// tests that need a clean baseline; never called from
80/// production code.
81///
82/// # Examples
83///
84/// ```
85/// use dynomite::cluster::dispatch::reset_distribution_shadow_disagreement_total;
86/// reset_distribution_shadow_disagreement_total();
87/// ```
88pub fn reset_distribution_shadow_disagreement_total() {
89    SHADOW_DISAGREEMENTS.store(0, Ordering::Relaxed);
90}
91
/// Backing storage for the shadow-distribution disagreement counter.
/// Starts at zero; incremented by `bump_shadow_disagreement`, read by
/// `distribution_shadow_disagreement_total`, and zeroed by
/// `reset_distribution_shadow_disagreement_total`.
static SHADOW_DISAGREEMENTS: AtomicU64 = AtomicU64::new(0);
93
94fn bump_shadow_disagreement() {
95    SHADOW_DISAGREEMENTS.fetch_add(1, Ordering::Relaxed);
96}
97
98/// Build the `dispatch.plan` info span and enter it. Returns the
99/// originating client request span (captured before the plan
100/// span was entered) plus the entered plan-span guard. Factored
101/// out so [`ClusterDispatcher::dispatch`] stays inside the
102/// project's per-function line budget.
103fn enter_plan_span(
104    req_id: u64,
105    plan: &DispatchPlan,
106) -> (tracing::Span, tracing::span::EnteredSpan) {
107    let req_span = tracing::Span::current();
108    let kind: &'static str = match plan {
109        DispatchPlan::Drop => "drop",
110        DispatchPlan::NoTargets => "no_targets",
111        DispatchPlan::LocalDatastore => "local_datastore",
112        DispatchPlan::Replicas { .. } => "replicas",
113    };
114    let targets = match plan {
115        DispatchPlan::Replicas { targets, .. } => targets.len(),
116        _ => 0,
117    };
118    let span = tracing::info_span!("dispatch.plan", req_id, plan = kind, targets,).entered();
119    (req_span, span)
120}
121
122fn map_hash(h: ConfHashType) -> HashType {
123    match h {
124        ConfHashType::OneAtATime => HashType::OneAtATime,
125        ConfHashType::Md5 => HashType::Md5,
126        ConfHashType::Crc16 => HashType::Crc16,
127        ConfHashType::Crc32 => HashType::Crc32,
128        ConfHashType::Crc32a => HashType::Crc32a,
129        ConfHashType::Fnv1_64 => HashType::Fnv1_64,
130        ConfHashType::Fnv1a64 => HashType::Fnv1a_64,
131        ConfHashType::Fnv1_32 => HashType::Fnv1_32,
132        ConfHashType::Fnv1a32 => HashType::Fnv1a_32,
133        ConfHashType::Hsieh => HashType::Hsieh,
134        ConfHashType::Murmur => HashType::Murmur,
135        ConfHashType::Jenkins => HashType::Jenkins,
136        ConfHashType::Murmur3 => HashType::Murmur3,
137        ConfHashType::Murmur3X64_64 => HashType::Murmur3X64_64,
138    }
139}
140
/// One replica target produced by [`ClusterDispatcher::plan`].
///
/// Each target names a single peer together with the datacenter and
/// rack it was selected from, so consumers of a plan do not need to
/// look the peer up again to learn its placement.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplicaTarget {
    /// Index of the target peer in the pool's peer array.
    pub peer_idx: u32,
    /// Datacenter name (copied from the datacenter the peer was
    /// selected from).
    pub dc: String,
    /// Rack name (copied from the rack the peer was selected from).
    pub rack: String,
    /// True when the target is the local node. False as well when
    /// `peer_idx` does not resolve in the peer table.
    pub is_local: bool,
}
153
/// Dispatch plan produced by the cluster dispatcher.
///
/// `LocalDatastore` is the early-return branch the reference
/// engine takes when the routing tag is `ROUTING_LOCAL_NODE_ONLY`.
/// The planner in this module also returns it for requests that
/// carry no key and when the nearest replica chosen for `DcOne`
/// is the local node itself; the per-connection driver then
/// hands the request off to its server-side connection pool.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DispatchPlan {
    /// Hand the request straight to the local datastore.
    LocalDatastore,
    /// Forward to one or more peer replicas. The carried
    /// consistency level is the one the planner resolved for
    /// this request (after applying any bucket-type override),
    /// so the dispatcher's reply coalescer does not have to
    /// re-resolve it.
    Replicas {
        /// Replica peers the request must be routed to.
        targets: Vec<ReplicaTarget>,
        /// Resolved consistency level.
        consistency: ConsistencyLevel,
    },
    /// Reply with an error: the cluster has no quorum-eligible
    /// targets. Returned when the peer table is empty, when no
    /// routable replica owns the key, or when the resolved
    /// consistency level finds no candidate in the DCs it needs.
    NoTargets,
    /// Drop the request (`QUIT`-style swallow).
    Drop,
}
182
/// Cluster-aware dispatcher.
///
/// Built with [`ClusterDispatcher::new`] and then customised through
/// the `with_*` builder methods; every optional collaborator below
/// starts out unset.
#[derive(Debug, Clone)]
pub struct ClusterDispatcher {
    /// Shared handle to the server pool. The planner reads its
    /// configuration, peer table, and datacenter list from here.
    pool: Arc<ServerPool>,
    /// Outbound channel feeding the local datastore driver. When
    /// `None`, `LocalDatastore` plans short-circuit to `Pending`
    /// without forwarding (used by tests that do not need a real
    /// backend). When set, requests for the local node are
    /// encoded onto the wire and shipped to the [`crate::net::ServerConn`]
    /// task that drives the redis / memcache backend.
    backend: Option<mpsc::Sender<OutboundRequest>>,
    /// Per-peer outbound channel for cross-DC fan-out. Keyed by
    /// `Peer::idx`. When a `DispatchPlan::Replicas` plan names a
    /// non-local peer, the dispatcher forwards via the matching
    /// channel to a `DnodeServerConn` task. Peers without a
    /// wired channel are skipped (`Pending`); when no replica
    /// is reachable for the consistency level the dispatcher
    /// falls back to a `DynomiteNoQuorumAchieved` error response.
    peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
    /// Mbuf pool used to render synthetic error payloads.
    /// `MbufPool` already wraps an `Arc`, so cloning the
    /// dispatcher (and the pool with it) shares the same free
    /// list across every cluster handle.
    mbuf_pool: MbufPool,
    /// Optional node-local hint store. When set AND the pool's
    /// `enable_hinted_handoff` flag is true, write requests
    /// targeted at peers in [`crate::cluster::peer::PeerState::Down`]
    /// (or at peers whose outbound channel is closed / full) are
    /// recorded as hints and counted toward the consistency
    /// threshold, instead of being silently skipped.
    hint_store: Option<Arc<crate::cluster::hints::HintStore>>,
    /// Optional failure-cause metrics handle. When wired, every
    /// error-producing branch in the dispatcher increments the
    /// matching counter via the [`crate::stats::FailureMetrics`]
    /// accumulator. When `None`, the dispatcher's behaviour is
    /// unchanged.
    failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
    /// Optional vector index registry. When set, the dispatcher
    /// recognises RediSearch FT.* commands and the HSET
    /// interception path on registered prefixes; FT.* keywords
    /// short-circuit to a synthesised RESP reply built from
    /// [`crate::proto::redis::ft::dispatch`], and HSETs against
    /// indexed prefixes upsert the row through
    /// [`crate::proto::redis::ft::maybe_index_hset`] before they
    /// fall through to the standard backend path. When `None`,
    /// the dispatcher's behaviour is unchanged.
    vector_registry: Option<Arc<crate::vector::registry::VectorRegistry>>,
}
231
232impl ClusterDispatcher {
233    /// Wrap a [`ServerPool`] in a dispatcher.
234    ///
235    /// # Examples
236    ///
237    /// ```
238    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
239    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
240    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
241    /// # use dynomite::hashkit::DynToken;
242    /// # use std::sync::Arc;
243    /// # let cfg = PoolConfig::default();
244    /// # let local = Peer::new(
245    /// #    0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
246    /// #    vec![DynToken::from_u32(0)], true, true, false,
247    /// # );
248    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
249    /// let disp = ClusterDispatcher::new(pool);
250    /// let _ = disp.pool();
251    /// ```
252    #[must_use]
253    pub fn new(pool: Arc<ServerPool>) -> Self {
254        Self {
255            pool,
256            backend: None,
257            peer_backends: std::collections::HashMap::new(),
258            mbuf_pool: MbufPool::default(),
259            hint_store: None,
260            failure_metrics: None,
261            vector_registry: None,
262        }
263    }
264
265    /// Override the dispatcher's mbuf pool. Useful when the
266    /// embedding wants every synthetic error payload to come from
267    /// the same recycled buffers as the rest of the engine.
268    ///
269    /// # Examples
270    ///
271    /// ```
272    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
273    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
274    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
275    /// # use dynomite::hashkit::DynToken;
276    /// # use dynomite::io::mbuf::MbufPool;
277    /// # use std::sync::Arc;
278    /// # let cfg = PoolConfig::default();
279    /// # let local = Peer::new(
280    /// #    0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
281    /// #    vec![DynToken::from_u32(0)], true, true, false,
282    /// # );
283    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
284    /// let _disp = ClusterDispatcher::new(pool).with_mbuf_pool(MbufPool::default());
285    /// ```
286    #[must_use]
287    pub fn with_mbuf_pool(mut self, pool: MbufPool) -> Self {
288        self.mbuf_pool = pool;
289        self
290    }
291
292    /// Borrow the dispatcher's mbuf pool. Exposed so embedders
293    /// can reuse the same pool when building synthetic responses
294    /// outside the dispatcher's own code paths.
295    #[must_use]
296    pub fn mbuf_pool(&self) -> &MbufPool {
297        &self.mbuf_pool
298    }
299
300    /// Attach a backend request channel. Calls to [`Self::dispatch`]
301    /// that produce a [`DispatchPlan::LocalDatastore`] plan will
302    /// forward the request bytes onto this channel for the local
303    /// datastore driver to write to the backend.
304    ///
305    /// The channel sender must be the request side of a
306    /// [`crate::net::ServerConn`] task; multiple senders cloned from
307    /// the same channel are fine.
308    #[must_use]
309    pub fn with_backend(mut self, backend: mpsc::Sender<OutboundRequest>) -> Self {
310        self.backend = Some(backend);
311        self
312    }
313
314    /// Attach an outbound channel for a single peer (by
315    /// `Peer::idx`). The supplied sender feeds a
316    /// [`crate::net::DnodeServerConn`] task that writes
317    /// dnode-framed requests to the peer's `dyn_listen` and
318    /// routes the response back through the per-request
319    /// responder channel.
320    ///
321    /// Wiring is additive: call this once per non-local peer.
322    /// Calling it again with the same `peer_idx` replaces the
323    /// previous sender (used by reconnect supervisors that
324    /// rebuild channels on restart).
325    #[must_use]
326    pub fn with_peer_backend(
327        mut self,
328        peer_idx: u32,
329        sender: mpsc::Sender<OutboundRequest>,
330    ) -> Self {
331        self.peer_backends.insert(peer_idx, sender);
332        self
333    }
334
335    /// Whether a backend channel is wired.
336    #[must_use]
337    pub fn has_backend(&self) -> bool {
338        self.backend.is_some()
339    }
340
341    /// Number of peer-backend channels wired.
342    #[must_use]
343    pub fn peer_backend_count(&self) -> usize {
344        self.peer_backends.len()
345    }
346
347    /// Borrow the underlying pool.
348    #[must_use]
349    pub fn pool(&self) -> &Arc<ServerPool> {
350        &self.pool
351    }
352
353    /// Attach a [`crate::cluster::hints::HintStore`].
354    ///
355    /// When set AND the pool's `enable_hinted_handoff` flag is
356    /// `true`, write requests targeted at peers in
357    /// [`crate::cluster::peer::PeerState::Down`] (or at peers
358    /// whose outbound channel is closed / full) are stored as
359    /// hints and counted toward the consistency threshold. The
360    /// background drainer task in `dynomited` is responsible
361    /// for shipping the hints back to the peer once it returns
362    /// to [`crate::cluster::peer::PeerState::Normal`]. Without
363    /// this builder call (or with `enable_hinted_handoff: false`)
364    /// the dispatcher behaviour is unchanged: a Down or
365    /// unreachable target is silently skipped.
366    ///
367    /// # Examples
368    ///
369    /// ```
370    /// # use std::sync::Arc;
371    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
372    /// # use dynomite::cluster::hints::HintStore;
373    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
374    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
375    /// # use dynomite::hashkit::DynToken;
376    /// let cfg = PoolConfig { enable_hinted_handoff: true, ..PoolConfig::default() };
377    /// let local = Peer::new(
378    ///     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
379    ///     vec![DynToken::from_u32(0)], true, true, false,
380    /// );
381    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
382    /// let store = Arc::new(HintStore::new(64 * 1024 * 1024));
383    /// let _disp = ClusterDispatcher::new(pool).with_hint_store(store);
384    /// ```
385    #[must_use]
386    pub fn with_hint_store(mut self, store: Arc<crate::cluster::hints::HintStore>) -> Self {
387        self.hint_store = Some(store);
388        self
389    }
390
391    /// Borrow the wired hint store, if any.
392    #[must_use]
393    pub fn hint_store(&self) -> Option<&Arc<crate::cluster::hints::HintStore>> {
394        self.hint_store.as_ref()
395    }
396
397    /// Attach a [`crate::stats::FailureMetrics`] handle.
398    ///
399    /// When wired, each error-producing branch in the
400    /// dispatcher (no-targets, peer-channel-full,
401    /// peer-channel-closed, backend-channel-full,
402    /// backend-channel-closed, response-timeout) increments
403    /// the matching counter so an operator can pull the
404    /// per-cause histogram off the `/stats` and `/metrics`
405    /// endpoints. The default behaviour is unchanged when no
406    /// metrics handle is supplied.
407    ///
408    /// # Examples
409    ///
410    /// ```
411    /// # use std::sync::Arc;
412    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
413    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
414    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
415    /// # use dynomite::hashkit::DynToken;
416    /// # use dynomite::stats::FailureMetrics;
417    /// let cfg = PoolConfig::default();
418    /// let local = Peer::new(
419    ///     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
420    ///     vec![DynToken::from_u32(0)], true, true, false,
421    /// );
422    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
423    /// let metrics = Arc::new(FailureMetrics::new());
424    /// let _disp = ClusterDispatcher::new(pool).with_failure_metrics(metrics);
425    /// ```
426    #[must_use]
427    pub fn with_failure_metrics(mut self, metrics: Arc<crate::stats::FailureMetrics>) -> Self {
428        self.failure_metrics = Some(metrics);
429        self
430    }
431
432    /// Borrow the wired failure-metrics handle, if any.
433    #[must_use]
434    pub fn failure_metrics(&self) -> Option<&Arc<crate::stats::FailureMetrics>> {
435        self.failure_metrics.as_ref()
436    }
437
438    /// Attach a [`crate::vector::registry::VectorRegistry`].
439    ///
440    /// When wired, parsed RediSearch FT.* requests
441    /// (`FT.CREATE` / `FT.SEARCH` / `FT.INFO` / `FT.LIST` /
442    /// `FT.DROPINDEX`) short-circuit to the registry-backed
443    /// executor in [`crate::proto::redis::ft`] and the synthesised
444    /// RESP bytes are returned to the client as
445    /// [`crate::net::DispatchOutcome::Inline`]. HSET requests
446    /// whose key matches a registered prefix are routed through
447    /// [`crate::proto::redis::ft::maybe_index_hset`] before they
448    /// fall through to the standard backend path; a malformed or
449    /// missing vector field surfaces as
450    /// [`crate::net::DispatchOutcome::Error`] with a `-ERR ...`
451    /// reply rather than reaching the backend.
452    ///
453    /// Without this builder call, the dispatcher's behaviour is
454    /// unchanged: FT.* keywords are forwarded to the local
455    /// datastore (which typically rejects them with `-ERR
456    /// unknown command`).
457    ///
458    /// # Examples
459    ///
460    /// ```
461    /// # use std::sync::Arc;
462    /// # use dynomite::cluster::dispatch::ClusterDispatcher;
463    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
464    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
465    /// # use dynomite::hashkit::DynToken;
466    /// # use dynomite::vector::registry::VectorRegistry;
467    /// let cfg = PoolConfig::default();
468    /// let local = Peer::new(
469    ///     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
470    ///     vec![DynToken::from_u32(0)], true, true, false,
471    /// );
472    /// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
473    /// let registry = Arc::new(VectorRegistry::new());
474    /// let _disp = ClusterDispatcher::new(pool).with_vector_registry(registry);
475    /// ```
476    #[must_use]
477    pub fn with_vector_registry(
478        mut self,
479        registry: Arc<crate::vector::registry::VectorRegistry>,
480    ) -> Self {
481        self.vector_registry = Some(registry);
482        self
483    }
484
485    /// Borrow the wired vector-index registry, if any.
486    #[must_use]
487    pub fn vector_registry(&self) -> Option<&Arc<crate::vector::registry::VectorRegistry>> {
488        self.vector_registry.as_ref()
489    }
490
491    /// True when both the hint store is wired AND the pool has
492    /// `enable_hinted_handoff: true`. Hot-path predicate.
493    #[must_use]
494    pub fn hinted_handoff_active(&self) -> bool {
495        self.hint_store.is_some() && self.pool.config().enable_hinted_handoff
496    }
497
498    /// Compute the routing plan for `req` with the supplied key.
499    ///
500    /// `key` is the primary key of the request (the first key
501    /// returned by [`Msg::keys`] for parsed redis / memcache
502    /// commands, or an empty slice for argument-less commands).
503    ///
504    /// The function never panics; it consults the live peer table
505    /// behind the pool's `RwLock` and returns
506    /// [`DispatchPlan::NoTargets`] when the topology cannot
507    /// satisfy the request.
508    ///
509    /// # Examples
510    ///
511    /// See the module-level example.
512    #[must_use]
513    pub fn plan(&self, req: &Msg, key: &[u8]) -> DispatchPlan {
514        let cfg = self.pool.config();
515        let peers = self.pool.peers().read();
516        if peers.is_empty() {
517            self.record_no_targets_metric(cfg, ConsistencyLevel::default());
518            return DispatchPlan::NoTargets;
519        }
520        if matches!(req.routing(), MsgRouting::LocalNodeOnly) {
521            return DispatchPlan::LocalDatastore;
522        }
523        if key.is_empty() {
524            return DispatchPlan::LocalDatastore;
525        }
526        let token = hashkit::hash(map_hash(cfg.hash), key);
527        let key_hash64 = hashkit::hash64(map_hash(cfg.hash), key);
528        let bucket = crate::proto::redis::bucket_name(key);
529        let bucket_type = cfg.resolve_bucket_type(bucket);
530        let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
531        let consistency = match (bucket_type, is_read) {
532            (Some(bt), true) => bt.read_consistency,
533            (Some(bt), false) => bt.write_consistency,
534            (None, true) => cfg.read_consistency,
535            (None, false) => cfg.write_consistency,
536        };
537        let n_val_cap = bucket_type.map_or(0, |bt| bt.n_val);
538        let dcs = self.pool.datacenters().read();
539        // When hinted handoff is active and the request is a
540        // write, peers in `Down` are kept in the routable set
541        // so the dispatcher can hint them at fan-out time. The
542        // hint counts toward the consistency threshold; the
543        // background drainer ships the hint once the peer
544        // returns to `Normal`. For reads (and for writes when
545        // handoff is off), Down peers are filtered out as
546        // before.
547        let include_down = self.hinted_handoff_active() && !is_read;
548        let routable = collect_routable(
549            &dcs,
550            &peers,
551            &token,
552            key_hash64,
553            cfg.distribution,
554            include_down,
555        );
556        if let Some(shadow) = cfg.distribution_shadow {
557            if shadow != cfg.distribution {
558                let shadow_routable =
559                    collect_routable(&dcs, &peers, &token, key_hash64, shadow, include_down);
560                if !plans_agree(&routable, &shadow_routable) {
561                    bump_shadow_disagreement();
562                    tracing::debug!(
563                        target: "dynomite::dispatch::shadow",
564                        live = cfg.distribution.as_str(),
565                        shadow = shadow.as_str(),
566                        "shadow distribution disagreed on key route"
567                    );
568                }
569            }
570        }
571        if routable.is_empty() {
572            self.record_no_targets_metric(cfg, consistency);
573            return DispatchPlan::NoTargets;
574        }
575        let (local, remote): (Vec<_>, Vec<_>) = routable
576            .into_iter()
577            .partition(|(dc_idx, _, _)| dcs[*dc_idx].name() == cfg.dc);
578        let plan =
579            plan_with_consistency(cfg, &dcs, &peers, consistency, req.routing(), local, remote);
580        let plan = cap_replicas(plan, n_val_cap);
581        if matches!(plan, DispatchPlan::NoTargets) {
582            self.record_no_targets_metric(cfg, consistency);
583        }
584        plan
585    }
586
587    /// Record a `dispatch_no_targets_total` metric tick using
588    /// the local-DC labels, when a metrics handle is wired.
589    fn record_no_targets_metric(
590        &self,
591        cfg: &crate::cluster::pool::PoolConfig,
592        consistency: ConsistencyLevel,
593    ) {
594        if let Some(m) = self.failure_metrics.as_ref() {
595            m.record_no_targets(&cfg.dc, &cfg.rack, consistency);
596        }
597    }
598
599    /// Resolve the destination DC of a peer (for per-peer
600    /// failure metrics). Local peers and unknown indexes both
601    /// fall back to the configured local DC.
602    fn peer_dc_label(&self, peer_idx: u32) -> String {
603        let peers = self.pool.peers().read();
604        peers
605            .get(peer_idx as usize)
606            .map_or_else(|| self.pool.config().dc.clone(), |p| p.dc().to_string())
607    }
608}
609
610/// Apply the bucket-type `n_val` fan-out cap to a freshly
611/// computed plan. Only `DispatchPlan::Replicas` is affected; the
612/// other variants pass through unchanged. `cap == 0` means "no
613/// cap" and is the no-op used for keys without a matching bucket
614/// type.
615fn cap_replicas(plan: DispatchPlan, cap: u8) -> DispatchPlan {
616    if cap == 0 {
617        return plan;
618    }
619    let cap = cap as usize;
620    match plan {
621        DispatchPlan::Replicas {
622            mut targets,
623            consistency,
624        } if targets.len() > cap => {
625            targets.truncate(cap);
626            DispatchPlan::Replicas {
627                targets,
628                consistency,
629            }
630        }
631        other => other,
632    }
633}
634
/// Decide whether two routable sets name the same peers.
///
/// Each entry is `(dc_idx, rack_idx, peer_idx)`; only the peer index
/// takes part in the comparison. The sets agree when they hold the
/// same peer indexes with the same multiplicities, in any order.
fn plans_agree(a: &[(usize, usize, u32)], b: &[(usize, usize, u32)]) -> bool {
    /// Extract the peer indexes of a routable set in ascending order.
    fn sorted_peer_ids(entries: &[(usize, usize, u32)]) -> Vec<u32> {
        let mut ids: Vec<u32> = entries.iter().map(|&(_, _, peer_idx)| peer_idx).collect();
        ids.sort_unstable();
        ids
    }

    // Differing lengths can never agree; skip the sorting work.
    a.len() == b.len() && sorted_peer_ids(a) == sorted_peer_ids(b)
}
645
646fn collect_routable(
647    dcs: &[crate::cluster::Datacenter],
648    peers: &[crate::cluster::peer::Peer],
649    token: &crate::hashkit::DynToken,
650    hash64: u64,
651    distribution: crate::conf::Distribution,
652    include_down: bool,
653) -> Vec<(usize, usize, u32)> {
654    let mut routable: Vec<(usize, usize, u32)> = Vec::new();
655    for (dc_idx, dc) in dcs.iter().enumerate() {
656        for (rack_idx, rack) in dc.racks().iter().enumerate() {
657            let candidate = match (distribution, rack.random_slices()) {
658                (crate::conf::Distribution::RandomSlicing, Some(slices)) => {
659                    // Map the chosen claimant name back onto a
660                    // peer index. The slice table holds peer
661                    // pname strings (host:port) so the
662                    // resolution is a linear scan over the
663                    // rack's peer set, which is small (peers
664                    // per rack, not the whole pool).
665                    slices.claimant_for(hash64).and_then(|name| {
666                        peers.iter().find_map(|p| {
667                            if p.dc() == dc.name()
668                                && p.rack() == rack.name()
669                                && p.endpoint().pname() == name
670                            {
671                                Some(p.idx())
672                            } else {
673                                None
674                            }
675                        })
676                    })
677                }
678                _ => vnode::dispatch(rack.continuums(), token),
679            };
680            if let Some(peer_idx) = candidate {
681                if let Some(peer) = peers.get(peer_idx as usize) {
682                    let state = peer.state();
683                    let accept = state.is_routable()
684                        || (include_down && matches!(state, crate::cluster::peer::PeerState::Down));
685                    if accept {
686                        routable.push((dc_idx, rack_idx, peer_idx));
687                    }
688                }
689            }
690        }
691    }
692    routable
693}
694
695fn build_target(
696    dcs: &[crate::cluster::Datacenter],
697    peers: &[crate::cluster::peer::Peer],
698    dc_idx: usize,
699    rack_idx: usize,
700    peer_idx: u32,
701) -> ReplicaTarget {
702    let dc_name = dcs[dc_idx].name().to_string();
703    let rack_name = dcs[dc_idx].racks()[rack_idx].name().to_string();
704    let is_local = peers
705        .get(peer_idx as usize)
706        .is_some_and(crate::cluster::peer::Peer::is_local);
707    ReplicaTarget {
708        peer_idx,
709        dc: dc_name,
710        rack: rack_name,
711        is_local,
712    }
713}
714
715fn plan_with_consistency(
716    cfg: &crate::cluster::pool::PoolConfig,
717    dcs: &[crate::cluster::Datacenter],
718    peers: &[crate::cluster::peer::Peer],
719    consistency: ConsistencyLevel,
720    routing: MsgRouting,
721    local: Vec<(usize, usize, u32)>,
722    remote: Vec<(usize, usize, u32)>,
723) -> DispatchPlan {
724    let want_per_dc_fanout = matches!(consistency, ConsistencyLevel::DcEachSafeQuorum)
725        || matches!(routing, MsgRouting::AllNodesAllRacksAllDcs);
726    let mut targets: Vec<ReplicaTarget> = Vec::new();
727    match consistency {
728        ConsistencyLevel::DcOne => {
729            if local.is_empty() {
730                return DispatchPlan::NoTargets;
731            }
732            let mut best: Option<(RackDistance, (usize, usize, u32))> = None;
733            for (dc_idx, rack_idx, peer_idx) in local {
734                let rack_name = dcs[dc_idx].racks()[rack_idx].name();
735                let d = rack_distance(&cfg.dc, &cfg.rack, &cfg.dc, rack_name);
736                let take = match best {
737                    None => true,
738                    Some((bd, _)) => d.cost() < bd.cost(),
739                };
740                if take {
741                    best = Some((d, (dc_idx, rack_idx, peer_idx)));
742                }
743            }
744            if let Some((_, (dc_idx, rack_idx, peer_idx))) = best {
745                let is_local_node = peers
746                    .get(peer_idx as usize)
747                    .is_some_and(crate::cluster::peer::Peer::is_local);
748                if is_local_node {
749                    return DispatchPlan::LocalDatastore;
750                }
751                targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
752            }
753        }
754        ConsistencyLevel::DcQuorum | ConsistencyLevel::DcSafeQuorum => {
755            if local.is_empty() {
756                return DispatchPlan::NoTargets;
757            }
758            for (dc_idx, rack_idx, peer_idx) in local {
759                targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
760            }
761        }
762        ConsistencyLevel::DcEachSafeQuorum => {
763            if local.is_empty() && remote.is_empty() {
764                return DispatchPlan::NoTargets;
765            }
766            for (dc_idx, rack_idx, peer_idx) in local.iter().chain(remote.iter()) {
767                targets.push(build_target(dcs, peers, *dc_idx, *rack_idx, *peer_idx));
768            }
769        }
770    }
771    if want_per_dc_fanout && !remote.is_empty() {
772        for (dc_idx, rack_idx, peer_idx) in remote {
773            if !targets.iter().any(|t| t.peer_idx == peer_idx) {
774                targets.push(build_target(dcs, peers, dc_idx, rack_idx, peer_idx));
775            }
776        }
777    }
778    if targets.is_empty() {
779        return DispatchPlan::LocalDatastore;
780    }
781    DispatchPlan::Replicas {
782        targets,
783        consistency,
784    }
785}
786
787impl Dispatcher for ClusterDispatcher {
788    #[allow(
789        clippy::too_many_lines,
790        reason = "single dispatch fn must enumerate every plan; splitting hides the planner-to-effect mapping"
791    )]
792    fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome {
793        if req.flags().quit {
794            return DispatchOutcome::Drop;
795        }
796        // FT.* / HSET interception. Runs before the routing
797        // planner so vector-index commands never visit the
798        // backend, and HSETs against an indexed prefix get the
799        // vector mirrored into the in-process registry before
800        // the standard storage write fans out.
801        if let Some(reg) = self.vector_registry.as_ref() {
802            if let Some(outcome) = self.intercept_vector_command(reg.as_ref(), &req) {
803                return outcome;
804            }
805        }
806        // Inspect the request without consuming it: pull the routing
807        // bytes from the first parsed key. `KeyPos::tag_bytes` returns
808        // the hash-tag-aware sub-range when one was parsed and the full
809        // key otherwise, which is the slice shape `plan` expects.
810        // Requests with no parsed keys (e.g. PING, INFO) fall through
811        // with an empty slice; `plan` handles that by routing to the
812        // local datastore.
813        let key: Vec<u8> = req
814            .keys()
815            .first()
816            .map(|kp| kp.tag_bytes().to_vec())
817            .unwrap_or_default();
818        let plan = self.plan(&req, &key);
819        let (req_span, _plan_span) = enter_plan_span(req.id(), &plan);
820        match plan {
821            DispatchPlan::Drop => DispatchOutcome::Drop,
822            DispatchPlan::NoTargets => {
823                let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
824                    MsgType::RspRedisError
825                } else {
826                    MsgType::RspMcServerError
827                };
828                let rsp = crate::msg::response::make_error(
829                    &req,
830                    err_type,
831                    0,
832                    crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
833                    &self.mbuf_pool,
834                );
835                DispatchOutcome::Error(rsp)
836            }
837            DispatchPlan::LocalDatastore => {
838                if let Some(tx) = self.backend.as_ref() {
839                    // Snapshot the wire bytes from the parsed mbuf
840                    // chain. The chain is the original on-the-wire
841                    // sequence the parser walked, so this is a
842                    // faithful relay rather than a re-encode.
843                    let bytes: Vec<u8> = req
844                        .mbufs()
845                        .iter()
846                        .flat_map(|b| b.readable().to_vec())
847                        .collect();
848                    if bytes.is_empty() {
849                        // Parsed request with no replayable bytes
850                        // (e.g. a synthetic `Msg`) - drop rather
851                        // than enqueue a no-op on the backend.
852                        return DispatchOutcome::Drop;
853                    }
854                    let env = OutboundRequest {
855                        bytes,
856                        req_id: req.id(),
857                        responder,
858                        span: req_span.clone(),
859                        ty: crate::proto::dnode::DmsgType::Req,
860                        target_peer_idx: None,
861                    };
862                    if let Err(err) = tx.try_send(env) {
863                        // Backend channel full or closed: surface
864                        // an error to the client immediately.
865                        if let Some(m) = self.failure_metrics.as_ref() {
866                            match err {
867                                tokio::sync::mpsc::error::TrySendError::Full(_) => {
868                                    m.record_backend_send_full();
869                                }
870                                tokio::sync::mpsc::error::TrySendError::Closed(_) => {
871                                    m.record_backend_send_closed();
872                                }
873                            }
874                        }
875                        let err_type =
876                            if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
877                                MsgType::RspRedisError
878                            } else {
879                                MsgType::RspMcServerError
880                            };
881                        let rsp = crate::msg::response::make_error(
882                            &req,
883                            err_type,
884                            0,
885                            crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
886                            &self.mbuf_pool,
887                        );
888                        return DispatchOutcome::Error(rsp);
889                    }
890                }
891                DispatchOutcome::Pending
892            }
893            DispatchPlan::Replicas {
894                targets,
895                consistency,
896            } => self.dispatch_replicas(&req, &req_span, &targets, consistency, responder),
897        }
898    }
899}
900
901impl ClusterDispatcher {
902    /// Fan a request out across replicas and install the per-
903    /// request reply coalescer.
904    ///
905    /// The single-target case short-circuits to a direct
906    /// forward (no coalescer needed). The multi-target case
907    /// spawns a coalescer task on the ambient tokio runtime; the
908    /// task drains the per-target replies, picks one according
909    /// to the consistency level, and forwards the chosen reply
910    /// to the original `responder`. Divergent replicas are
911    /// scheduled for read-repair writes via the same
912    /// `peer_backends` channels.
913    ///
914    /// When [`hinted_handoff_active`](Self::hinted_handoff_active)
915    /// reports true and the request is a write, targets that
916    /// are currently in [`crate::cluster::peer::PeerState::Down`]
917    /// are recorded in the hint store instead of being sent.
918    /// A synthetic `+OK\r\n` reply is fed to the coalescer on
919    /// the hinted target's behalf so the consistency threshold
920    /// can be met by the surviving replicas plus the hint(s).
921    /// On `try_send` failure (channel closed or full) for a
922    /// non-Down peer, the dispatcher likewise falls back to
923    /// hinting before declaring the target lost.
924    fn dispatch_replicas(
925        &self,
926        req: &Msg,
927        req_span: &tracing::Span,
928        targets: &[ReplicaTarget],
929        consistency: ConsistencyLevel,
930        responder: ServerSink,
931    ) -> DispatchOutcome {
932        if targets.is_empty() {
933            return DispatchOutcome::Drop;
934        }
935        // Snapshot the wire bytes once. Each target gets its
936        // own clone (the ServerConn / DnodeServerConn takes
937        // ownership of `bytes`).
938        let bytes: Vec<u8> = req
939            .mbufs()
940            .iter()
941            .flat_map(|b| b.readable().to_vec())
942            .collect();
943        if bytes.is_empty() {
944            return DispatchOutcome::Drop;
945        }
946        // Snapshot the per-target current state so we do not
947        // re-acquire the peer-table lock inside the per-target
948        // loop. Local peers are always treated as Normal.
949        let peer_states = self.snapshot_peer_states(targets);
950        let is_read = matches!(req.ty(), MsgType::Unknown) || req.flags().is_read;
951        let is_write = !is_read;
952        let handoff_active = self.hinted_handoff_active() && is_write;
953        // Single-target path: no coalescing needed; forward
954        // directly to the original responder.
955        if targets.len() == 1 {
956            return self.dispatch_replicas_direct(
957                req,
958                req_span,
959                targets,
960                &bytes,
961                &responder,
962                &HandoffCtx {
963                    handoff_active,
964                    peer_states: &peer_states,
965                },
966            );
967        }
968        // Multi-target path: install the coalescer.
969        let cfg = self.pool.config();
970        let local_dc = cfg.dc.clone();
971        // Channel each replica's reply lands on. Sized to
972        // `targets.len() + 1`: every target produces at most one
973        // envelope (real or hint-synthesised) and we leave a
974        // spare so a late repair reply never blocks the actor.
975        let (intermediate_tx, intermediate_rx) =
976            mpsc::channel::<OutboundEnvelope>(targets.len() + 1);
977        // Build the tracker's target list and capture the
978        // per-target dispatch state.
979        let target_pairs: Vec<(u32, String)> =
980            targets.iter().map(|t| (t.peer_idx, t.dc.clone())).collect();
981        // Read repair context: the original primary key (single-
982        // key requests only) and the request type.
983        let repair_key: Option<Vec<u8>> = req
984            .keys()
985            .first()
986            .map(|kp| kp.tag_bytes().to_vec())
987            .filter(|k| !k.is_empty());
988        let repair_ctx = repair_key.map(|key| ReadRepairContext {
989            req_id: req.id(),
990            req_ty: req.ty(),
991            key,
992            mbuf_pool: self.mbuf_pool.clone(),
993            peer_backends: self.peer_backends.clone(),
994            local_backend: self.backend.clone(),
995            target_is_local: targets.iter().map(|t| (t.peer_idx, t.is_local)).collect(),
996        });
997        // Fan out: each per-target outbound feeds the coalescer
998        // channel, NOT the client's responder.
999        let mut sent = 0usize;
1000        let mut hinted = 0usize;
1001        for target in targets {
1002            let action = Self::choose_target_action(target, handoff_active, &peer_states);
1003            match action {
1004                TargetAction::Send => {
1005                    if self.fanout_send(target, req, req_span, &bytes, &intermediate_tx) {
1006                        sent += 1;
1007                    } else if handoff_active
1008                        && self.hint_target(target, &bytes, req, req_span, &intermediate_tx)
1009                    {
1010                        hinted += 1;
1011                    }
1012                }
1013                TargetAction::Hint => {
1014                    if self.hint_target(target, &bytes, req, req_span, &intermediate_tx) {
1015                        hinted += 1;
1016                    }
1017                }
1018            }
1019        }
1020        // Drop the local clone of the intermediate sender so the
1021        // coalescer task observes RX close once every per-target
1022        // sender has been dropped. (`OutboundRequest` owns one
1023        // sender each; once they finish they drop it.)
1024        drop(intermediate_tx);
1025        if sent + hinted == 0 {
1026            return DispatchOutcome::Error(self.no_quorum_error(req));
1027        }
1028        let req_id = req.id();
1029        let req_ty = req.ty();
1030        let mbuf_pool = self.mbuf_pool.clone();
1031        let failure_metrics = self.failure_metrics.clone();
1032        tokio::spawn(coalesce_actor(
1033            req_id,
1034            req_ty,
1035            consistency,
1036            target_pairs,
1037            local_dc,
1038            intermediate_rx,
1039            responder,
1040            mbuf_pool,
1041            repair_ctx,
1042            failure_metrics,
1043        ));
1044        DispatchOutcome::Pending
1045    }
1046
1047    /// Capture the current `PeerState` for each target so the
1048    /// per-target loop does not re-acquire the read lock on
1049    /// every iteration. Local-node targets are reported as
1050    /// `Normal` regardless of the on-disk peer entry.
1051    fn snapshot_peer_states(
1052        &self,
1053        targets: &[ReplicaTarget],
1054    ) -> std::collections::HashMap<u32, crate::cluster::peer::PeerState> {
1055        use crate::cluster::peer::PeerState;
1056        let peers = self.pool.peers().read();
1057        let mut out = std::collections::HashMap::with_capacity(targets.len());
1058        for t in targets {
1059            let state = if t.is_local {
1060                PeerState::Normal
1061            } else {
1062                peers
1063                    .get(t.peer_idx as usize)
1064                    .map_or(PeerState::Unknown, crate::cluster::peer::Peer::state)
1065            };
1066            out.insert(t.peer_idx, state);
1067        }
1068        out
1069    }
1070
1071    fn choose_target_action(
1072        target: &ReplicaTarget,
1073        handoff_active: bool,
1074        peer_states: &std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
1075    ) -> TargetAction {
1076        use crate::cluster::peer::PeerState;
1077        if !handoff_active {
1078            return TargetAction::Send;
1079        }
1080        let state = peer_states
1081            .get(&target.peer_idx)
1082            .copied()
1083            .unwrap_or(PeerState::Unknown);
1084        match state {
1085            PeerState::Down => TargetAction::Hint,
1086            _ => TargetAction::Send,
1087        }
1088    }
1089
1090    /// Forward one target via its outbound channel. Returns
1091    /// `true` on a successful `try_send`.
1092    fn fanout_send(
1093        &self,
1094        target: &ReplicaTarget,
1095        req: &Msg,
1096        req_span: &tracing::Span,
1097        bytes: &[u8],
1098        intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1099    ) -> bool {
1100        let env = OutboundRequest {
1101            bytes: bytes.to_vec(),
1102            req_id: req.id(),
1103            responder: intermediate_tx.clone(),
1104            span: req_span.clone(),
1105            ty: crate::proto::dnode::DmsgType::Req,
1106            target_peer_idx: Some(target.peer_idx),
1107        };
1108        let send_result = if target.is_local {
1109            self.backend.as_ref().map(|tx| tx.try_send(env))
1110        } else {
1111            self.peer_backends
1112                .get(&target.peer_idx)
1113                .map(|tx| tx.try_send(env))
1114        };
1115        match send_result {
1116            Some(Ok(())) => true,
1117            Some(Err(err)) => {
1118                self.observe_send_error(target, &err);
1119                false
1120            }
1121            None => false,
1122        }
1123    }
1124
1125    /// Convert a `tokio::sync::mpsc::error::TrySendError` into a
1126    /// failure-metrics observation. Local targets bump the
1127    /// backend counters; peer targets bump the per-peer
1128    /// counters labelled with the peer's DC.
1129    fn observe_send_error(
1130        &self,
1131        target: &ReplicaTarget,
1132        err: &tokio::sync::mpsc::error::TrySendError<OutboundRequest>,
1133    ) {
1134        let Some(m) = self.failure_metrics.as_ref() else {
1135            return;
1136        };
1137        if target.is_local {
1138            match err {
1139                tokio::sync::mpsc::error::TrySendError::Full(_) => m.record_backend_send_full(),
1140                tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1141                    m.record_backend_send_closed();
1142                }
1143            }
1144        } else {
1145            let peer_dc = self.peer_dc_label(target.peer_idx);
1146            match err {
1147                tokio::sync::mpsc::error::TrySendError::Full(_) => {
1148                    m.record_peer_send_full(target.peer_idx, &peer_dc);
1149                }
1150                tokio::sync::mpsc::error::TrySendError::Closed(_) => {
1151                    m.record_peer_send_closed(target.peer_idx, &peer_dc);
1152                }
1153            }
1154        }
1155    }
1156
1157    /// Record `bytes` as a hint for `target`'s peer and feed the
1158    /// coalescer a synthetic success reply. Returns `true` when
1159    /// both the enqueue and the synth-push succeeded.
1160    fn hint_target(
1161        &self,
1162        target: &ReplicaTarget,
1163        bytes: &[u8],
1164        req: &Msg,
1165        req_span: &tracing::Span,
1166        intermediate_tx: &mpsc::Sender<OutboundEnvelope>,
1167    ) -> bool {
1168        let Some(store) = self.hint_store.as_ref() else {
1169            return false;
1170        };
1171        let cfg = self.pool.config();
1172        let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1173        match store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1174            Ok(()) => {}
1175            Err(e) => {
1176                tracing::debug!(
1177                    target: "dynomite::hints",
1178                    peer_idx = target.peer_idx,
1179                    error = %e,
1180                    "hint enqueue failed"
1181                );
1182                return false;
1183            }
1184        }
1185        let synth = synth_hint_reply(req, &self.mbuf_pool);
1186        let env = OutboundEnvelope {
1187            req_id: req.id(),
1188            rsp: synth,
1189            span: req_span.clone(),
1190            source_peer_idx: Some(target.peer_idx),
1191        };
1192        if intermediate_tx.try_send(env).is_err() {
1193            // The channel is sized for one envelope per target;
1194            // an immediate full means the coalescer task has
1195            // exited. Still report success so the hint sits in
1196            // the store for the drainer.
1197            tracing::debug!(
1198                target: "dynomite::hints",
1199                peer_idx = target.peer_idx,
1200                "hint synth-reply could not be queued; coalescer absent"
1201            );
1202        }
1203        tracing::debug!(
1204            target: "dynomite::hints",
1205            peer_idx = target.peer_idx,
1206            bytes = bytes.len(),
1207            "stored hint for down peer"
1208        );
1209        true
1210    }
1211
1212    fn dispatch_replicas_direct(
1213        &self,
1214        req: &Msg,
1215        req_span: &tracing::Span,
1216        targets: &[ReplicaTarget],
1217        bytes: &[u8],
1218        responder: &ServerSink,
1219        ctx: &HandoffCtx<'_>,
1220    ) -> DispatchOutcome {
1221        debug_assert_eq!(targets.len(), 1);
1222        let target = &targets[0];
1223        // If the target is Down and handoff is active, hint it
1224        // and feed the responder a synth `+OK\r\n` so the
1225        // client sees the write as having succeeded.
1226        if let TargetAction::Hint =
1227            Self::choose_target_action(target, ctx.handoff_active, ctx.peer_states)
1228        {
1229            if self.hint_single_target_direct(target, bytes, req, req_span, responder) {
1230                return DispatchOutcome::Pending;
1231            }
1232            return DispatchOutcome::Error(self.no_quorum_error(req));
1233        }
1234        let env = OutboundRequest {
1235            bytes: bytes.to_vec(),
1236            req_id: req.id(),
1237            responder: responder.clone(),
1238            span: req_span.clone(),
1239            ty: crate::proto::dnode::DmsgType::Req,
1240            target_peer_idx: Some(target.peer_idx),
1241        };
1242        let send_result = if target.is_local {
1243            self.backend.as_ref().map(|tx| tx.try_send(env))
1244        } else {
1245            self.peer_backends
1246                .get(&target.peer_idx)
1247                .map(|tx| tx.try_send(env))
1248        };
1249        let sent = match send_result {
1250            Some(Ok(())) => true,
1251            Some(Err(ref err)) => {
1252                self.observe_send_error(target, err);
1253                false
1254            }
1255            None => false,
1256        };
1257        if sent {
1258            return DispatchOutcome::Pending;
1259        }
1260        if ctx.handoff_active
1261            && self.hint_single_target_direct(target, bytes, req, req_span, responder)
1262        {
1263            return DispatchOutcome::Pending;
1264        }
1265        DispatchOutcome::Error(self.no_quorum_error(req))
1266    }
1267
1268    /// Single-target hint path. Records the hint in the store
1269    /// and pushes a `+OK\r\n` envelope onto the responder. The
1270    /// client sees the write as having succeeded; the drainer
1271    /// will replay the request to the peer when it returns.
1272    fn hint_single_target_direct(
1273        &self,
1274        target: &ReplicaTarget,
1275        bytes: &[u8],
1276        req: &Msg,
1277        req_span: &tracing::Span,
1278        responder: &ServerSink,
1279    ) -> bool {
1280        let Some(store) = self.hint_store.as_ref() else {
1281            return false;
1282        };
1283        let cfg = self.pool.config();
1284        let ttl = std::time::Duration::from_secs(cfg.hint_ttl_seconds.max(1));
1285        if let Err(e) = store.enqueue(target.peer_idx, bytes.to_vec(), ttl) {
1286            tracing::debug!(
1287                target: "dynomite::hints",
1288                peer_idx = target.peer_idx,
1289                error = %e,
1290                "hint enqueue failed (single-target)"
1291            );
1292            return false;
1293        }
1294        let synth = synth_hint_reply(req, &self.mbuf_pool);
1295        let env = OutboundEnvelope {
1296            req_id: req.id(),
1297            rsp: synth,
1298            span: req_span.clone(),
1299            source_peer_idx: Some(target.peer_idx),
1300        };
1301        let _ = responder.try_send(env);
1302        true
1303    }
1304
1305    fn no_quorum_error(&self, req: &Msg) -> Msg {
1306        let err_type = if matches!(req.ty(), MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1307            MsgType::RspRedisError
1308        } else {
1309            MsgType::RspMcServerError
1310        };
1311        crate::msg::response::make_error(
1312            req,
1313            err_type,
1314            0,
1315            crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1316            &self.mbuf_pool,
1317        )
1318    }
1319
1320    /// FT.* / HSET interception. Returns `Some(outcome)` when the
1321    /// command was fully handled (FT.* keyword, or an HSET that
1322    /// references an indexed prefix with a malformed vector
1323    /// payload); returns `None` to let the caller fall through to
1324    /// the standard dispatch path. The HSET success path returns
1325    /// `None` after upserting into the registry so the standard
1326    /// storage write still goes to the backend.
1327    fn intercept_vector_command(
1328        &self,
1329        registry: &crate::vector::registry::VectorRegistry,
1330        req: &Msg,
1331    ) -> Option<DispatchOutcome> {
1332        match req.ty() {
1333            MsgType::ReqRedisFtCreate
1334            | MsgType::ReqRedisFtSearch
1335            | MsgType::ReqRedisFtInfo
1336            | MsgType::ReqRedisFtList
1337            | MsgType::ReqRedisFtDropindex
1338            | MsgType::ReqRedisFtRegex
1339            | MsgType::ReqRedisFtUnknown => Some(self.run_ft_command(registry, req)),
1340            MsgType::ReqRedisHset => self.intercept_hset(registry, req),
1341            _ => None,
1342        }
1343    }
1344
1345    /// Drive an FT.* command through the registry-backed executor
1346    /// and wrap the RESP bytes in a [`DispatchOutcome::Inline`].
1347    fn run_ft_command(
1348        &self,
1349        registry: &crate::vector::registry::VectorRegistry,
1350        req: &Msg,
1351    ) -> DispatchOutcome {
1352        // For typed FT.* variants the keyword is unambiguous; for
1353        // [`MsgType::ReqRedisFtUnknown`] we recover the original
1354        // wire keyword from the request's mbuf chain (the parser
1355        // case-folds for table lookup but the wire bytes are
1356        // preserved verbatim) so `ft::dispatch` can decide
1357        // whether we recognise the keyword and either execute
1358        // it or surface a structured `-ERR ...` reply.
1359        let recovered_kw: Vec<u8>;
1360        let keyword: &[u8] = match req.ty() {
1361            MsgType::ReqRedisFtCreate => b"FT.CREATE",
1362            MsgType::ReqRedisFtSearch => b"FT.SEARCH",
1363            MsgType::ReqRedisFtInfo => b"FT.INFO",
1364            MsgType::ReqRedisFtList => b"FT.LIST",
1365            MsgType::ReqRedisFtDropindex => b"FT.DROPINDEX",
1366            MsgType::ReqRedisFtRegex => b"FT.REGEX",
1367            MsgType::ReqRedisFtUnknown => {
1368                recovered_kw = first_bulk_token(req).unwrap_or_else(|| b"FT.UNKNOWN".to_vec());
1369                recovered_kw.as_slice()
1370            }
1371            // The dispatcher only enters this branch from the
1372            // FT.* arm above, so the catch-all is unreachable
1373            // unless the MsgType set drifts out of sync with the
1374            // intercept arm.
1375            _ => return DispatchOutcome::Drop,
1376        };
1377        let mut args: Vec<&[u8]> = Vec::with_capacity(1 + req.keys().len() + req.args().len());
1378        args.push(keyword);
1379        for k in req.keys() {
1380            args.push(k.key());
1381        }
1382        for a in req.args() {
1383            args.push(a.bytes());
1384        }
1385        let bytes = crate::proto::redis::ft::dispatch(registry, &args);
1386        DispatchOutcome::Inline(synthetic_redis_reply(req, &self.mbuf_pool, &bytes))
1387    }
1388
1389    /// HSET interception: returns `Some(Error(...))` when the
1390    /// HSET targets a registered prefix but its vector field is
1391    /// missing or malformed; returns `None` (so the dispatcher
1392    /// falls through to the backend) on success or when no
1393    /// registered prefix matches.
1394    fn intercept_hset(
1395        &self,
1396        registry: &crate::vector::registry::VectorRegistry,
1397        req: &Msg,
1398    ) -> Option<DispatchOutcome> {
1399        let mut args: Vec<&[u8]> = Vec::with_capacity(req.keys().len() + req.args().len());
1400        for k in req.keys() {
1401            args.push(k.key());
1402        }
1403        for a in req.args() {
1404            args.push(a.bytes());
1405        }
1406        match crate::proto::redis::ft::maybe_index_hset(registry, &args) {
1407            Ok(_) => None,
1408            Err(e) => {
1409                let payload = format!("-ERR {e}\r\n");
1410                Some(DispatchOutcome::Error(synthetic_redis_reply(
1411                    req,
1412                    &self.mbuf_pool,
1413                    payload.as_bytes(),
1414                )))
1415            }
1416        }
1417    }
1418}
1419
1420/// Wrap an arbitrary RESP byte sequence as a synthetic Redis
1421/// response [`Msg`]. The response inherits the request's id (so
1422/// the FSM can pair it with the originating request), is marked
1423/// `is_request = false`, and the supplied bytes are copied into
1424/// one or more mbufs drawn from `pool`.
1425fn synthetic_redis_reply(req: &Msg, pool: &MbufPool, payload: &[u8]) -> Msg {
1426    let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
1427    rsp.set_parent_id(req.id());
1428    let mut written = 0usize;
1429    while written < payload.len() {
1430        let mut buf = pool.get();
1431        let n = buf.recv(&payload[written..]);
1432        debug_assert!(
1433            n > 0,
1434            "MbufPool returned a buffer with zero writable capacity"
1435        );
1436        rsp.mbufs_mut().push_back(buf);
1437        written += n;
1438    }
1439    rsp.recompute_mlen();
1440    rsp
1441}
1442
1443/// Recover the first bulk-string token from a parsed RESP
1444/// request's mbuf chain. The parser case-folds the keyword for
1445/// the command-table lookup but stores the original wire bytes
1446/// in the mbufs; this helper reads them back so the dispatcher
1447/// can render structured error replies that quote the actual
1448/// keyword the client sent.
1449///
1450/// Returns `None` when the mbuf chain does not begin with a
1451/// well-formed `*N\r\n$M\r\n<token>\r\n` sequence.
1452fn first_bulk_token(req: &Msg) -> Option<Vec<u8>> {
1453    let mut wire: Vec<u8> = Vec::new();
1454    for buf in req.mbufs() {
1455        wire.extend_from_slice(buf.readable());
1456        if wire.len() > 256 {
1457            break;
1458        }
1459    }
1460    let mut p = 0usize;
1461    if wire.first() == Some(&b'*') {
1462        let cr = wire.iter().position(|&b| b == b'\r')?;
1463        if wire.get(cr + 1) != Some(&b'\n') {
1464            return None;
1465        }
1466        p = cr + 2;
1467    }
1468    if wire.get(p) != Some(&b'$') {
1469        return None;
1470    }
1471    let header_start = p + 1;
1472    let header_cr = wire[header_start..]
1473        .iter()
1474        .position(|&b| b == b'\r')
1475        .map(|i| header_start + i)?;
1476    if wire.get(header_cr + 1) != Some(&b'\n') {
1477        return None;
1478    }
1479    let len_str = std::str::from_utf8(&wire[header_start..header_cr]).ok()?;
1480    let len: usize = len_str.parse().ok()?;
1481    let body_start = header_cr + 2;
1482    let body_end = body_start.checked_add(len)?;
1483    if wire.len() < body_end + 2 {
1484        return None;
1485    }
1486    Some(wire[body_start..body_end].to_vec())
1487}
1488
/// Context required to schedule read-repair writes once the
/// coalescer has identified a winner and a divergent set.
///
/// Built by `dispatch_replicas` for multi-target fan-outs whose
/// first key has non-empty tag bytes, and handed to the coalescer
/// task.
#[derive(Clone)]
struct ReadRepairContext {
    /// Id of the originating request.
    req_id: crate::core::types::MsgId,
    /// Type of the originating request.
    req_ty: MsgType,
    /// Original primary key (single-key requests only). The v1
    /// repair scheduler operates over single-key Redis reads;
    /// multi-key fragmentation goes through a separate path.
    key: Vec<u8>,
    /// Clone of the dispatcher's mbuf pool.
    mbuf_pool: MbufPool,
    /// Clone of the dispatcher's per-peer outbound channels, keyed
    /// by peer index.
    peer_backends: std::collections::HashMap<u32, mpsc::Sender<OutboundRequest>>,
    /// Clone of the dispatcher's local backend channel, if one is
    /// configured.
    local_backend: Option<mpsc::Sender<OutboundRequest>>,
    /// For every fan-out target: peer index -> whether the target
    /// is the local node.
    target_is_local: std::collections::HashMap<u32, bool>,
}
1504
1505/// Per-fan-out coalescer task body.
1506#[allow(
1507    clippy::too_many_arguments,
1508    reason = "actor task captures the entire dispatch context; bundling into a struct adds churn for no callsite gain"
1509)]
1510async fn coalesce_actor(
1511    req_id: crate::core::types::MsgId,
1512    req_ty: MsgType,
1513    consistency: ConsistencyLevel,
1514    targets: Vec<(u32, String)>,
1515    local_dc: String,
1516    mut intermediate_rx: mpsc::Receiver<OutboundEnvelope>,
1517    client_tx: ServerSink,
1518    mbuf_pool: MbufPool,
1519    repair_ctx: Option<ReadRepairContext>,
1520    failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
1521) {
1522    use crate::proto::redis::{CoalesceOutcome, CoalesceTracker};
1523    let mut tracker = CoalesceTracker::new(req_id, consistency, targets, &local_dc);
1524    let mut emitted = false;
1525    while let Some(env) = intermediate_rx.recv().await {
1526        let source = env.source_peer_idx.unwrap_or(u32::MAX);
1527        let span = env.span.clone();
1528        let outcome = tracker.record_reply(source, env.rsp);
1529        match outcome {
1530            CoalesceOutcome::Pending => {}
1531            CoalesceOutcome::Ready {
1532                winner,
1533                divergent_targets,
1534            } => {
1535                if !emitted {
1536                    let winner_bytes: Vec<u8> = winner
1537                        .mbufs()
1538                        .iter()
1539                        .flat_map(|b| b.readable().to_vec())
1540                        .collect();
1541                    let out_env = OutboundEnvelope {
1542                        req_id,
1543                        rsp: *winner,
1544                        span: span.clone(),
1545                        source_peer_idx: None,
1546                    };
1547                    let _ = client_tx.send(out_env).await;
1548                    emitted = true;
1549                    if !divergent_targets.is_empty() {
1550                        if let Some(ctx) = repair_ctx.as_ref() {
1551                            schedule_read_repair(ctx, &divergent_targets, &winner_bytes, &span);
1552                        }
1553                    }
1554                }
1555            }
1556            CoalesceOutcome::Error(reason) => {
1557                if !emitted {
1558                    let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet)
1559                    {
1560                        MsgType::RspRedisError
1561                    } else {
1562                        MsgType::RspMcServerError
1563                    };
1564                    let anchor = Msg::new(req_id, req_ty, true);
1565                    let rsp = crate::msg::response::make_error(
1566                        &anchor,
1567                        err_type,
1568                        0,
1569                        crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1570                        &mbuf_pool,
1571                    );
1572                    let _ = client_tx
1573                        .send(OutboundEnvelope {
1574                            req_id,
1575                            rsp,
1576                            span: span.clone(),
1577                            source_peer_idx: None,
1578                        })
1579                        .await;
1580                    emitted = true;
1581                }
1582                tracing::debug!(target: "dynomite::coalesce", req_id, reason = %reason, "coalesce error");
1583            }
1584        }
1585    }
1586    if !emitted {
1587        // No reply was emitted and the channel closed (every
1588        // per-target sender dropped without producing a reply).
1589        // From the dispatcher's perspective the request has
1590        // timed out at the coalescer layer; surface a
1591        // quorum-unreachable error so the client does not hang
1592        // and bump the response-timeout counter.
1593        if let Some(m) = failure_metrics.as_ref() {
1594            m.record_response_timeout(consistency);
1595        }
1596        let err_type = if matches!(req_ty, MsgType::ReqRedisGet | MsgType::ReqRedisSet) {
1597            MsgType::RspRedisError
1598        } else {
1599            MsgType::RspMcServerError
1600        };
1601        let anchor = Msg::new(req_id, req_ty, true);
1602        let rsp = crate::msg::response::make_error(
1603            &anchor,
1604            err_type,
1605            0,
1606            crate::msg::DynErrorCode::DynomiteNoQuorumAchieved,
1607            &mbuf_pool,
1608        );
1609        let _ = client_tx
1610            .send(OutboundEnvelope {
1611                req_id,
1612                rsp,
1613                span: tracing::Span::none(),
1614                source_peer_idx: None,
1615            })
1616            .await;
1617    }
1618}
1619
1620/// Build a sink the read-repair task can drop replies into. The
1621/// scheduler is fire-and-forget: every reply is discarded.
1622fn repair_sink() -> ServerSink {
1623    let (tx, mut rx) = mpsc::channel::<OutboundEnvelope>(8);
1624    tokio::spawn(async move {
1625        while rx.recv().await.is_some() {
1626            // Drop the envelope; the original client already
1627            // received its reply on the main responder.
1628        }
1629    });
1630    tx
1631}
1632
1633/// Decode a winning RESP reply into the bytes we want to write
1634/// back to divergent replicas.
1635///
1636/// Returns `Some(bytes)` for a bulk-string winner (we ship a
1637/// `SET key value` to the divergent replica) or for a nil reply
1638/// (we ship a `DEL key`). Returns `None` for any other shape
1639/// (errors, integers, multibulk, ...) since the v1 repair
1640/// scheduler only handles single-bulk Redis GET-style winners.
1641fn decode_winner_for_repair(payload: &[u8]) -> Option<RepairAction> {
1642    if payload == b"$-1\r\n" {
1643        return Some(RepairAction::Delete);
1644    }
1645    if !payload.starts_with(b"$") {
1646        return None;
1647    }
1648    // `$<len>\r\n<value>\r\n`
1649    let crlf = payload.iter().position(|&b| b == b'\r')?;
1650    if payload.get(crlf + 1).copied() != Some(b'\n') {
1651        return None;
1652    }
1653    let len_str = std::str::from_utf8(&payload[1..crlf]).ok()?;
1654    let len: usize = len_str.parse().ok()?;
1655    let body_start = crlf + 2;
1656    let body_end = body_start.checked_add(len)?;
1657    if payload.len() < body_end + 2 {
1658        return None;
1659    }
1660    if &payload[body_end..body_end + 2] != b"\r\n" {
1661        return None;
1662    }
1663    Some(RepairAction::Write(payload[body_start..body_end].to_vec()))
1664}
1665
/// Bundle of handoff state passed into
/// [`ClusterDispatcher::dispatch_replicas_direct`]. Avoids a
/// noisy parameter list while keeping the per-call decisions
/// ("is handoff on?", "what state is each target in?") in one
/// place.
///
/// Borrows the state map for `'a`, so the bundle itself owns no
/// peer data.
struct HandoffCtx<'a> {
    /// Answers "is handoff on?" for this call.
    handoff_active: bool,
    /// Answers "what state is each target in?". Keyed by `u32` —
    /// presumably the peer index; confirm against the caller.
    peer_states: &'a std::collections::HashMap<u32, crate::cluster::peer::PeerState>,
}
1675
/// Per-target dispatch action chosen by
/// [`ClusterDispatcher::choose_target_action`].
///
/// A field-less, `Copy` tag; it derives `Eq`/`PartialEq` so
/// actions can be compared directly.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TargetAction {
    /// Forward the request via the per-peer outbound channel.
    Send,
    /// Record a hint and feed the coalescer a synth success
    /// reply (presumably the one built by `synth_hint_reply` —
    /// confirm at the call site).
    Hint,
}
1686
1687/// Synthetic reply pushed into the coalescer on behalf of a
1688/// hinted target. Always a `+OK\r\n` Redis status reply: the
1689/// dispatcher decouples the hint from the eventual peer reply
1690/// (the drainer will fire-and-forget the hint when the peer
1691/// returns), so the synth shape only needs to satisfy the
1692/// coalescer for SET-style writes which is the dominant case
1693/// for hinted handoff. Mismatched-shape writes (DEL with one
1694/// hinted target) coalesce to the surviving real reply via the
1695/// plurality / quorum branch and the divergence is harmless
1696/// because read-repair only fires for `MsgType::ReqRedisGet`.
1697fn synth_hint_reply(req: &Msg, pool: &MbufPool) -> Msg {
1698    crate::msg::response::make_simple_redis(req, pool, b"+OK\r\n")
1699}
1700
/// Action a read-repair task should take against a divergent
/// replica.
///
/// Produced by `decode_winner_for_repair` from the winning reply
/// and turned into wire bytes by `build_repair_bytes`.
enum RepairAction {
    /// Ship `SET key <bytes>` to overwrite the stale value. The
    /// payload is the body of the winning bulk-string reply.
    Write(Vec<u8>),
    /// Ship `DEL key` to drop the stale value (winning reply
    /// was a nil bulk).
    Delete,
}
1710
1711/// Build the wire bytes for a Redis repair write.
1712fn build_repair_bytes(action: &RepairAction, key: &[u8]) -> Vec<u8> {
1713    match action {
1714        RepairAction::Write(value) => {
1715            let mut out = Vec::with_capacity(key.len() + value.len() + 32);
1716            out.extend_from_slice(b"*3\r\n$3\r\nSET\r\n$");
1717            out.extend_from_slice(key.len().to_string().as_bytes());
1718            out.extend_from_slice(b"\r\n");
1719            out.extend_from_slice(key);
1720            out.extend_from_slice(b"\r\n$");
1721            out.extend_from_slice(value.len().to_string().as_bytes());
1722            out.extend_from_slice(b"\r\n");
1723            out.extend_from_slice(value);
1724            out.extend_from_slice(b"\r\n");
1725            out
1726        }
1727        RepairAction::Delete => {
1728            let mut out = Vec::with_capacity(key.len() + 24);
1729            out.extend_from_slice(b"*2\r\n$3\r\nDEL\r\n$");
1730            out.extend_from_slice(key.len().to_string().as_bytes());
1731            out.extend_from_slice(b"\r\n");
1732            out.extend_from_slice(key);
1733            out.extend_from_slice(b"\r\n");
1734            out
1735        }
1736    }
1737}
1738
1739/// Schedule fire-and-forget read-repair writes to every
1740/// divergent target. The function only awaits a bounded mpsc
1741/// permit; it never blocks for the repair to complete or for
1742/// the divergent replica to ack.
1743///
1744/// The repair shape is decoded from `winner_bytes`:
1745///
1746/// * Bulk-string winner -> `SET key <value>`.
1747/// * Nil bulk winner -> `DEL key`.
1748/// * Anything else -> skipped (entropy reconciliation will
1749///   handle it later). This v1 limitation is documented in the
1750///   dispatcher tests and in `docs/parity.md`.
1751///
1752/// Repair writes are tagged with `DmsgType::ReqForward` so the
1753/// receiving peer's `dnode_client_loop` rewrites the parsed
1754/// request's routing tag to `LocalNodeOnly`, preventing a
1755/// recursive multi-replica fan-out at the divergent peer.
1756fn schedule_read_repair(
1757    ctx: &ReadRepairContext,
1758    divergent: &[u32],
1759    winner_bytes: &[u8],
1760    span: &tracing::Span,
1761) {
1762    if !matches!(ctx.req_ty, MsgType::ReqRedisGet) {
1763        return;
1764    }
1765    let Some(action) = decode_winner_for_repair(winner_bytes) else {
1766        return;
1767    };
1768    let bytes = build_repair_bytes(&action, &ctx.key);
1769    let sink = repair_sink();
1770    for &peer_idx in divergent {
1771        let is_local = ctx.target_is_local.get(&peer_idx).copied().unwrap_or(false);
1772        let env = OutboundRequest {
1773            bytes: bytes.clone(),
1774            req_id: ctx.req_id,
1775            responder: sink.clone(),
1776            span: span.clone(),
1777            ty: crate::proto::dnode::DmsgType::ReqForward,
1778            target_peer_idx: Some(peer_idx),
1779        };
1780        let sent = if is_local {
1781            ctx.local_backend
1782                .as_ref()
1783                .is_some_and(|tx| tx.try_send(env).is_ok())
1784        } else {
1785            ctx.peer_backends
1786                .get(&peer_idx)
1787                .is_some_and(|tx| tx.try_send(env).is_ok())
1788        };
1789        if sent {
1790            let _ = &ctx.mbuf_pool;
1791            tracing::debug!(
1792                target: "dynomite::read_repair",
1793                req_id = ctx.req_id,
1794                peer_idx,
1795                bytes = bytes.len(),
1796                "scheduled read-repair write",
1797            );
1798        } else {
1799            tracing::debug!(
1800                target: "dynomite::read_repair",
1801                req_id = ctx.req_id,
1802                peer_idx,
1803                "read-repair drop: backend channel unavailable or full",
1804            );
1805        }
1806    }
1807}
1808
1809#[cfg(test)]
1810mod tests {
1811    use super::*;
1812    use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
1813    use crate::conf::DataStore;
1814    use crate::hashkit::DynToken;
1815
1816    fn cfg(read: ConsistencyLevel, write: ConsistencyLevel) -> crate::cluster::PoolConfig {
1817        crate::cluster::PoolConfig {
1818            read_consistency: read,
1819            write_consistency: write,
1820            dc: "dc1".into(),
1821            rack: "rA".into(),
1822            ..crate::cluster::PoolConfig::default()
1823        }
1824    }
1825
1826    fn peer(idx: u32, dc: &str, rack: &str, tok: u32, is_local: bool, is_same: bool) -> Peer {
1827        let mut p = Peer::new(
1828            idx,
1829            PeerEndpoint::tcp("h".into(), 8101 + u16::try_from(idx).unwrap_or(0)),
1830            rack.into(),
1831            dc.into(),
1832            vec![DynToken::from_u32(tok)],
1833            is_local,
1834            is_same,
1835            false,
1836        );
1837        p.set_state(PeerState::Normal, 0);
1838        p
1839    }
1840
1841    fn pool(read: ConsistencyLevel, write: ConsistencyLevel, peers: Vec<Peer>) -> Arc<ServerPool> {
1842        let pool = ServerPool::new(cfg(read, write), peers);
1843        pool.preselect_remote_racks();
1844        Arc::new(pool)
1845    }
1846
1847    #[test]
1848    fn local_node_only_short_circuits() {
1849        let p = pool(
1850            ConsistencyLevel::DcOne,
1851            ConsistencyLevel::DcOne,
1852            vec![peer(0, "dc1", "rA", 10, true, true)],
1853        );
1854        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
1855        req.set_routing(MsgRouting::LocalNodeOnly);
1856        assert_eq!(
1857            ClusterDispatcher::new(p).plan(&req, b"k"),
1858            DispatchPlan::LocalDatastore,
1859        );
1860    }
1861
1862    #[test]
1863    fn dc_one_read_targets_local_rack_when_present() {
1864        let p = pool(
1865            ConsistencyLevel::DcOne,
1866            ConsistencyLevel::DcOne,
1867            vec![
1868                peer(0, "dc1", "rA", 10, true, true),
1869                peer(1, "dc1", "rB", 20, false, true),
1870                peer(2, "dc2", "rA", 30, false, false),
1871            ],
1872        );
1873        let req = Msg::new(1, MsgType::ReqRedisGet, true);
1874        // Any key resolves to peer 0 in rack rA (single-token continuum).
1875        let plan = ClusterDispatcher::new(p).plan(&req, b"hello");
1876        assert!(matches!(plan, DispatchPlan::LocalDatastore));
1877    }
1878
1879    #[test]
1880    fn dc_quorum_fans_out_local_dc() {
1881        let p = pool(
1882            ConsistencyLevel::DcQuorum,
1883            ConsistencyLevel::DcQuorum,
1884            vec![
1885                peer(0, "dc1", "rA", 10, true, true),
1886                peer(1, "dc1", "rB", 20, false, true),
1887                peer(2, "dc2", "rA", 30, false, false),
1888            ],
1889        );
1890        let req = Msg::new(1, MsgType::ReqRedisGet, true);
1891        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
1892        match plan {
1893            DispatchPlan::Replicas { targets: rs, .. } => {
1894                assert_eq!(rs.len(), 2);
1895                for r in rs {
1896                    assert_eq!(r.dc, "dc1");
1897                }
1898            }
1899            _ => panic!("expected replicas"),
1900        }
1901    }
1902
1903    #[test]
1904    fn dc_each_safe_quorum_fans_out_per_dc() {
1905        let p = pool(
1906            ConsistencyLevel::DcEachSafeQuorum,
1907            ConsistencyLevel::DcEachSafeQuorum,
1908            vec![
1909                peer(0, "dc1", "rA", 10, true, true),
1910                peer(1, "dc2", "rA", 20, false, false),
1911            ],
1912        );
1913        let req = Msg::new(1, MsgType::ReqRedisGet, true);
1914        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
1915        match plan {
1916            DispatchPlan::Replicas { targets: rs, .. } => {
1917                assert_eq!(rs.len(), 2);
1918                let dcs: Vec<&str> = rs.iter().map(|r| r.dc.as_str()).collect();
1919                assert!(dcs.contains(&"dc1"));
1920                assert!(dcs.contains(&"dc2"));
1921            }
1922            _ => panic!("expected replicas"),
1923        }
1924    }
1925
1926    #[test]
1927    fn no_routable_peers_returns_no_targets() {
1928        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
1929        p0.set_state(PeerState::Down, 0);
1930        let p = pool(
1931            ConsistencyLevel::DcQuorum,
1932            ConsistencyLevel::DcQuorum,
1933            vec![p0],
1934        );
1935        let req = Msg::new(1, MsgType::ReqRedisGet, true);
1936        let plan = ClusterDispatcher::new(p).plan(&req, b"k");
1937        assert_eq!(plan, DispatchPlan::NoTargets);
1938    }
1939
1940    /// Regression: any code path that returns
1941    /// `DispatchOutcome::Error` used to send 0 wire bytes to the
1942    /// client because [`crate::msg::response::make_error`] never
1943    /// attached the wire-format error string. The client then
1944    /// hung until its read timeout. After the fix, the error
1945    /// response carries a parseable `-Dynomite: ...` reply that
1946    /// the client can render as an error.
1947    #[test]
1948    fn no_targets_error_response_carries_dynomite_wire_bytes() {
1949        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
1950        p0.set_state(PeerState::Down, 0);
1951        let p = pool(
1952            ConsistencyLevel::DcQuorum,
1953            ConsistencyLevel::DcQuorum,
1954            vec![p0],
1955        );
1956        let disp = ClusterDispatcher::new(p);
1957        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
1958        req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
1959        let (tx, _rx) = mpsc::channel(1);
1960        let outcome = disp.dispatch(req, tx);
1961        match outcome {
1962            DispatchOutcome::Error(rsp) => {
1963                assert_eq!(rsp.ty(), MsgType::RspRedisError);
1964                assert!(rsp.flags().is_error);
1965                let bytes: Vec<u8> = rsp
1966                    .mbufs()
1967                    .iter()
1968                    .flat_map(|b| b.readable().to_vec())
1969                    .collect();
1970                assert!(
1971                    !bytes.is_empty(),
1972                    "NoTargets must produce on-wire bytes, not a 0-byte hang"
1973                );
1974                assert!(bytes.starts_with(b"-Dynomite: "));
1975                assert!(bytes.ends_with(b"\r\n"));
1976                assert_eq!(rsp.mlen() as usize, bytes.len());
1977            }
1978            other => panic!("expected DispatchOutcome::Error, got {other:?}"),
1979        }
1980    }
1981
1982    /// Memcache traffic with no quorum-eligible target must
1983    /// surface a `SERVER_ERROR ...\r\n` reply rather than
1984    /// hanging the client.
1985    #[test]
1986    fn no_targets_error_response_memcache_wire_bytes() {
1987        // Build a memcache pool so the dispatcher's err_type
1988        // selection lands on `RspMcServerError` (the dispatcher
1989        // currently keys off the request `MsgType`, so a
1990        // memcache request flows through the memcache wire
1991        // shape).
1992        let mut cfg = cfg(ConsistencyLevel::DcQuorum, ConsistencyLevel::DcQuorum);
1993        cfg.data_store = DataStore::Memcache;
1994        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
1995        p0.set_state(PeerState::Down, 0);
1996        let pool_arc = ServerPool::new(cfg, vec![p0]);
1997        pool_arc.preselect_remote_racks();
1998        let disp = ClusterDispatcher::new(Arc::new(pool_arc));
1999        let mut req = Msg::new(1, MsgType::ReqMcGet, true);
2000        req.push_key(crate::msg::keypos::KeyPos::without_tag(b"k".to_vec()));
2001        let (tx, _rx) = mpsc::channel(1);
2002        let outcome = disp.dispatch(req, tx);
2003        match outcome {
2004            DispatchOutcome::Error(rsp) => {
2005                assert_eq!(rsp.ty(), MsgType::RspMcServerError);
2006                let bytes: Vec<u8> = rsp
2007                    .mbufs()
2008                    .iter()
2009                    .flat_map(|b| b.readable().to_vec())
2010                    .collect();
2011                assert!(
2012                    !bytes.is_empty(),
2013                    "NoTargets must produce on-wire bytes, not a 0-byte hang"
2014                );
2015                assert!(bytes.starts_with(b"SERVER_ERROR "));
2016                assert!(bytes.ends_with(b"\r\n"));
2017            }
2018            other => panic!("expected DispatchOutcome::Error, got {other:?}"),
2019        }
2020    }
2021
2022    use crate::cluster::pool::{BucketType, PoolConfig};
2023
2024    fn pool_with_bucket_types(
2025        pool_read: ConsistencyLevel,
2026        pool_write: ConsistencyLevel,
2027        bucket_types: Vec<BucketType>,
2028        default_bucket_type: Option<&str>,
2029        peers: Vec<Peer>,
2030    ) -> Arc<ServerPool> {
2031        let cfg = PoolConfig {
2032            read_consistency: pool_read,
2033            write_consistency: pool_write,
2034            dc: "dc1".into(),
2035            rack: "rA".into(),
2036            bucket_types,
2037            default_bucket_type: default_bucket_type.map(str::to_string),
2038            ..PoolConfig::default()
2039        };
2040        let pool = ServerPool::new(cfg, peers);
2041        pool.preselect_remote_racks();
2042        Arc::new(pool)
2043    }
2044
2045    fn three_local_peers() -> Vec<Peer> {
2046        vec![
2047            peer(0, "dc1", "rA", 10, true, true),
2048            peer(1, "dc1", "rB", 20, false, true),
2049            peer(2, "dc1", "rC", 30, false, true),
2050        ]
2051    }
2052
2053    #[test]
2054    fn bucket_type_overrides_pool_consistency() {
2055        // Pool default is DC_ONE, the bucket forces DC_QUORUM.
2056        let bts = vec![BucketType {
2057            name: "hot".into(),
2058            read_consistency: ConsistencyLevel::DcQuorum,
2059            write_consistency: ConsistencyLevel::DcQuorum,
2060            n_val: 0,
2061        }];
2062        let p = pool_with_bucket_types(
2063            ConsistencyLevel::DcOne,
2064            ConsistencyLevel::DcOne,
2065            bts,
2066            None,
2067            three_local_peers(),
2068        );
2069        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2070        let plan = ClusterDispatcher::new(p).plan(&req, b"hot/key1");
2071        match plan {
2072            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2073            other => panic!("expected DC_QUORUM fan-out, got {other:?}"),
2074        }
2075    }
2076
2077    #[test]
2078    fn slashless_key_falls_back_to_pool_default() {
2079        let bts = vec![BucketType {
2080            name: "hot".into(),
2081            read_consistency: ConsistencyLevel::DcQuorum,
2082            write_consistency: ConsistencyLevel::DcQuorum,
2083            n_val: 0,
2084        }];
2085        let p = pool_with_bucket_types(
2086            ConsistencyLevel::DcOne,
2087            ConsistencyLevel::DcOne,
2088            bts,
2089            None,
2090            three_local_peers(),
2091        );
2092        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2093        let plan = ClusterDispatcher::new(p).plan(&req, b"plain-key");
2094        // No slash and no default_bucket_type -> pool DC_ONE.
2095        // The local rack hosts peer 0 so the plan short-circuits.
2096        assert!(matches!(plan, DispatchPlan::LocalDatastore));
2097    }
2098
2099    #[test]
2100    fn unknown_bucket_uses_default_bucket_type_when_set() {
2101        let bts = vec![BucketType {
2102            name: "safe".into(),
2103            read_consistency: ConsistencyLevel::DcQuorum,
2104            write_consistency: ConsistencyLevel::DcQuorum,
2105            n_val: 0,
2106        }];
2107        let p = pool_with_bucket_types(
2108            ConsistencyLevel::DcOne,
2109            ConsistencyLevel::DcOne,
2110            bts,
2111            Some("safe"),
2112            three_local_peers(),
2113        );
2114        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2115        // Slashless key: bucket is None, default_bucket_type=safe applies
2116        // so we get the bucket-type's DC_QUORUM fan-out.
2117        let plan = ClusterDispatcher::new(p.clone()).plan(&req, b"plain-key");
2118        match plan {
2119            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2120            other => panic!("expected DC_QUORUM via default bucket, got {other:?}"),
2121        }
2122        // Slashed key with an unknown bucket prefix also falls
2123        // through to the default bucket type.
2124        let plan = ClusterDispatcher::new(p).plan(&req, b"unknown-bucket/key");
2125        match plan {
2126            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2127            other => panic!("expected DC_QUORUM via default bucket, got {other:?}"),
2128        }
2129    }
2130
2131    #[test]
2132    fn unknown_bucket_with_no_default_uses_pool_default() {
2133        let bts = vec![BucketType {
2134            name: "safe".into(),
2135            read_consistency: ConsistencyLevel::DcQuorum,
2136            write_consistency: ConsistencyLevel::DcQuorum,
2137            n_val: 0,
2138        }];
2139        let p = pool_with_bucket_types(
2140            ConsistencyLevel::DcOne,
2141            ConsistencyLevel::DcOne,
2142            bts,
2143            None,
2144            three_local_peers(),
2145        );
2146        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2147        let plan = ClusterDispatcher::new(p).plan(&req, b"unknown-bucket/key");
2148        assert!(matches!(plan, DispatchPlan::LocalDatastore));
2149    }
2150
2151    #[test]
2152    fn n_val_one_caps_replicas_to_first_target() {
2153        let bts = vec![BucketType {
2154            name: "thin".into(),
2155            read_consistency: ConsistencyLevel::DcQuorum,
2156            write_consistency: ConsistencyLevel::DcQuorum,
2157            n_val: 1,
2158        }];
2159        let p = pool_with_bucket_types(
2160            ConsistencyLevel::DcOne,
2161            ConsistencyLevel::DcOne,
2162            bts,
2163            None,
2164            three_local_peers(),
2165        );
2166        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2167        let plan = ClusterDispatcher::new(p).plan(&req, b"thin/key");
2168        match plan {
2169            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 1),
2170            other => panic!("expected single-target plan, got {other:?}"),
2171        }
2172    }
2173
2174    #[test]
2175    fn n_val_two_caps_replicas_to_first_two_targets() {
2176        let bts = vec![BucketType {
2177            name: "medium".into(),
2178            read_consistency: ConsistencyLevel::DcQuorum,
2179            write_consistency: ConsistencyLevel::DcQuorum,
2180            n_val: 2,
2181        }];
2182        let p = pool_with_bucket_types(
2183            ConsistencyLevel::DcOne,
2184            ConsistencyLevel::DcOne,
2185            bts,
2186            None,
2187            three_local_peers(),
2188        );
2189        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2190        let plan = ClusterDispatcher::new(p).plan(&req, b"medium/key");
2191        match plan {
2192            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 2),
2193            other => panic!("expected two-target plan, got {other:?}"),
2194        }
2195    }
2196
2197    #[test]
2198    fn n_val_zero_does_not_cap() {
2199        let bts = vec![BucketType {
2200            name: "any".into(),
2201            read_consistency: ConsistencyLevel::DcQuorum,
2202            write_consistency: ConsistencyLevel::DcQuorum,
2203            n_val: 0,
2204        }];
2205        let p = pool_with_bucket_types(
2206            ConsistencyLevel::DcOne,
2207            ConsistencyLevel::DcOne,
2208            bts,
2209            None,
2210            three_local_peers(),
2211        );
2212        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2213        let plan = ClusterDispatcher::new(p).plan(&req, b"any/key");
2214        match plan {
2215            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2216            other => panic!("expected uncapped plan, got {other:?}"),
2217        }
2218    }
2219
2220    #[test]
2221    fn n_val_larger_than_replicas_is_a_no_op() {
2222        let bts = vec![BucketType {
2223            name: "big".into(),
2224            read_consistency: ConsistencyLevel::DcQuorum,
2225            write_consistency: ConsistencyLevel::DcQuorum,
2226            n_val: 7,
2227        }];
2228        let p = pool_with_bucket_types(
2229            ConsistencyLevel::DcOne,
2230            ConsistencyLevel::DcOne,
2231            bts,
2232            None,
2233            three_local_peers(),
2234        );
2235        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2236        let plan = ClusterDispatcher::new(p).plan(&req, b"big/key");
2237        match plan {
2238            DispatchPlan::Replicas { targets: rs, .. } => assert_eq!(rs.len(), 3),
2239            other => panic!("expected uncapped plan, got {other:?}"),
2240        }
2241    }
2242
2243    /// Smoke test for the failure-cause counter wiring: a
2244    /// `NoTargets` plan increments the labelled counter
2245    /// exactly once.
2246    #[test]
2247    fn no_targets_records_failure_metric() {
2248        let mut p0 = peer(0, "dc1", "rA", 10, true, true);
2249        p0.set_state(PeerState::Down, 0);
2250        let p = pool(
2251            ConsistencyLevel::DcQuorum,
2252            ConsistencyLevel::DcQuorum,
2253            vec![p0],
2254        );
2255        let metrics = Arc::new(crate::stats::FailureMetrics::new());
2256        let disp = ClusterDispatcher::new(p).with_failure_metrics(metrics.clone());
2257        let req = Msg::new(1, MsgType::ReqRedisGet, true);
2258        assert_eq!(disp.plan(&req, b"k"), DispatchPlan::NoTargets);
2259        let snap = metrics.snapshot();
2260        assert_eq!(snap.no_targets.len(), 1);
2261        let entry = &snap.no_targets[0];
2262        assert_eq!(entry.dc, "dc1");
2263        assert_eq!(entry.rack, "rA");
2264        assert_eq!(entry.consistency, ConsistencyLevel::DcQuorum);
2265        assert_eq!(entry.count, 1);
2266    }
2267
2268    /// A closed mpsc channel returns `Closed` from
2269    /// `try_send`. Wire the dispatcher's local-datastore path
2270    /// to such a channel, fire one request, and assert the
2271    /// `dispatch_backend_send_closed_total` counter ticks by
2272    /// exactly one.
2273    #[tokio::test]
2274    async fn closed_backend_channel_records_closed_metric() {
2275        let p = pool(
2276            ConsistencyLevel::DcOne,
2277            ConsistencyLevel::DcOne,
2278            vec![peer(0, "dc1", "rA", 10, true, true)],
2279        );
2280        let (tx, rx) = mpsc::channel::<crate::net::server::OutboundRequest>(4);
2281        drop(rx);
2282        let metrics = Arc::new(crate::stats::FailureMetrics::new());
2283        let disp = ClusterDispatcher::new(p)
2284            .with_backend(tx)
2285            .with_failure_metrics(metrics.clone());
2286        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
2287        // Attach a non-empty mbuf so the dispatcher actually
2288        // attempts the try_send (empty bytes short-circuit
2289        // to Drop before the channel is touched).
2290        let pool_buf = crate::io::mbuf::MbufPool::default();
2291        let mut buf = pool_buf.get();
2292        buf.copy_from_slice(b"PING\r\n");
2293        req.mbufs_mut().push_back(buf);
2294        let (resp_tx, _resp_rx) = mpsc::channel(1);
2295        let outcome = disp.dispatch(req, resp_tx);
2296        assert!(matches!(outcome, DispatchOutcome::Error(_)));
2297        let snap = metrics.snapshot();
2298        assert_eq!(snap.backend_send_closed, 1);
2299        assert_eq!(snap.backend_send_full, 0);
2300    }
2301
2302    /// Integration-style unit test: build a two-peer pool,
2303    /// mark one peer Down, drive 100 dispatches across the
2304    /// ring, and assert the `dispatch_no_targets_total`
2305    /// counter reflects every observed `NoTargets` plan.
2306    #[test]
2307    fn two_peer_pool_with_one_down_records_per_key_no_targets() {
2308        let cfg = crate::cluster::PoolConfig {
2309            dc: "dc1".into(),
2310            rack: "rA".into(),
2311            read_consistency: ConsistencyLevel::DcQuorum,
2312            write_consistency: ConsistencyLevel::DcQuorum,
2313            ..crate::cluster::PoolConfig::default()
2314        };
2315        // Single-rack two-peer ring: peer 0 owns the upper
2316        // half via wrap-around (token 2_147_483_648), peer 1
2317        // owns the lower half (token 0 plus the boundary up to
2318        // 2_147_483_648). With both peers in rack `rA` the
2319        // continuum has two entries, so each key maps to
2320        // exactly one peer; marking peer 1 Down causes its
2321        // arc to produce `NoTargets`.
2322        let p0 = peer(0, "dc1", "rA", 2_147_483_648, true, true);
2323        let mut p1 = peer(1, "dc1", "rA", 0, false, true);
2324        p1.set_state(PeerState::Down, 0);
2325        let pool_arc = ServerPool::new(cfg, vec![p0, p1]);
2326        pool_arc.preselect_remote_racks();
2327        let metrics = Arc::new(crate::stats::FailureMetrics::new());
2328        let disp = ClusterDispatcher::new(Arc::new(pool_arc)).with_failure_metrics(metrics.clone());
2329        let mut planned_no_targets = 0u64;
2330        let mut planned_routable = 0u64;
2331        for i in 0..100u32 {
2332            let key = format!("k{i:03}");
2333            let req = Msg::new(u64::from(i), MsgType::ReqRedisGet, true);
2334            match disp.plan(&req, key.as_bytes()) {
2335                DispatchPlan::NoTargets => planned_no_targets += 1,
2336                DispatchPlan::Replicas { .. } | DispatchPlan::LocalDatastore => {
2337                    planned_routable += 1;
2338                }
2339                DispatchPlan::Drop => panic!("unexpected Drop in plan"),
2340            }
2341        }
2342        assert!(planned_no_targets > 0, "expected some NoTargets dispatches");
2343        assert!(planned_routable > 0, "expected some routable dispatches");
2344        let snap = metrics.snapshot();
2345        let counter_total: u64 = snap.no_targets.iter().map(|e| e.count).sum();
2346        assert_eq!(
2347            counter_total, planned_no_targets,
2348            "dispatch_no_targets_total must match observed NoTargets count",
2349        );
2350        // No `Closed`/`Full` channel errors expected: we did
2351        // not wire any backends.
2352        assert_eq!(snap.backend_send_full, 0);
2353        assert_eq!(snap.backend_send_closed, 0);
2354        assert!(snap.peer_send_full.is_empty());
2355        assert!(snap.peer_send_closed.is_empty());
2356    }
2357}