dynomite/cluster/pool.rs
1//! Server pool owner.
2//!
3//! [`ServerPool`] is the cluster-level container that holds the
4//! local datastore endpoint, the dnode listener configuration, the
5//! datacenter and rack tables, the per-peer connection pools, the
6//! consistency settings, and the response-manager constructors used
7//! by [`crate::cluster::dispatch::ClusterDispatcher`]. It owns the
8//! data-shape side of `struct server_pool` from the reference engine
9//! and is the entry point that the `dynomited` binary (Stage 12)
10//! and the embedding API (Stage 13) construct from a parsed
11//! [`crate::conf::Config`].
12
13use std::sync::Arc;
14use std::time::Duration;
15
16use parking_lot::RwLock;
17
18use crate::cluster::datacenter::Datacenter;
19use crate::cluster::peer::Peer;
20use crate::conf::{
21 ConfPool, ConsistencyLevel as ConfConsistencyLevel, DataStore, Distribution, HashType,
22};
23
24use crate::msg::ConsistencyLevel;
25use crate::net::auto_eject::AutoEject;
26
27/// Convert a YAML-form consistency string from a bucket-type
28/// stanza into the runtime [`ConsistencyLevel`] enum.
29///
30/// Values that fail validation fall back to `DcOne`; the
31/// configuration validator runs first, so by the time this is
32/// called every string is already known to be one of the four
33/// canonical names (case-insensitive).
34fn bucket_type_consistency(raw: &str) -> ConsistencyLevel {
35 match ConfConsistencyLevel::parse("bucket_type_consistency", raw) {
36 Ok(ConfConsistencyLevel::DcQuorum) => ConsistencyLevel::DcQuorum,
37 Ok(ConfConsistencyLevel::DcSafeQuorum) => ConsistencyLevel::DcSafeQuorum,
38 Ok(ConfConsistencyLevel::DcEachSafeQuorum) => ConsistencyLevel::DcEachSafeQuorum,
39 Ok(ConfConsistencyLevel::DcOne) | Err(_) => ConsistencyLevel::DcOne,
40 }
41}
42
/// Resolved bucket-type bundle stored on the live
/// [`PoolConfig`].
///
/// Mirrors [`crate::conf::ConfBucketType`] but with the
/// consistency strings already parsed into
/// [`ConsistencyLevel`] and the field-name semantics finalised
/// for the dispatcher's hot path.
///
/// One instance is produced per configured bucket type by
/// [`PoolConfig::from_conf`]; entries are found again by name
/// through [`PoolConfig::lookup_bucket_type`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BucketType {
    /// Bucket name. Compared verbatim against the prefix
    /// returned by [`crate::proto::redis::bucket_name`]. The
    /// lookup compares raw bytes, so the match is case-sensitive.
    pub name: String,
    /// Read consistency override. A configuration string that
    /// fails to parse resolves to `DcOne`.
    pub read_consistency: ConsistencyLevel,
    /// Write consistency override. A configuration string that
    /// fails to parse resolves to `DcOne`.
    pub write_consistency: ConsistencyLevel,
    /// Replica fan-out cap. `0` means no cap. Copied unchanged
    /// from the configuration entry.
    pub n_val: u8,
}
62
/// Minimal projection of the YAML pool block consumed by the
/// cluster runtime.
///
/// Mirrors the fields the reference engine copies from
/// `conf_pool` into `server_pool` during `server_pool_init`.
///
/// Values come either from the `Default` implementation or from
/// [`PoolConfig::from_conf`], which substitutes a fallback for
/// every setting the YAML leaves unset.
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Pool name. `from_conf` takes it from its `name` argument;
    /// the `Default` value is `"p"`.
    pub name: String,
    /// Local datacenter name. Falls back to `"localdc"`.
    pub dc: String,
    /// Local rack name. Falls back to `"localrack"`.
    pub rack: String,
    /// Backing datastore protocol. `from_conf` maps the numeric
    /// code `1` to Memcache and `2` to Noxu; any other code, or
    /// an unset value, selects Redis.
    pub data_store: DataStore,
    /// Hash function used for token ring lookups. Falls back to
    /// [`HashType::Murmur`].
    pub hash: HashType,
    /// Distribution algorithm used to map a hashed key to a
    /// peer. Defaults to [`Distribution::Vnode`].
    pub distribution: Distribution,
    /// Optional shadow distribution. When `Some`, the
    /// dispatcher computes both the live and shadow routes for
    /// every request and bumps a counter when they disagree.
    /// The actual route is the configured
    /// [`Self::distribution`].
    pub distribution_shadow: Option<Distribution>,
    /// Read consistency level. Falls back to `DcOne` when the
    /// YAML string is unset or not recognised.
    pub read_consistency: ConsistencyLevel,
    /// Write consistency level. Falls back to `DcOne` when the
    /// YAML string is unset or not recognised.
    pub write_consistency: ConsistencyLevel,
    /// Operation timeout in milliseconds. Falls back to `5_000`
    /// when unset or when the configured value does not fit in a
    /// `u64`.
    pub timeout_ms: u64,
    /// Eject window (`server_retry_timeout_ms`). Falls back to
    /// `30_000`. [`ServerPool::new`] passes it, as a `Duration`,
    /// to the per-peer auto-eject state.
    pub server_retry_timeout_ms: u64,
    /// Consecutive-failure threshold. Falls back to `2`.
    pub server_failure_limit: u32,
    /// Honor `auto_eject_hosts`. Falls back to `false`.
    pub auto_eject_hosts: bool,
    /// Whether gossip is enabled (`enable_gossip`). Falls back
    /// to `false`.
    pub enable_gossip: bool,
    /// Per-bucket routing-property bundles. Empty when the
    /// `bucket_types:` YAML stanza is unset.
    pub bucket_types: Vec<BucketType>,
    /// Name of the bucket type to apply when the request key has
    /// no slash. References an entry of `bucket_types`; `None`
    /// keeps the pool-level defaults.
    pub default_bucket_type: Option<String>,
    /// Hinted-handoff feature flag. When `true`, writes targeted
    /// at peers in [`crate::cluster::peer::PeerState::Down`] (or
    /// at peers whose outbound channel is closed / full) are
    /// stored as hints in the node-local
    /// [`crate::cluster::hints::HintStore`] and counted toward
    /// the request's consistency threshold. The default `false`
    /// preserves the legacy behaviour where Down targets are
    /// silently skipped.
    pub enable_hinted_handoff: bool,
    /// Hint expiry, in seconds. Hints older than this are
    /// dropped during the periodic sweep so the in-memory store
    /// stays bounded. Falls back to `86_400` (one day).
    pub hint_ttl_seconds: u64,
    /// Upper bound on the in-memory hint store. Once the store
    /// holds this many bytes, further enqueues fail and the
    /// dispatcher falls back to its non-handoff error path.
    /// Falls back to 64 MiB (`64 * 1024 * 1024`).
    pub hint_store_max_bytes: u64,
    /// Hint drainer cadence, in milliseconds. Setting this to
    /// zero is rejected by the configuration validator when
    /// `enable_hinted_handoff` is true. Falls back to `30_000`.
    pub hint_drain_interval_ms: u64,
}
132
133impl Default for PoolConfig {
134 fn default() -> Self {
135 Self {
136 name: "p".into(),
137 dc: "localdc".into(),
138 rack: "localrack".into(),
139 data_store: DataStore::Redis,
140 hash: HashType::Murmur,
141 distribution: Distribution::Vnode,
142 distribution_shadow: None,
143 read_consistency: ConsistencyLevel::DcOne,
144 write_consistency: ConsistencyLevel::DcOne,
145 timeout_ms: 5_000,
146 server_retry_timeout_ms: 30_000,
147 server_failure_limit: 2,
148 auto_eject_hosts: false,
149 enable_gossip: false,
150 bucket_types: Vec::new(),
151 default_bucket_type: None,
152 enable_hinted_handoff: false,
153 hint_ttl_seconds: 86_400,
154 hint_store_max_bytes: 64 * 1024 * 1024,
155 hint_drain_interval_ms: 30_000,
156 }
157 }
158}
159
160impl PoolConfig {
161 /// Look up a bucket type by name.
162 ///
163 /// Returns the matching [`BucketType`] when one is
164 /// configured. The dispatcher consults this on every request
165 /// to swap in per-bucket consistency / fan-out settings.
166 #[must_use]
167 pub fn lookup_bucket_type(&self, name: &[u8]) -> Option<&BucketType> {
168 self.bucket_types
169 .iter()
170 .find(|bt| bt.name.as_bytes() == name)
171 }
172
173 /// Resolve the bucket type that applies to a request whose
174 /// extracted bucket name is `bucket`. `None` falls back to
175 /// `default_bucket_type` (also possibly `None`).
176 #[must_use]
177 pub fn resolve_bucket_type(&self, bucket: Option<&[u8]>) -> Option<&BucketType> {
178 if let Some(b) = bucket {
179 if let Some(bt) = self.lookup_bucket_type(b) {
180 return Some(bt);
181 }
182 }
183 // Either no bucket prefix on the key, or the prefix did
184 // not match any configured type. Fall through to the
185 // default bucket type when one is named.
186 let name = self.default_bucket_type.as_deref()?;
187 self.lookup_bucket_type(name.as_bytes())
188 }
189}
190
191impl PoolConfig {
192 /// Construct a `PoolConfig` from a [`ConfPool`] block. Fields
193 /// missing from the YAML are filled with the same defaults the
194 /// reference engine applies in `conf_pool_each_transform` (the
195 /// caller is expected to have called
196 /// [`crate::conf::Config::finalize`] before this point).
197 ///
198 /// # Examples
199 ///
200 /// ```
201 /// use dynomite::cluster::pool::PoolConfig;
202 /// use dynomite::conf::Config;
203 /// let mut cfg = Config::parse_str(
204 /// "p:\n listen: 127.0.0.1:1\n dyn_listen: 127.0.0.1:2\n tokens: '1'\n servers:\n - 127.0.0.1:3:1\n data_store: 0\n",
205 /// ).unwrap();
206 /// cfg.finalize();
207 /// let pc = PoolConfig::from_conf("p", cfg.pool());
208 /// assert_eq!(pc.name, "p");
209 /// ```
210 #[must_use]
211 pub fn from_conf(name: &str, pool: &ConfPool) -> Self {
212 let parse_consistency = |s: &Option<String>| {
213 s.as_deref()
214 .and_then(ConsistencyLevel::from_name)
215 .unwrap_or(ConsistencyLevel::DcOne)
216 };
217 let data_store = match pool.data_store {
218 Some(1) => DataStore::Memcache,
219 Some(2) => DataStore::Noxu,
220 _ => DataStore::Redis,
221 };
222 Self {
223 name: name.to_string(),
224 dc: pool.datacenter.clone().unwrap_or_else(|| "localdc".into()),
225 rack: pool.rack.clone().unwrap_or_else(|| "localrack".into()),
226 data_store,
227 hash: pool.hash.unwrap_or(HashType::Murmur),
228 distribution: pool.resolved_distribution(),
229 distribution_shadow: pool.distribution_shadow,
230 read_consistency: parse_consistency(&pool.read_consistency),
231 write_consistency: parse_consistency(&pool.write_consistency),
232 timeout_ms: pool
233 .timeout
234 .and_then(|n| u64::try_from(n).ok())
235 .unwrap_or(5_000),
236 server_retry_timeout_ms: pool
237 .server_retry_timeout
238 .and_then(|n| u64::try_from(n).ok())
239 .unwrap_or(30_000),
240 server_failure_limit: pool
241 .server_failure_limit
242 .and_then(|n| u32::try_from(n).ok())
243 .unwrap_or(2),
244 auto_eject_hosts: pool.auto_eject_hosts.unwrap_or(false),
245 enable_gossip: pool.enable_gossip.unwrap_or(false),
246 bucket_types: pool
247 .bucket_types
248 .iter()
249 .map(|bt| BucketType {
250 name: bt.name.clone(),
251 read_consistency: bucket_type_consistency(&bt.read_consistency),
252 write_consistency: bucket_type_consistency(&bt.write_consistency),
253 n_val: bt.n_val,
254 })
255 .collect(),
256 default_bucket_type: pool.default_bucket_type.clone(),
257 enable_hinted_handoff: pool.enable_hinted_handoff.unwrap_or(false),
258 hint_ttl_seconds: pool.hint_ttl_seconds.unwrap_or(86_400),
259 hint_store_max_bytes: pool.hint_store_max_bytes.unwrap_or(64 * 1024 * 1024),
260 hint_drain_interval_ms: pool.hint_drain_interval_ms.unwrap_or(30_000),
261 }
262 }
263}
264
/// Cluster-wide owner.
///
/// Holds the topology (datacenters, racks), the peer list (peer
/// index 0 is always the local node, mirroring the reference
/// engine), and the per-peer auto-eject decision state.
///
/// `peers` and `datacenters` live behind `RwLock`s so the
/// dispatcher can hold a read lock while gossip occasionally
/// upgrades to write.
///
/// # Examples
///
/// ```
/// use dynomite::cluster::pool::{PoolConfig, ServerPool};
/// use dynomite::cluster::peer::{Peer, PeerEndpoint};
/// use dynomite::hashkit::DynToken;
/// let cfg = PoolConfig {
///     dc: "dc1".into(),
///     rack: "r1".into(),
///     ..PoolConfig::default()
/// };
/// let local = Peer::new(
///     0, PeerEndpoint::tcp("127.0.0.1".into(), 8101), "r1".into(), "dc1".into(),
///     vec![DynToken::from_u32(1)], true, true, false,
/// );
/// let pool = ServerPool::new(cfg, vec![local]);
/// assert_eq!(pool.peers().read().len(), 1);
/// ```
#[derive(Debug)]
pub struct ServerPool {
    /// Pool settings captured at construction; exposed read-only
    /// through `config()`.
    config: PoolConfig,
    /// Peer table, shareable with other owners via `peers_arc()`.
    peers: Arc<RwLock<Vec<Peer>>>,
    /// Datacenter / rack table derived from the peer table.
    datacenters: Arc<RwLock<Vec<Datacenter>>>,
    /// Auto-eject deciders; `ServerPool::new` creates one entry
    /// per initial peer from the pool's eject settings.
    auto_eject: Arc<RwLock<Vec<AutoEject>>>,
}
300
301impl ServerPool {
302 /// Build a fresh pool from a [`PoolConfig`] and an initial peer
303 /// list (peer index 0 is the local node).
304 ///
305 /// Datacenters and racks are populated automatically from the
306 /// supplied peers; their continuum is rebuilt by
307 /// [`ServerPool::rebuild_ring`].
308 ///
309 /// # Examples
310 ///
311 /// ```
312 /// use dynomite::cluster::pool::{PoolConfig, ServerPool};
313 /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
314 /// # use dynomite::hashkit::DynToken;
315 /// # use dynomite::conf::{DataStore, HashType};
316 /// # use dynomite::msg::ConsistencyLevel;
317 /// # let cfg = PoolConfig::default();
318 /// # let local = Peer::new(
319 /// # 0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
320 /// # vec![DynToken::from_u32(0)], true, true, false,
321 /// # );
322 /// let pool = ServerPool::new(cfg, vec![local]);
323 /// pool.rebuild_ring();
324 /// assert_eq!(pool.datacenters().read().len(), 1);
325 /// ```
326 #[must_use]
327 pub fn new(config: PoolConfig, peers: Vec<Peer>) -> Self {
328 let mut dcs: Vec<Datacenter> = Vec::new();
329 for p in &peers {
330 let dc_idx = if let Some(i) = dcs.iter().position(|d| d.name() == p.dc()) {
331 i
332 } else {
333 dcs.push(Datacenter::new(p.dc().to_string()));
334 dcs.len() - 1
335 };
336 dcs[dc_idx].upsert_rack(p.rack().to_string());
337 }
338 let auto_eject_template = AutoEject::new(
339 config.auto_eject_hosts,
340 config.server_failure_limit,
341 Duration::from_millis(config.server_retry_timeout_ms),
342 );
343 let mut auto_ejects = Vec::with_capacity(peers.len());
344 for _ in &peers {
345 auto_ejects.push(auto_eject_template.clone());
346 }
347 let pool = Self {
348 config,
349 peers: Arc::new(RwLock::new(peers)),
350 datacenters: Arc::new(RwLock::new(dcs)),
351 auto_eject: Arc::new(RwLock::new(auto_ejects)),
352 };
353 pool.rebuild_ring();
354 pool
355 }
356
357 /// Configuration block.
358 #[must_use]
359 pub fn config(&self) -> &PoolConfig {
360 &self.config
361 }
362
363 /// Borrow the peer list (RwLock).
364 #[must_use]
365 pub fn peers(&self) -> &RwLock<Vec<Peer>> {
366 &self.peers
367 }
368
369 /// Shared `Arc` to the peer list.
370 #[must_use]
371 pub fn peers_arc(&self) -> Arc<RwLock<Vec<Peer>>> {
372 self.peers.clone()
373 }
374
375 /// Borrow the datacenter list.
376 #[must_use]
377 pub fn datacenters(&self) -> &RwLock<Vec<Datacenter>> {
378 &self.datacenters
379 }
380
381 /// Borrow the per-peer auto-eject deciders.
382 #[must_use]
383 pub fn auto_eject(&self) -> &RwLock<Vec<AutoEject>> {
384 &self.auto_eject
385 }
386
387 /// Rebuild the per-rack token continuum from the current peer
388 /// table. Mirrors `vnode_update`. When the configured
389 /// distribution is
390 /// [`crate::conf::Distribution::RandomSlicing`], a
391 /// [`crate::hashkit::random_slicing::RandomSlices`] table is
392 /// installed on each rack alongside the continuum so the
393 /// shadow-distribution path can still walk the vnode view.
394 pub fn rebuild_ring(&self) {
395 let peers = self.peers.read();
396 let mut dcs = self.datacenters.write();
397 // Make sure all (dc, rack) pairs exist.
398 for p in peers.iter() {
399 let dc_idx = if let Some(i) = dcs.iter().position(|d| d.name() == p.dc()) {
400 i
401 } else {
402 dcs.push(Datacenter::new(p.dc().to_string()));
403 dcs.len() - 1
404 };
405 dcs[dc_idx].upsert_rack(p.rack().to_string());
406 }
407 let entries: Vec<_> = peers
408 .iter()
409 .map(|p| crate::cluster::vnode::PeerTokens {
410 peer_idx: p.idx(),
411 dc: p.dc(),
412 rack: p.rack(),
413 tokens: p.tokens(),
414 })
415 .collect();
416 crate::cluster::vnode::rebuild_continuums(&mut dcs, &entries);
417 let live_or_shadow_uses_rs = matches!(
418 self.config.distribution,
419 crate::conf::Distribution::RandomSlicing
420 ) || matches!(
421 self.config.distribution_shadow,
422 Some(crate::conf::Distribution::RandomSlicing)
423 );
424 if live_or_shadow_uses_rs {
425 Self::install_random_slices(&peers, &mut dcs);
426 }
427 }
428
429 fn install_random_slices(peers: &[crate::cluster::peer::Peer], dcs: &mut [Datacenter]) {
430 use crate::hashkit::random_slicing::RandomSlices;
431 for dc in dcs.iter_mut() {
432 let dc_name = dc.name().to_string();
433 for rack in dc.racks_mut().iter_mut() {
434 let mut names: Vec<String> = peers
435 .iter()
436 .filter(|p| p.dc() == dc_name && p.rack() == rack.name())
437 .map(|p| p.endpoint().pname())
438 .collect();
439 names.sort();
440 names.dedup();
441 if names.is_empty() {
442 continue;
443 }
444 let refs: Vec<&str> = names.iter().map(String::as_str).collect();
445 if let Ok(slices) = RandomSlices::from_uniform(&refs) {
446 rack.set_random_slices(slices);
447 } else {
448 tracing::warn!(
449 target: "dynomite::cluster::pool",
450 rack = rack.name(),
451 dc = %dc_name,
452 "random-slicing build failed; falling back to vnode for this rack"
453 );
454 }
455 }
456 }
457 }
458
459 /// Walk the datacenters and choose, for each remote DC, a rack
460 /// for cross-DC replication. Mirrors
461 /// `preselect_remote_rack_for_replication`.
462 pub fn preselect_remote_racks(&self) {
463 let mut dcs = self.datacenters.write();
464 for dc in dcs.iter_mut() {
465 dc.sort_racks();
466 }
467 // Find the index of the local rack in the local DC.
468 let mut my_rack_index = 0usize;
469 for dc in dcs.iter() {
470 if dc.name() == self.config.dc {
471 if let Some(idx) = dc.rack_idx(&self.config.rack) {
472 my_rack_index = idx;
473 }
474 break;
475 }
476 }
477 for dc in dcs.iter_mut() {
478 if dc.name() == self.config.dc {
479 dc.set_preselected_rack_idx(None);
480 continue;
481 }
482 let rack_count = dc.racks().len();
483 if rack_count == 0 {
484 dc.set_preselected_rack_idx(None);
485 } else {
486 dc.set_preselected_rack_idx(Some(my_rack_index % rack_count));
487 }
488 }
489 }
490
491 /// Initialise a per-DC [`crate::msg::ResponseMgr`] vector for
492 /// the supplied request. The walker visits every datacenter
493 /// and produces one manager per DC sized to the rack count.
494 /// Mirrors `init_response_mgr_all_dcs`.
495 ///
496 /// # Examples
497 ///
498 /// ```
499 /// # use dynomite::cluster::pool::{PoolConfig, ServerPool};
500 /// # use dynomite::cluster::peer::{Peer, PeerEndpoint};
501 /// # use dynomite::hashkit::DynToken;
502 /// # use dynomite::conf::{DataStore, HashType};
503 /// # use dynomite::msg::{ConsistencyLevel, Msg, MsgType};
504 /// # let cfg = PoolConfig::default();
505 /// # let local = Peer::new(
506 /// # 0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
507 /// # vec![DynToken::from_u32(0)], true, true, false,
508 /// # );
509 /// let pool = ServerPool::new(cfg, vec![local]);
510 /// let req = Msg::new(1, MsgType::ReqRedisGet, true);
511 /// let mgrs = pool.init_response_mgrs(&req);
512 /// assert_eq!(mgrs.len(), 1);
513 /// ```
514 #[must_use]
515 pub fn init_response_mgrs(&self, req: &crate::msg::Msg) -> Vec<crate::msg::ResponseMgr> {
516 use crate::msg::{ResponseMgr, MAX_REPLICAS_PER_DC};
517 let dcs = self.datacenters.read();
518 let mut out = Vec::with_capacity(dcs.len());
519 for dc in dcs.iter() {
520 let rack_count = dc.racks().len();
521 let max_responses = u8::try_from(rack_count.clamp(1, MAX_REPLICAS_PER_DC))
522 .unwrap_or(u8::try_from(MAX_REPLICAS_PER_DC).unwrap_or(1));
523 out.push(ResponseMgr::new(
524 req,
525 max_responses,
526 Some(dc.name().to_string()),
527 ));
528 }
529 out
530 }
531}
532
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::datacenter::Rack;
    use crate::cluster::peer::PeerEndpoint;
    use crate::hashkit::DynToken;
    use crate::msg::{Msg, MsgType};

    /// Default pool configuration placed in the given dc / rack.
    fn pool_cfg(dc: &str, rack: &str) -> PoolConfig {
        let mut config = PoolConfig::default();
        config.dc = dc.into();
        config.rack = rack.into();
        config
    }

    /// Single-token TCP peer on 127.0.0.1; the port is derived
    /// from the peer index so every peer gets its own endpoint.
    fn make_peer(idx: u32, dc: &str, rack: &str, tok: u32, is_local: bool, is_same: bool) -> Peer {
        let port = 8101 + u16::try_from(idx).unwrap_or(0);
        let endpoint = PeerEndpoint::tcp("127.0.0.1".into(), port);
        let tokens = vec![DynToken::from_u32(tok)];
        Peer::new(
            idx,
            endpoint,
            rack.into(),
            dc.into(),
            tokens,
            is_local,
            is_same,
            false,
        )
    }

    #[test]
    fn build_pool_populates_topology() {
        let peers = vec![
            make_peer(0, "dc1", "r1", 10, true, true),
            make_peer(1, "dc1", "r2", 20, false, true),
            make_peer(2, "dc2", "r1", 30, false, false),
        ];
        let pool = ServerPool::new(pool_cfg("dc1", "r1"), peers);
        let topology = pool.datacenters().read();
        let dc1 = topology.iter().find(|d| d.name() == "dc1").unwrap();
        // Two distinct racks were declared for dc1.
        assert_eq!(dc1.racks().len(), 2);
    }

    #[test]
    fn preselect_remote_picks_per_dc() {
        let peers = vec![
            make_peer(0, "dc1", "rA", 10, true, true),
            make_peer(1, "dc2", "rA", 20, false, false),
            make_peer(2, "dc2", "rB", 30, false, false),
        ];
        let pool = ServerPool::new(pool_cfg("dc1", "rA"), peers);
        pool.preselect_remote_racks();
        let topology = pool.datacenters().read();
        let dc2 = topology.iter().find(|d| d.name() == "dc2").unwrap();
        // The local rack "rA" sorts to index 0 and dc2 has two
        // racks, so the preselected index is 0, i.e. "rA".
        let chosen = dc2.preselected_rack().map(Rack::name);
        assert_eq!(chosen, Some("rA"));
    }

    #[test]
    fn init_response_mgrs_one_per_dc() {
        let peers = vec![
            make_peer(0, "dc1", "r1", 10, true, true),
            make_peer(1, "dc2", "r1", 20, false, false),
        ];
        let pool = ServerPool::new(pool_cfg("dc1", "r1"), peers);
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let mgrs = pool.init_response_mgrs(&req);
        // One response manager per datacenter.
        assert_eq!(mgrs.len(), 2);
    }
}
610}