Skip to main content

dynomite/cluster/
pool.rs

1//! Server pool owner.
2//!
3//! [`ServerPool`] is the cluster-level container that holds the
4//! local datastore endpoint, the dnode listener configuration, the
5//! datacenter and rack tables, the per-peer connection pools, the
6//! consistency settings, and the response-manager constructors used
7//! by [`crate::cluster::dispatch::ClusterDispatcher`]. It owns the
8//! data-shape side of `struct server_pool` from the reference engine
9//! and is the entry point that the `dynomited` binary (Stage 12)
10//! and the embedding API (Stage 13) construct from a parsed
11//! [`crate::conf::Config`].
12
13use std::sync::Arc;
14use std::time::Duration;
15
16use parking_lot::RwLock;
17
18use crate::cluster::datacenter::Datacenter;
19use crate::cluster::peer::Peer;
20use crate::conf::{
21    ConfPool, ConsistencyLevel as ConfConsistencyLevel, DataStore, Distribution, HashType,
22};
23
24use crate::msg::ConsistencyLevel;
25use crate::net::auto_eject::AutoEject;
26
27/// Convert a YAML-form consistency string from a bucket-type
28/// stanza into the runtime [`ConsistencyLevel`] enum.
29///
30/// Values that fail validation fall back to `DcOne`; the
31/// configuration validator runs first, so by the time this is
32/// called every string is already known to be one of the four
33/// canonical names (case-insensitive).
34fn bucket_type_consistency(raw: &str) -> ConsistencyLevel {
35    match ConfConsistencyLevel::parse("bucket_type_consistency", raw) {
36        Ok(ConfConsistencyLevel::DcQuorum) => ConsistencyLevel::DcQuorum,
37        Ok(ConfConsistencyLevel::DcSafeQuorum) => ConsistencyLevel::DcSafeQuorum,
38        Ok(ConfConsistencyLevel::DcEachSafeQuorum) => ConsistencyLevel::DcEachSafeQuorum,
39        Ok(ConfConsistencyLevel::DcOne) | Err(_) => ConsistencyLevel::DcOne,
40    }
41}
42
43/// Resolved bucket-type bundle stored on the live
44/// [`PoolConfig`].
45///
46/// Mirrors [`crate::conf::ConfBucketType`] but with the
47/// consistency strings already parsed into
48/// [`ConsistencyLevel`] and the field-name semantics finalised
49/// for the dispatcher's hot path.
50#[derive(Clone, Debug, Eq, PartialEq)]
51pub struct BucketType {
52    /// Bucket name. Compared verbatim against the prefix
53    /// returned by [`crate::proto::redis::bucket_name`].
54    pub name: String,
55    /// Read consistency override.
56    pub read_consistency: ConsistencyLevel,
57    /// Write consistency override.
58    pub write_consistency: ConsistencyLevel,
59    /// Replica fan-out cap. `0` means no cap.
60    pub n_val: u8,
61}
62
63/// Minimal projection of the YAML pool block consumed by the
64/// cluster runtime.
65///
66/// Mirrors the fields the reference engine copies from
67/// `conf_pool` into `server_pool` during `server_pool_init`.
68#[derive(Clone, Debug)]
69pub struct PoolConfig {
70    /// Pool name.
71    pub name: String,
72    /// Local datacenter name.
73    pub dc: String,
74    /// Local rack name.
75    pub rack: String,
76    /// Backing datastore protocol.
77    pub data_store: DataStore,
78    /// Hash function used for token ring lookups.
79    pub hash: HashType,
80    /// Distribution algorithm used to map a hashed key to a
81    /// peer. Defaults to [`Distribution::Vnode`].
82    pub distribution: Distribution,
83    /// Optional shadow distribution. When `Some`, the
84    /// dispatcher computes both the live and shadow routes for
85    /// every request and bumps a counter when they disagree.
86    /// The actual route is the configured
87    /// [`Self::distribution`].
88    pub distribution_shadow: Option<Distribution>,
89    /// Read consistency level.
90    pub read_consistency: ConsistencyLevel,
91    /// Write consistency level.
92    pub write_consistency: ConsistencyLevel,
93    /// Operation timeout in milliseconds.
94    pub timeout_ms: u64,
95    /// Eject window (`server_retry_timeout_ms`).
96    pub server_retry_timeout_ms: u64,
97    /// Consecutive-failure threshold.
98    pub server_failure_limit: u32,
99    /// Honor `auto_eject_hosts`.
100    pub auto_eject_hosts: bool,
101    /// Whether gossip is enabled (`enable_gossip`).
102    pub enable_gossip: bool,
103    /// Per-bucket routing-property bundles. Empty when the
104    /// `bucket_types:` YAML stanza is unset.
105    pub bucket_types: Vec<BucketType>,
106    /// Name of the bucket type to apply when the request key has
107    /// no slash. References an entry of `bucket_types`; `None`
108    /// keeps the pool-level defaults.
109    pub default_bucket_type: Option<String>,
110    /// Hinted-handoff feature flag. When `true`, writes targeted
111    /// at peers in [`crate::cluster::peer::PeerState::Down`] (or
112    /// at peers whose outbound channel is closed / full) are
113    /// stored as hints in the node-local
114    /// [`crate::cluster::hints::HintStore`] and counted toward
115    /// the request's consistency threshold. The default `false`
116    /// preserves the legacy behaviour where Down targets are
117    /// silently skipped.
118    pub enable_hinted_handoff: bool,
119    /// Hint expiry, in seconds. Hints older than this are
120    /// dropped during the periodic sweep so the in-memory store
121    /// stays bounded.
122    pub hint_ttl_seconds: u64,
123    /// Upper bound on the in-memory hint store. Once the store
124    /// holds this many bytes, further enqueues fail and the
125    /// dispatcher falls back to its non-handoff error path.
126    pub hint_store_max_bytes: u64,
127    /// Hint drainer cadence, in milliseconds. Setting this to
128    /// zero is rejected by the configuration validator when
129    /// `enable_hinted_handoff` is true.
130    pub hint_drain_interval_ms: u64,
131}
132
133impl Default for PoolConfig {
134    fn default() -> Self {
135        Self {
136            name: "p".into(),
137            dc: "localdc".into(),
138            rack: "localrack".into(),
139            data_store: DataStore::Redis,
140            hash: HashType::Murmur,
141            distribution: Distribution::Vnode,
142            distribution_shadow: None,
143            read_consistency: ConsistencyLevel::DcOne,
144            write_consistency: ConsistencyLevel::DcOne,
145            timeout_ms: 5_000,
146            server_retry_timeout_ms: 30_000,
147            server_failure_limit: 2,
148            auto_eject_hosts: false,
149            enable_gossip: false,
150            bucket_types: Vec::new(),
151            default_bucket_type: None,
152            enable_hinted_handoff: false,
153            hint_ttl_seconds: 86_400,
154            hint_store_max_bytes: 64 * 1024 * 1024,
155            hint_drain_interval_ms: 30_000,
156        }
157    }
158}
159
160impl PoolConfig {
161    /// Look up a bucket type by name.
162    ///
163    /// Returns the matching [`BucketType`] when one is
164    /// configured. The dispatcher consults this on every request
165    /// to swap in per-bucket consistency / fan-out settings.
166    #[must_use]
167    pub fn lookup_bucket_type(&self, name: &[u8]) -> Option<&BucketType> {
168        self.bucket_types
169            .iter()
170            .find(|bt| bt.name.as_bytes() == name)
171    }
172
173    /// Resolve the bucket type that applies to a request whose
174    /// extracted bucket name is `bucket`. `None` falls back to
175    /// `default_bucket_type` (also possibly `None`).
176    #[must_use]
177    pub fn resolve_bucket_type(&self, bucket: Option<&[u8]>) -> Option<&BucketType> {
178        if let Some(b) = bucket {
179            if let Some(bt) = self.lookup_bucket_type(b) {
180                return Some(bt);
181            }
182        }
183        // Either no bucket prefix on the key, or the prefix did
184        // not match any configured type. Fall through to the
185        // default bucket type when one is named.
186        let name = self.default_bucket_type.as_deref()?;
187        self.lookup_bucket_type(name.as_bytes())
188    }
189}
190
191impl PoolConfig {
192    /// Construct a `PoolConfig` from a [`ConfPool`] block. Fields
193    /// missing from the YAML are filled with the same defaults the
194    /// reference engine applies in `conf_pool_each_transform` (the
195    /// caller is expected to have called
196    /// [`crate::conf::Config::finalize`] before this point).
197    ///
198    /// # Examples
199    ///
200    /// ```
201    /// use dynomite::cluster::pool::PoolConfig;
202    /// use dynomite::conf::Config;
203    /// let mut cfg = Config::parse_str(
204    ///     "p:\n  listen: 127.0.0.1:1\n  dyn_listen: 127.0.0.1:2\n  tokens: '1'\n  servers:\n  - 127.0.0.1:3:1\n  data_store: 0\n",
205    /// ).unwrap();
206    /// cfg.finalize();
207    /// let pc = PoolConfig::from_conf("p", cfg.pool());
208    /// assert_eq!(pc.name, "p");
209    /// ```
210    #[must_use]
211    pub fn from_conf(name: &str, pool: &ConfPool) -> Self {
212        let parse_consistency = |s: &Option<String>| {
213            s.as_deref()
214                .and_then(ConsistencyLevel::from_name)
215                .unwrap_or(ConsistencyLevel::DcOne)
216        };
217        let data_store = match pool.data_store {
218            Some(1) => DataStore::Memcache,
219            Some(2) => DataStore::Noxu,
220            _ => DataStore::Redis,
221        };
222        Self {
223            name: name.to_string(),
224            dc: pool.datacenter.clone().unwrap_or_else(|| "localdc".into()),
225            rack: pool.rack.clone().unwrap_or_else(|| "localrack".into()),
226            data_store,
227            hash: pool.hash.unwrap_or(HashType::Murmur),
228            distribution: pool.resolved_distribution(),
229            distribution_shadow: pool.distribution_shadow,
230            read_consistency: parse_consistency(&pool.read_consistency),
231            write_consistency: parse_consistency(&pool.write_consistency),
232            timeout_ms: pool
233                .timeout
234                .and_then(|n| u64::try_from(n).ok())
235                .unwrap_or(5_000),
236            server_retry_timeout_ms: pool
237                .server_retry_timeout
238                .and_then(|n| u64::try_from(n).ok())
239                .unwrap_or(30_000),
240            server_failure_limit: pool
241                .server_failure_limit
242                .and_then(|n| u32::try_from(n).ok())
243                .unwrap_or(2),
244            auto_eject_hosts: pool.auto_eject_hosts.unwrap_or(false),
245            enable_gossip: pool.enable_gossip.unwrap_or(false),
246            bucket_types: pool
247                .bucket_types
248                .iter()
249                .map(|bt| BucketType {
250                    name: bt.name.clone(),
251                    read_consistency: bucket_type_consistency(&bt.read_consistency),
252                    write_consistency: bucket_type_consistency(&bt.write_consistency),
253                    n_val: bt.n_val,
254                })
255                .collect(),
256            default_bucket_type: pool.default_bucket_type.clone(),
257            enable_hinted_handoff: pool.enable_hinted_handoff.unwrap_or(false),
258            hint_ttl_seconds: pool.hint_ttl_seconds.unwrap_or(86_400),
259            hint_store_max_bytes: pool.hint_store_max_bytes.unwrap_or(64 * 1024 * 1024),
260            hint_drain_interval_ms: pool.hint_drain_interval_ms.unwrap_or(30_000),
261        }
262    }
263}
264
265/// Cluster-wide owner.
266///
267/// Holds the topology (datacenters, racks), the peer list (peer
268/// index 0 is always the local node, mirroring the reference
269/// engine), and the per-peer auto-eject decision state.
270///
271/// `peers` and `datacenters` live behind `RwLock`s so the
272/// dispatcher can hold a read lock while gossip occasionally
273/// upgrades to write.
274///
275/// # Examples
276///
277/// ```
278/// use dynomite::cluster::pool::{PoolConfig, ServerPool};
279/// use dynomite::cluster::peer::{Peer, PeerEndpoint};
280/// use dynomite::hashkit::DynToken;
281/// let cfg = PoolConfig {
282///     dc: "dc1".into(),
283///     rack: "r1".into(),
284///     ..PoolConfig::default()
285/// };
286/// let local = Peer::new(
287///     0, PeerEndpoint::tcp("127.0.0.1".into(), 8101), "r1".into(), "dc1".into(),
288///     vec![DynToken::from_u32(1)], true, true, false,
289/// );
290/// let pool = ServerPool::new(cfg, vec![local]);
291/// assert_eq!(pool.peers().read().len(), 1);
292/// ```
293#[derive(Debug)]
294pub struct ServerPool {
295    config: PoolConfig,
296    peers: Arc<RwLock<Vec<Peer>>>,
297    datacenters: Arc<RwLock<Vec<Datacenter>>>,
298    auto_eject: Arc<RwLock<Vec<AutoEject>>>,
299}
300
301impl ServerPool {
302    /// Build a fresh pool from a [`PoolConfig`] and an initial peer
303    /// list (peer index 0 is the local node).
304    ///
305    /// Datacenters and racks are populated automatically from the
306    /// supplied peers; their continuum is rebuilt by
307    /// [`ServerPool::rebuild_ring`].
308    ///
309    /// # Examples
310    ///
311    /// ```
312    /// use dynomite::cluster::pool::{PoolConfig, ServerPool};
313    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
314    /// # use dynomite::hashkit::DynToken;
315    /// # use dynomite::conf::{DataStore, HashType};
316    /// # use dynomite::msg::ConsistencyLevel;
317    /// # let cfg = PoolConfig::default();
318    /// # let local = Peer::new(
319    /// #    0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
320    /// #    vec![DynToken::from_u32(0)], true, true, false,
321    /// # );
322    /// let pool = ServerPool::new(cfg, vec![local]);
323    /// pool.rebuild_ring();
324    /// assert_eq!(pool.datacenters().read().len(), 1);
325    /// ```
326    #[must_use]
327    pub fn new(config: PoolConfig, peers: Vec<Peer>) -> Self {
328        let mut dcs: Vec<Datacenter> = Vec::new();
329        for p in &peers {
330            let dc_idx = if let Some(i) = dcs.iter().position(|d| d.name() == p.dc()) {
331                i
332            } else {
333                dcs.push(Datacenter::new(p.dc().to_string()));
334                dcs.len() - 1
335            };
336            dcs[dc_idx].upsert_rack(p.rack().to_string());
337        }
338        let auto_eject_template = AutoEject::new(
339            config.auto_eject_hosts,
340            config.server_failure_limit,
341            Duration::from_millis(config.server_retry_timeout_ms),
342        );
343        let mut auto_ejects = Vec::with_capacity(peers.len());
344        for _ in &peers {
345            auto_ejects.push(auto_eject_template.clone());
346        }
347        let pool = Self {
348            config,
349            peers: Arc::new(RwLock::new(peers)),
350            datacenters: Arc::new(RwLock::new(dcs)),
351            auto_eject: Arc::new(RwLock::new(auto_ejects)),
352        };
353        pool.rebuild_ring();
354        pool
355    }
356
357    /// Configuration block.
358    #[must_use]
359    pub fn config(&self) -> &PoolConfig {
360        &self.config
361    }
362
363    /// Borrow the peer list (RwLock).
364    #[must_use]
365    pub fn peers(&self) -> &RwLock<Vec<Peer>> {
366        &self.peers
367    }
368
369    /// Shared `Arc` to the peer list.
370    #[must_use]
371    pub fn peers_arc(&self) -> Arc<RwLock<Vec<Peer>>> {
372        self.peers.clone()
373    }
374
375    /// Borrow the datacenter list.
376    #[must_use]
377    pub fn datacenters(&self) -> &RwLock<Vec<Datacenter>> {
378        &self.datacenters
379    }
380
381    /// Borrow the per-peer auto-eject deciders.
382    #[must_use]
383    pub fn auto_eject(&self) -> &RwLock<Vec<AutoEject>> {
384        &self.auto_eject
385    }
386
387    /// Rebuild the per-rack token continuum from the current peer
388    /// table. Mirrors `vnode_update`. When the configured
389    /// distribution is
390    /// [`crate::conf::Distribution::RandomSlicing`], a
391    /// [`crate::hashkit::random_slicing::RandomSlices`] table is
392    /// installed on each rack alongside the continuum so the
393    /// shadow-distribution path can still walk the vnode view.
394    pub fn rebuild_ring(&self) {
395        let peers = self.peers.read();
396        let mut dcs = self.datacenters.write();
397        // Make sure all (dc, rack) pairs exist.
398        for p in peers.iter() {
399            let dc_idx = if let Some(i) = dcs.iter().position(|d| d.name() == p.dc()) {
400                i
401            } else {
402                dcs.push(Datacenter::new(p.dc().to_string()));
403                dcs.len() - 1
404            };
405            dcs[dc_idx].upsert_rack(p.rack().to_string());
406        }
407        let entries: Vec<_> = peers
408            .iter()
409            .map(|p| crate::cluster::vnode::PeerTokens {
410                peer_idx: p.idx(),
411                dc: p.dc(),
412                rack: p.rack(),
413                tokens: p.tokens(),
414            })
415            .collect();
416        crate::cluster::vnode::rebuild_continuums(&mut dcs, &entries);
417        let live_or_shadow_uses_rs = matches!(
418            self.config.distribution,
419            crate::conf::Distribution::RandomSlicing
420        ) || matches!(
421            self.config.distribution_shadow,
422            Some(crate::conf::Distribution::RandomSlicing)
423        );
424        if live_or_shadow_uses_rs {
425            Self::install_random_slices(&peers, &mut dcs);
426        }
427    }
428
429    fn install_random_slices(peers: &[crate::cluster::peer::Peer], dcs: &mut [Datacenter]) {
430        use crate::hashkit::random_slicing::RandomSlices;
431        for dc in dcs.iter_mut() {
432            let dc_name = dc.name().to_string();
433            for rack in dc.racks_mut().iter_mut() {
434                let mut names: Vec<String> = peers
435                    .iter()
436                    .filter(|p| p.dc() == dc_name && p.rack() == rack.name())
437                    .map(|p| p.endpoint().pname())
438                    .collect();
439                names.sort();
440                names.dedup();
441                if names.is_empty() {
442                    continue;
443                }
444                let refs: Vec<&str> = names.iter().map(String::as_str).collect();
445                if let Ok(slices) = RandomSlices::from_uniform(&refs) {
446                    rack.set_random_slices(slices);
447                } else {
448                    tracing::warn!(
449                        target: "dynomite::cluster::pool",
450                        rack = rack.name(),
451                        dc = %dc_name,
452                        "random-slicing build failed; falling back to vnode for this rack"
453                    );
454                }
455            }
456        }
457    }
458
459    /// Walk the datacenters and choose, for each remote DC, a rack
460    /// for cross-DC replication. Mirrors
461    /// `preselect_remote_rack_for_replication`.
462    pub fn preselect_remote_racks(&self) {
463        let mut dcs = self.datacenters.write();
464        for dc in dcs.iter_mut() {
465            dc.sort_racks();
466        }
467        // Find the index of the local rack in the local DC.
468        let mut my_rack_index = 0usize;
469        for dc in dcs.iter() {
470            if dc.name() == self.config.dc {
471                if let Some(idx) = dc.rack_idx(&self.config.rack) {
472                    my_rack_index = idx;
473                }
474                break;
475            }
476        }
477        for dc in dcs.iter_mut() {
478            if dc.name() == self.config.dc {
479                dc.set_preselected_rack_idx(None);
480                continue;
481            }
482            let rack_count = dc.racks().len();
483            if rack_count == 0 {
484                dc.set_preselected_rack_idx(None);
485            } else {
486                dc.set_preselected_rack_idx(Some(my_rack_index % rack_count));
487            }
488        }
489    }
490
491    /// Initialise a per-DC [`crate::msg::ResponseMgr`] vector for
492    /// the supplied request. The walker visits every datacenter
493    /// and produces one manager per DC sized to the rack count.
494    /// Mirrors `init_response_mgr_all_dcs`.
495    ///
496    /// # Examples
497    ///
498    /// ```
499    /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
500    /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
501    /// # use dynomite::hashkit::DynToken;
502    /// # use dynomite::conf::{DataStore, HashType};
503    /// # use dynomite::msg::{ConsistencyLevel, Msg, MsgType};
504    /// # let cfg = PoolConfig::default();
505    /// # let local = Peer::new(
506    /// #    0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
507    /// #    vec![DynToken::from_u32(0)], true, true, false,
508    /// # );
509    /// let pool = ServerPool::new(cfg, vec![local]);
510    /// let req = Msg::new(1, MsgType::ReqRedisGet, true);
511    /// let mgrs = pool.init_response_mgrs(&req);
512    /// assert_eq!(mgrs.len(), 1);
513    /// ```
514    #[must_use]
515    pub fn init_response_mgrs(&self, req: &crate::msg::Msg) -> Vec<crate::msg::ResponseMgr> {
516        use crate::msg::{ResponseMgr, MAX_REPLICAS_PER_DC};
517        let dcs = self.datacenters.read();
518        let mut out = Vec::with_capacity(dcs.len());
519        for dc in dcs.iter() {
520            let rack_count = dc.racks().len();
521            let max_responses = u8::try_from(rack_count.clamp(1, MAX_REPLICAS_PER_DC))
522                .unwrap_or(u8::try_from(MAX_REPLICAS_PER_DC).unwrap_or(1));
523            out.push(ResponseMgr::new(
524                req,
525                max_responses,
526                Some(dc.name().to_string()),
527            ));
528        }
529        out
530    }
531}
532
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::datacenter::Rack;
    use crate::cluster::peer::PeerEndpoint;
    use crate::hashkit::DynToken;

    /// Default pool config placed in `dc` / `rack`.
    fn cfg(dc: &str, rack: &str) -> PoolConfig {
        PoolConfig {
            dc: dc.into(),
            rack: rack.into(),
            ..PoolConfig::default()
        }
    }

    /// Test peer on 127.0.0.1 with a single token; port derived from `idx`.
    fn peer(idx: u32, dc: &str, rack: &str, tok: u32, is_local: bool, is_same: bool) -> Peer {
        Peer::new(
            idx,
            PeerEndpoint::tcp("127.0.0.1".into(), 8101 + u16::try_from(idx).unwrap_or(0)),
            rack.into(),
            dc.into(),
            vec![DynToken::from_u32(tok)],
            is_local,
            is_same,
            false,
        )
    }

    #[test]
    fn build_pool_populates_topology() {
        let pool = ServerPool::new(
            cfg("dc1", "r1"),
            vec![
                peer(0, "dc1", "r1", 10, true, true),
                peer(1, "dc1", "r2", 20, false, true),
                peer(2, "dc2", "r1", 30, false, false),
            ],
        );
        let topology = pool.datacenters().read();
        let dc1 = topology.iter().find(|d| d.name() == "dc1").unwrap();
        assert_eq!(dc1.racks().len(), 2);
    }

    #[test]
    fn preselect_remote_picks_per_dc() {
        let pool = ServerPool::new(
            cfg("dc1", "rA"),
            vec![
                peer(0, "dc1", "rA", 10, true, true),
                peer(1, "dc2", "rA", 20, false, false),
                peer(2, "dc2", "rB", 30, false, false),
            ],
        );
        pool.preselect_remote_racks();
        let topology = pool.datacenters().read();
        let dc2 = topology.iter().find(|d| d.name() == "dc2").unwrap();
        // Local rack "rA" is at sorted index 0, dc2 has 2 racks, so
        // preselected idx is 0 -> "rA".
        assert_eq!(dc2.preselected_rack().map(Rack::name), Some("rA"));
    }

    #[test]
    fn preselect_remote_skips_local_dc() {
        let pool = ServerPool::new(
            cfg("dc1", "r1"),
            vec![
                peer(0, "dc1", "r1", 10, true, true),
                peer(1, "dc2", "r1", 20, false, false),
            ],
        );
        pool.preselect_remote_racks();
        let topology = pool.datacenters().read();
        let local = topology.iter().find(|d| d.name() == "dc1").unwrap();
        let remote = topology.iter().find(|d| d.name() == "dc2").unwrap();
        assert!(local.preselected_rack().is_none());
        assert_eq!(remote.preselected_rack().map(Rack::name), Some("r1"));
    }

    #[test]
    fn init_response_mgrs_one_per_dc() {
        let pool = ServerPool::new(
            cfg("dc1", "r1"),
            vec![
                peer(0, "dc1", "r1", 10, true, true),
                peer(1, "dc2", "r1", 20, false, false),
            ],
        );
        let req = crate::msg::Msg::new(1, crate::msg::MsgType::ReqRedisGet, true);
        let mgrs = pool.init_response_mgrs(&req);
        assert_eq!(mgrs.len(), 2);
    }

    #[test]
    fn resolve_bucket_type_falls_back_to_default() {
        let bt = |name: &str, n_val: u8| BucketType {
            name: name.into(),
            read_consistency: ConsistencyLevel::DcOne,
            write_consistency: ConsistencyLevel::DcQuorum,
            n_val,
        };
        let cfg = PoolConfig {
            bucket_types: vec![bt("hot", 3), bt("cold", 1)],
            default_bucket_type: Some("cold".into()),
            ..PoolConfig::default()
        };
        // Exact match wins.
        assert_eq!(cfg.resolve_bucket_type(Some(&b"hot"[..])).map(|b| b.n_val), Some(3));
        // Unknown bucket name and no bucket name both use the default.
        assert_eq!(cfg.resolve_bucket_type(Some(&b"nope"[..])).map(|b| b.n_val), Some(1));
        assert_eq!(cfg.resolve_bucket_type(None).map(|b| b.n_val), Some(1));
        // Without a default, an unknown name resolves to nothing.
        let no_default = PoolConfig {
            default_bucket_type: None,
            ..cfg
        };
        assert!(no_default.resolve_bucket_type(Some(&b"nope"[..])).is_none());
        assert!(no_default.resolve_bucket_type(None).is_none());
    }
}