use std::sync::Arc;
use std::time::Duration;
use parking_lot::RwLock;
use crate::cluster::datacenter::Datacenter;
use crate::cluster::peer::Peer;
use crate::conf::{
ConfPool, ConsistencyLevel as ConfConsistencyLevel, DataStore, Distribution, HashType,
};
use crate::msg::ConsistencyLevel;
use crate::net::auto_eject::AutoEject;
/// Translates the textual consistency setting of a bucket type into the
/// message-layer [`ConsistencyLevel`].
///
/// The text is parsed with `ConfConsistencyLevel::parse`, using
/// `"bucket_type_consistency"` as the field label handed to the parser.
/// A parse failure is not surfaced to the caller: it yields
/// [`ConsistencyLevel::DcOne`], the same value a successfully parsed
/// `DcOne` produces.
fn bucket_type_consistency(raw: &str) -> ConsistencyLevel {
    ConfConsistencyLevel::parse("bucket_type_consistency", raw).map_or(
        // Unparseable input falls back to the weakest level.
        ConsistencyLevel::DcOne,
        |level| match level {
            ConfConsistencyLevel::DcOne => ConsistencyLevel::DcOne,
            ConfConsistencyLevel::DcQuorum => ConsistencyLevel::DcQuorum,
            ConfConsistencyLevel::DcSafeQuorum => ConsistencyLevel::DcSafeQuorum,
            ConfConsistencyLevel::DcEachSafeQuorum => ConsistencyLevel::DcEachSafeQuorum,
        },
    )
}
/// Per-bucket-type settings carried by a [`PoolConfig`].
///
/// `PoolConfig::from_conf` builds one value per bucket type listed in the
/// pool configuration.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BucketType {
/// Name of the bucket type; `PoolConfig::lookup_bucket_type` compares it
/// byte-for-byte against the requested name.
pub name: String,
/// Read consistency level configured for this bucket type.
pub read_consistency: ConsistencyLevel,
/// Write consistency level configured for this bucket type.
pub write_consistency: ConsistencyLevel,
/// Copied unchanged from the configured bucket type.
/// NOTE(review): presumably the replica count for the bucket type — confirm.
pub n_val: u8,
}
/// Resolved settings for one server pool.
///
/// Values come either from [`Default`] or from `PoolConfig::from_conf`, which
/// fills every field the configuration leaves unset with the same fallback
/// values the `Default` implementation uses.
#[derive(Clone, Debug)]
pub struct PoolConfig {
/// Pool name (`"p"` by default; `from_conf` takes it from its `name` argument).
pub name: String,
/// Name of the local datacenter (fallback `"localdc"`).
/// `ServerPool::preselect_remote_racks` compares it against datacenter names
/// to tell the local datacenter from the others.
pub dc: String,
/// Name of the local rack (fallback `"localrack"`).
/// `ServerPool::preselect_remote_racks` looks it up inside the local datacenter.
pub rack: String,
/// Backing data store. `from_conf` maps configured value `1` to
/// `DataStore::Memcache`, `2` to `DataStore::Noxu`, anything else to `DataStore::Redis`.
pub data_store: DataStore,
/// Hash type (fallback `HashType::Murmur`).
pub hash: HashType,
/// Live distribution. When it is `Distribution::RandomSlicing`,
/// `ServerPool::rebuild_ring` also installs random slices on each rack.
pub distribution: Distribution,
/// Optional shadow distribution. `ServerPool::rebuild_ring` installs random
/// slices when this is `Some(Distribution::RandomSlicing)` as well.
pub distribution_shadow: Option<Distribution>,
/// Pool-level read consistency (fallback `ConsistencyLevel::DcOne`).
pub read_consistency: ConsistencyLevel,
/// Pool-level write consistency (fallback `ConsistencyLevel::DcOne`).
pub write_consistency: ConsistencyLevel,
/// Timeout value, fallback `5_000`.
/// NOTE(review): milliseconds per the field name; what it bounds is not visible here — confirm.
pub timeout_ms: u64,
/// Retry timeout, fallback `30_000`. `ServerPool::new` converts it with
/// `Duration::from_millis` and passes it to `AutoEject::new`.
pub server_retry_timeout_ms: u64,
/// Failure limit, fallback `2`. Passed to `AutoEject::new` by `ServerPool::new`.
pub server_failure_limit: u32,
/// Auto-eject switch, fallback `false`. Passed to `AutoEject::new` by `ServerPool::new`.
pub auto_eject_hosts: bool,
/// Gossip switch, fallback `false`.
pub enable_gossip: bool,
/// Bucket types known to this pool; searched by `lookup_bucket_type`.
pub bucket_types: Vec<BucketType>,
/// Name of the bucket type `resolve_bucket_type` falls back to when the
/// requested one is absent or unknown.
pub default_bucket_type: Option<String>,
/// Hinted-handoff switch, fallback `false`.
pub enable_hinted_handoff: bool,
/// Hint TTL, fallback `86_400`.
/// NOTE(review): seconds per the field name — confirm against the consumer.
pub hint_ttl_seconds: u64,
/// Hint store size cap, fallback `64 * 1024 * 1024`.
/// NOTE(review): bytes per the field name — confirm against the consumer.
pub hint_store_max_bytes: u64,
/// Hint drain interval, fallback `30_000`.
/// NOTE(review): milliseconds per the field name — confirm against the consumer.
pub hint_drain_interval_ms: u64,
}
impl Default for PoolConfig {
    /// Returns the baseline configuration: pool `"p"` in `"localdc"` /
    /// `"localrack"`, Redis data store, Murmur hashing, vnode distribution
    /// with no shadow, `DcOne` for reads and writes, no bucket types, and
    /// auto-eject, gossip and hinted handoff all switched off.
    fn default() -> Self {
        Self {
            // Identity and placement.
            name: String::from("p"),
            dc: String::from("localdc"),
            rack: String::from("localrack"),

            // Backend, hashing and distribution.
            data_store: DataStore::Redis,
            hash: HashType::Murmur,
            distribution: Distribution::Vnode,
            distribution_shadow: None,

            // Consistency.
            read_consistency: ConsistencyLevel::DcOne,
            write_consistency: ConsistencyLevel::DcOne,

            // Timeouts and failure handling.
            timeout_ms: 5_000,
            server_retry_timeout_ms: 30_000,
            server_failure_limit: 2,
            auto_eject_hosts: false,

            enable_gossip: false,

            // Bucket types.
            bucket_types: vec![],
            default_bucket_type: None,

            // Hinted handoff.
            enable_hinted_handoff: false,
            hint_ttl_seconds: 86_400,
            // 64 << 20 == 64 * 1024 * 1024.
            hint_store_max_bytes: 64 << 20,
            hint_drain_interval_ms: 30_000,
        }
    }
}
impl PoolConfig {
    /// Finds the first bucket type whose name equals `name` byte-for-byte.
    ///
    /// Returns `None` when no configured bucket type matches.
    #[must_use]
    pub fn lookup_bucket_type(&self, name: &[u8]) -> Option<&BucketType> {
        for candidate in &self.bucket_types {
            if candidate.name.as_bytes() == name {
                return Some(candidate);
            }
        }
        None
    }

    /// Picks the bucket type that applies to a request.
    ///
    /// An explicitly supplied `bucket` wins when it names a known bucket
    /// type. Otherwise (no bucket given, or the name is unknown) the
    /// configured `default_bucket_type` is looked up instead. Returns `None`
    /// when neither lookup succeeds or no default is configured.
    #[must_use]
    pub fn resolve_bucket_type(&self, bucket: Option<&[u8]>) -> Option<&BucketType> {
        bucket
            .and_then(|requested| self.lookup_bucket_type(requested))
            .or_else(|| {
                self.default_bucket_type
                    .as_deref()
                    .and_then(|fallback| self.lookup_bucket_type(fallback.as_bytes()))
            })
    }
}
impl PoolConfig {
#[must_use]
pub fn from_conf(name: &str, pool: &ConfPool) -> Self {
let parse_consistency = |s: &Option<String>| {
s.as_deref()
.and_then(ConsistencyLevel::from_name)
.unwrap_or(ConsistencyLevel::DcOne)
};
let data_store = match pool.data_store {
Some(1) => DataStore::Memcache,
Some(2) => DataStore::Noxu,
_ => DataStore::Redis,
};
Self {
name: name.to_string(),
dc: pool.datacenter.clone().unwrap_or_else(|| "localdc".into()),
rack: pool.rack.clone().unwrap_or_else(|| "localrack".into()),
data_store,
hash: pool.hash.unwrap_or(HashType::Murmur),
distribution: pool.resolved_distribution(),
distribution_shadow: pool.distribution_shadow,
read_consistency: parse_consistency(&pool.read_consistency),
write_consistency: parse_consistency(&pool.write_consistency),
timeout_ms: pool
.timeout
.and_then(|n| u64::try_from(n).ok())
.unwrap_or(5_000),
server_retry_timeout_ms: pool
.server_retry_timeout
.and_then(|n| u64::try_from(n).ok())
.unwrap_or(30_000),
server_failure_limit: pool
.server_failure_limit
.and_then(|n| u32::try_from(n).ok())
.unwrap_or(2),
auto_eject_hosts: pool.auto_eject_hosts.unwrap_or(false),
enable_gossip: pool.enable_gossip.unwrap_or(false),
bucket_types: pool
.bucket_types
.iter()
.map(|bt| BucketType {
name: bt.name.clone(),
read_consistency: bucket_type_consistency(&bt.read_consistency),
write_consistency: bucket_type_consistency(&bt.write_consistency),
n_val: bt.n_val,
})
.collect(),
default_bucket_type: pool.default_bucket_type.clone(),
enable_hinted_handoff: pool.enable_hinted_handoff.unwrap_or(false),
hint_ttl_seconds: pool.hint_ttl_seconds.unwrap_or(86_400),
hint_store_max_bytes: pool.hint_store_max_bytes.unwrap_or(64 * 1024 * 1024),
hint_drain_interval_ms: pool.hint_drain_interval_ms.unwrap_or(30_000),
}
}
}
/// A pool's configuration together with its lock-protected runtime state:
/// the peer list, the datacenter/rack topology derived from it, and the
/// per-peer auto-eject state.
#[derive(Debug)]
pub struct ServerPool {
/// Configuration the pool was created with.
config: PoolConfig,
/// Peers handed to `ServerPool::new`; shareable via `peers_arc`.
peers: Arc<RwLock<Vec<Peer>>>,
/// Datacenters (and their racks) registered from the peers' `dc()` and
/// `rack()` values.
datacenters: Arc<RwLock<Vec<Datacenter>>>,
/// One `AutoEject` per peer handed to `ServerPool::new`, each cloned from a
/// template built from the pool configuration.
/// NOTE(review): presumably index-aligned with `peers` — confirm.
auto_eject: Arc<RwLock<Vec<AutoEject>>>,
}
impl ServerPool {
    /// Creates a pool from `config` and an initial set of `peers`.
    ///
    /// The datacenter/rack topology is registered from the peers, one
    /// `AutoEject` per peer is cloned from a template built out of
    /// `config.auto_eject_hosts`, `config.server_failure_limit` and
    /// `config.server_retry_timeout_ms`, and [`Self::rebuild_ring`] is run
    /// once before the pool is returned.
    #[must_use]
    pub fn new(config: PoolConfig, peers: Vec<Peer>) -> Self {
        let mut dcs: Vec<Datacenter> = Vec::new();
        Self::register_topology(&mut dcs, &peers);

        let auto_eject_template = AutoEject::new(
            config.auto_eject_hosts,
            config.server_failure_limit,
            Duration::from_millis(config.server_retry_timeout_ms),
        );
        // One independent copy of the template per peer.
        let auto_ejects = vec![auto_eject_template; peers.len()];

        let pool = Self {
            config,
            peers: Arc::new(RwLock::new(peers)),
            datacenters: Arc::new(RwLock::new(dcs)),
            auto_eject: Arc::new(RwLock::new(auto_ejects)),
        };
        pool.rebuild_ring();
        pool
    }

    /// Returns the configuration this pool was created with.
    #[must_use]
    pub fn config(&self) -> &PoolConfig {
        &self.config
    }

    /// Borrows the lock guarding the peer list.
    #[must_use]
    pub fn peers(&self) -> &RwLock<Vec<Peer>> {
        &self.peers
    }

    /// Returns another shared handle to the lock guarding the peer list.
    #[must_use]
    pub fn peers_arc(&self) -> Arc<RwLock<Vec<Peer>>> {
        Arc::clone(&self.peers)
    }

    /// Borrows the lock guarding the datacenter topology.
    #[must_use]
    pub fn datacenters(&self) -> &RwLock<Vec<Datacenter>> {
        &self.datacenters
    }

    /// Borrows the lock guarding the per-peer auto-eject state.
    #[must_use]
    pub fn auto_eject(&self) -> &RwLock<Vec<AutoEject>> {
        &self.auto_eject
    }

    /// Re-derives the topology and token continuums from the current peers.
    ///
    /// Takes the peer read lock first and the datacenter write lock second,
    /// and holds both for the whole call. It registers any datacenter or
    /// rack not seen before, rebuilds the continuums through
    /// `vnode::rebuild_continuums`, and, when either the live or the shadow
    /// distribution is random slicing, installs random slices on each rack.
    pub fn rebuild_ring(&self) {
        let peers = self.peers.read();
        let mut dcs = self.datacenters.write();
        Self::register_topology(&mut dcs, &peers);

        let entries: Vec<_> = peers
            .iter()
            .map(|p| crate::cluster::vnode::PeerTokens {
                peer_idx: p.idx(),
                dc: p.dc(),
                rack: p.rack(),
                tokens: p.tokens(),
            })
            .collect();
        crate::cluster::vnode::rebuild_continuums(&mut dcs, &entries);

        let live_or_shadow_uses_rs = matches!(
            self.config.distribution,
            crate::conf::Distribution::RandomSlicing
        ) || matches!(
            self.config.distribution_shadow,
            Some(crate::conf::Distribution::RandomSlicing)
        );
        if live_or_shadow_uses_rs {
            Self::install_random_slices(&peers, &mut dcs);
        }
    }

    /// Makes sure every peer's datacenter exists in `dcs` and that the
    /// peer's rack is upserted into it.
    ///
    /// Datacenters are matched by name; unknown ones are appended in the
    /// order in which peers first mention them. Shared by [`Self::new`] and
    /// [`Self::rebuild_ring`] so the two cannot drift apart.
    fn register_topology(dcs: &mut Vec<Datacenter>, peers: &[Peer]) {
        for peer in peers {
            let dc_idx = match dcs.iter().position(|d| d.name() == peer.dc()) {
                Some(existing) => existing,
                None => {
                    dcs.push(Datacenter::new(peer.dc().to_string()));
                    dcs.len() - 1
                }
            };
            dcs[dc_idx].upsert_rack(peer.rack().to_string());
        }
    }

    /// Builds and installs uniform random slices for every rack that has at
    /// least one peer.
    ///
    /// Slice members are the sorted, de-duplicated `pname()` values of the
    /// peers whose `dc()` and `rack()` match the rack. Racks without peers
    /// are skipped. If `RandomSlices::from_uniform` fails, a warning is
    /// logged and nothing is installed on that rack.
    fn install_random_slices(peers: &[Peer], dcs: &mut [Datacenter]) {
        use crate::hashkit::random_slicing::RandomSlices;

        for dc in dcs.iter_mut() {
            // Owned copy so the name stays usable while the racks are
            // mutably borrowed.
            let dc_name = dc.name().to_string();
            for rack in dc.racks_mut().iter_mut() {
                let mut names: Vec<String> = peers
                    .iter()
                    .filter(|p| p.dc() == dc_name && p.rack() == rack.name())
                    .map(|p| p.endpoint().pname())
                    .collect();
                // Equal strings are interchangeable, so an unstable sort
                // followed by dedup yields the same list as a stable one.
                names.sort_unstable();
                names.dedup();
                if names.is_empty() {
                    continue;
                }

                let refs: Vec<&str> = names.iter().map(String::as_str).collect();
                match RandomSlices::from_uniform(&refs) {
                    Ok(slices) => {
                        rack.set_random_slices(slices);
                    }
                    Err(_) => {
                        tracing::warn!(
                            target: "dynomite::cluster::pool",
                            rack = rack.name(),
                            dc = %dc_name,
                            "random-slicing build failed; falling back to vnode for this rack"
                        );
                    }
                }
            }
        }
    }

    /// Chooses, for every non-local datacenter, which rack index is
    /// preselected.
    ///
    /// All datacenters first have their racks sorted via `sort_racks`. The
    /// index of the configured local rack inside the configured local
    /// datacenter is then looked up (0 when either is missing). The local
    /// datacenter gets no preselected rack; every other datacenter gets
    /// `local_index % rack_count`, or none when it has no racks.
    pub fn preselect_remote_racks(&self) {
        let mut dcs = self.datacenters.write();
        for dc in dcs.iter_mut() {
            dc.sort_racks();
        }

        // Only the first datacenter carrying the local name is consulted.
        let my_rack_index = dcs
            .iter()
            .find(|dc| dc.name() == self.config.dc)
            .and_then(|dc| dc.rack_idx(&self.config.rack))
            .unwrap_or(0);

        for dc in dcs.iter_mut() {
            let preselected = if dc.name() == self.config.dc {
                None
            } else {
                match dc.racks().len() {
                    0 => None,
                    rack_count => Some(my_rack_index % rack_count),
                }
            };
            dc.set_preselected_rack_idx(preselected);
        }
    }

    /// Creates one `ResponseMgr` per known datacenter for `req`.
    ///
    /// Each manager is given the datacenter's name and a maximum response
    /// count equal to that datacenter's rack count clamped to
    /// `1..=MAX_REPLICAS_PER_DC`.
    #[must_use]
    pub fn init_response_mgrs(&self, req: &crate::msg::Msg) -> Vec<crate::msg::ResponseMgr> {
        use crate::msg::{ResponseMgr, MAX_REPLICAS_PER_DC};

        // Used only if the clamped rack count somehow does not fit in a u8.
        let fallback_max = u8::try_from(MAX_REPLICAS_PER_DC).unwrap_or(1);

        let dcs = self.datacenters.read();
        let mut out = Vec::with_capacity(dcs.len());
        for dc in dcs.iter() {
            let clamped = dc.racks().len().clamp(1, MAX_REPLICAS_PER_DC);
            let max_responses = u8::try_from(clamped).unwrap_or(fallback_max);
            out.push(ResponseMgr::new(
                req,
                max_responses,
                Some(dc.name().to_string()),
            ));
        }
        out
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::PeerEndpoint;
    use crate::hashkit::DynToken;

    /// Default pool configuration with only the local datacenter and rack
    /// overridden.
    fn cfg(dc: &str, rack: &str) -> PoolConfig {
        PoolConfig {
            dc: String::from(dc),
            rack: String::from(rack),
            ..PoolConfig::default()
        }
    }

    /// Builds a peer on `127.0.0.1` whose port is `8101 + idx` and which
    /// owns the single token `tok`.
    fn peer(idx: u32, dc: &str, rack: &str, tok: u32, is_local: bool, is_same: bool) -> Peer {
        let port = 8101 + u16::try_from(idx).unwrap_or(0);
        let endpoint = PeerEndpoint::tcp("127.0.0.1".into(), port);
        let tokens = vec![DynToken::from_u32(tok)];
        Peer::new(
            idx,
            endpoint,
            rack.into(),
            dc.into(),
            tokens,
            is_local,
            is_same,
            false,
        )
    }

    /// Two distinct racks in `dc1` must both show up in the topology.
    #[test]
    fn build_pool_populates_topology() {
        let peers = vec![
            peer(0, "dc1", "r1", 10, true, true),
            peer(1, "dc1", "r2", 20, false, true),
            peer(2, "dc2", "r1", 30, false, false),
        ];
        let pool = ServerPool::new(cfg("dc1", "r1"), peers);

        let topology = pool.datacenters().read();
        let dc1 = topology.iter().find(|d| d.name() == "dc1").unwrap();
        assert_eq!(dc1.racks().len(), 2);
    }

    /// With the local rack `rA` in `dc1`, the remote `dc2` is expected to
    /// preselect its rack `rA`.
    #[test]
    fn preselect_remote_picks_per_dc() {
        let peers = vec![
            peer(0, "dc1", "rA", 10, true, true),
            peer(1, "dc2", "rA", 20, false, false),
            peer(2, "dc2", "rB", 30, false, false),
        ];
        let pool = ServerPool::new(cfg("dc1", "rA"), peers);
        pool.preselect_remote_racks();

        let topology = pool.datacenters().read();
        let dc2 = topology.iter().find(|d| d.name() == "dc2").unwrap();
        let chosen = dc2
            .preselected_rack()
            .map(super::super::datacenter::Rack::name);
        assert_eq!(chosen, Some("rA"));
    }

    /// One response manager is produced for each datacenter in the pool.
    #[test]
    fn init_response_mgrs_one_per_dc() {
        let peers = vec![
            peer(0, "dc1", "r1", 10, true, true),
            peer(1, "dc2", "r1", 20, false, false),
        ];
        let pool = ServerPool::new(cfg("dc1", "r1"), peers);

        let req = crate::msg::Msg::new(1, crate::msg::MsgType::ReqRedisGet, true);
        let mgrs = pool.init_response_mgrs(&req);
        assert_eq!(mgrs.len(), 2);
    }
}