use std::collections::BTreeMap;
use std::fmt;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use super::endpoint::ConfListen;
use super::enums::{ConsistencyLevel, DataStore, Distribution, HashType, SecureServerOption};
use super::error::ConfError;
use super::server::{ConfDynSeed, ConfServer};
use super::tokens::TokenList;
pub mod defaults {
pub const TIMEOUT_MS: i64 = 5_000;
pub const LISTEN_BACKLOG: i64 = 512;
pub const CLIENT_CONNECTIONS: i64 = 0;
pub const DATA_STORE: i64 = 0;
pub const PRECONNECT: bool = false;
pub const AUTO_EJECT_HOSTS: bool = true;
pub const SERVER_RETRY_TIMEOUT_MS: i64 = 10 * 1000;
pub const SERVER_FAILURE_LIMIT: i64 = 3;
pub const DYN_READ_TIMEOUT_MS: i64 = 10_000;
pub const DYN_WRITE_TIMEOUT_MS: i64 = 10_000;
pub const DYN_CONNECTIONS: i64 = 100;
pub const GOS_INTERVAL_MS: i64 = 30_000;
pub const ENABLE_HINTED_HANDOFF: bool = false;
pub const HINT_TTL_SECONDS: u64 = 86_400;
pub const HINT_STORE_MAX_BYTES: u64 = 64 * 1024 * 1024;
pub const HINT_DRAIN_INTERVAL_MS: u64 = 30_000;
pub const CONN_MSG_RATE: u32 = 50_000;
pub const STATS_INTERVAL_MS: i64 = 30 * 1000;
pub const STATS_PNAME: &str = "0.0.0.0:22222";
pub const DATASTORE_CONNECTIONS: u8 = 1;
pub const LOCAL_PEER_CONNECTIONS: u8 = 1;
pub const REMOTE_PEER_CONNECTIONS: u8 = 1;
pub const RACK: &str = "localrack";
pub const DC: &str = "localdc";
pub const SECURE_SERVER_OPTION: &str = "none";
pub const CONSISTENCY: &str = "DC_ONE";
pub const SEED_PROVIDER: &str = "simple_provider";
pub const ENV: &str = "aws";
pub const PEM_KEY_FILE: &str = "conf/dynomite.pem";
pub const RECON_KEY_FILE: &str = "conf/recon_key.pem";
pub const RECON_IV_FILE: &str = "conf/recon_iv.pem";
pub const RECON_INTERVAL_SECONDS: u64 = 300;
pub const MBUF_MIN_SIZE: i64 = 512;
pub const MBUF_MAX_SIZE: i64 = 512_000;
pub const ALLOC_MSGS_MIN: i64 = 100_000;
pub const ALLOC_MSGS_MAX: i64 = 1_000_000;
}
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Servers(pub(crate) Vec<ConfServer>);
impl Servers {
pub fn from_vec(v: Vec<ConfServer>) -> Self {
Self(v)
}
}
impl Servers {
pub fn entries(&self) -> &[ConfServer] {
&self.0
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn datastore(&self) -> Option<&ConfServer> {
self.0.first()
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct ConfPool {
pub listen: Option<ConfListen>,
pub dyn_listen: Option<ConfListen>,
pub stats_listen: Option<ConfListen>,
pub hash: Option<HashType>,
pub hash_tag: Option<String>,
#[serde(default)]
pub distribution: Option<Distribution>,
#[serde(default)]
pub distribution_shadow: Option<Distribution>,
#[serde(default)]
pub server_connections: Option<i64>,
pub timeout: Option<i64>,
pub backlog: Option<i64>,
pub client_connections: Option<i64>,
#[serde(default, deserialize_with = "deserialize_data_store")]
pub data_store: Option<i64>,
#[serde(default)]
pub noxu_path: Option<PathBuf>,
pub preconnect: Option<bool>,
#[serde(default)]
pub redis_requirepass: Option<String>,
pub auto_eject_hosts: Option<bool>,
pub server_retry_timeout: Option<i64>,
pub server_failure_limit: Option<i64>,
pub servers: Option<Servers>,
pub dyn_read_timeout: Option<i64>,
pub dyn_write_timeout: Option<i64>,
pub dyn_seed_provider: Option<String>,
pub dyn_seeds: Option<Vec<ConfDynSeed>>,
pub dyn_port: Option<i64>,
pub dyn_connections: Option<i64>,
pub rack: Option<String>,
pub tokens: Option<TokenList>,
pub gos_interval: Option<i64>,
pub secure_server_option: Option<String>,
pub pem_key_file: Option<String>,
pub recon_key_file: Option<String>,
pub recon_iv_file: Option<String>,
#[serde(default)]
pub recon_interval_seconds: Option<u64>,
pub datacenter: Option<String>,
pub env: Option<String>,
pub conn_msg_rate: Option<u32>,
pub read_consistency: Option<String>,
pub write_consistency: Option<String>,
pub stats_interval: Option<i64>,
pub enable_gossip: Option<bool>,
#[serde(default)]
pub peer_tls_cert: Option<PathBuf>,
#[serde(default)]
pub peer_tls_key: Option<PathBuf>,
#[serde(default)]
pub peer_tls_ca: Option<PathBuf>,
#[serde(default)]
pub peer_tls_profiles: BTreeMap<String, ConfTlsProfile>,
pub mbuf_size: Option<i64>,
pub max_msgs: Option<i64>,
pub datastore_connections: Option<u8>,
pub local_peer_connections: Option<u8>,
pub remote_peer_connections: Option<u8>,
pub read_repairs_enabled: Option<bool>,
#[serde(default)]
pub enable_hinted_handoff: Option<bool>,
#[serde(default)]
pub hint_ttl_seconds: Option<u64>,
#[serde(default)]
pub hint_store_max_bytes: Option<u64>,
#[serde(default)]
pub hint_drain_interval_ms: Option<u64>,
pub log_format: Option<String>,
#[serde(default)]
pub observability: Option<ObservabilityConfig>,
#[serde(default)]
pub bucket_types: Vec<ConfBucketType>,
#[serde(default)]
pub default_bucket_type: Option<String>,
#[serde(default)]
pub riak: Option<ConfRiak>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct ConfRiak {
pub pbc_listen: Option<String>,
pub http_listen: Option<String>,
pub aae_enabled: Option<bool>,
pub aae_full_sweep_interval_seconds: Option<u64>,
pub aae_segment_interval_seconds: Option<u64>,
#[serde(default)]
pub tls_cert: Option<PathBuf>,
#[serde(default)]
pub tls_key: Option<PathBuf>,
#[serde(default)]
pub tls_ca: Option<PathBuf>,
#[serde(default)]
pub wasm_modules: Option<Vec<ConfRiakWasmModule>>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct ConfRiakWasmModule {
pub id: String,
pub path: PathBuf,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct ConfTlsProfile {
pub cert: Option<PathBuf>,
pub key: Option<PathBuf>,
pub ca: Option<PathBuf>,
}
impl ConfTlsProfile {
pub fn validate(&self, dc: &str) -> Result<(), ConfError> {
match (self.cert.as_deref(), self.key.as_deref()) {
(Some(_), Some(_)) | (None, None) => {}
(Some(c), None) => {
return Err(ConfError::BadServer {
field: "peer_tls_profiles.cert",
value: c.display().to_string(),
reason: format!(
"peer_tls_profiles[{dc}].cert is set but .key is not; both must be set together"
),
});
}
(None, Some(k)) => {
return Err(ConfError::BadServer {
field: "peer_tls_profiles.key",
value: k.display().to_string(),
reason: format!(
"peer_tls_profiles[{dc}].key is set but .cert is not; both must be set together"
),
});
}
}
if self.ca.is_some() && self.cert.is_none() {
return Err(ConfError::BadServer {
field: "peer_tls_profiles.ca",
value: self
.ca
.as_ref()
.map_or_else(String::new, |p| p.display().to_string()),
reason: format!(
"peer_tls_profiles[{dc}].ca requires .cert and .key to also be set"
),
});
}
Ok(())
}
}
impl ConfRiak {
pub fn validate(&self) -> Result<(), ConfError> {
if let Some(addr) = self.pbc_listen.as_deref() {
validate_riak_addr("pbc_listen", addr)?;
}
if let Some(addr) = self.http_listen.as_deref() {
validate_riak_addr("http_listen", addr)?;
}
if let Some(n) = self.aae_full_sweep_interval_seconds {
if n == 0 {
return Err(ConfError::BadServer {
field: "aae_full_sweep_interval_seconds",
value: n.to_string(),
reason: "must be > 0".into(),
});
}
}
if let Some(n) = self.aae_segment_interval_seconds {
if n == 0 {
return Err(ConfError::BadServer {
field: "aae_segment_interval_seconds",
value: n.to_string(),
reason: "must be > 0".into(),
});
}
}
if let (Some(seg), Some(full)) = (
self.aae_segment_interval_seconds,
self.aae_full_sweep_interval_seconds,
) {
if seg > full {
return Err(ConfError::BadServer {
field: "aae_segment_interval_seconds",
value: seg.to_string(),
reason: format!("must be <= aae_full_sweep_interval_seconds ({full})"),
});
}
}
validate_tls_pair(
"tls_cert",
"tls_key",
self.tls_cert.as_deref(),
self.tls_key.as_deref(),
)?;
if self.tls_ca.is_some() && self.tls_cert.is_none() {
return Err(ConfError::BadServer {
field: "tls_ca",
value: self
.tls_ca
.as_ref()
.map_or_else(String::new, |p| p.display().to_string()),
reason: "requires tls_cert and tls_key to also be set".into(),
});
}
if let Some(modules) = self.wasm_modules.as_deref() {
let mut seen: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
for m in modules {
if m.id.is_empty() {
return Err(ConfError::BadServer {
field: "wasm_modules.id",
value: String::new(),
reason: "wasm module id must not be empty".into(),
});
}
if !seen.insert(m.id.as_str()) {
return Err(ConfError::BadServer {
field: "wasm_modules.id",
value: m.id.clone(),
reason: "wasm module ids must be unique".into(),
});
}
if !m.path.is_file() {
return Err(ConfError::BadServer {
field: "wasm_modules.path",
value: m.path.display().to_string(),
reason: format!("wasm module file not found for id '{}'", m.id),
});
}
}
}
Ok(())
}
}
fn validate_tls_pair(
cert_field: &'static str,
key_field: &'static str,
cert: Option<&std::path::Path>,
key: Option<&std::path::Path>,
) -> Result<(), ConfError> {
match (cert, key) {
(Some(_), Some(_)) | (None, None) => Ok(()),
(Some(c), None) => Err(ConfError::BadServer {
field: cert_field,
value: c.display().to_string(),
reason: format!(
"{cert_field} is set but {key_field} is not; both must be set together"
),
}),
(None, Some(k)) => Err(ConfError::BadServer {
field: key_field,
value: k.display().to_string(),
reason: format!(
"{key_field} is set but {cert_field} is not; both must be set together"
),
}),
}
}
fn validate_riak_addr(field: &'static str, value: &str) -> Result<(), ConfError> {
use std::net::ToSocketAddrs;
if value.is_empty() {
return Err(ConfError::BadServer {
field,
value: value.to_string(),
reason: "riak listen address must not be empty".into(),
});
}
if value.parse::<std::net::SocketAddr>().is_ok() {
return Ok(());
}
match value.to_socket_addrs() {
Ok(mut iter) => {
if iter.next().is_some() {
Ok(())
} else {
Err(ConfError::BadServer {
field,
value: value.to_string(),
reason: "resolved to no addresses".into(),
})
}
}
Err(e) => Err(ConfError::BadServer {
field,
value: value.to_string(),
reason: format!("could not resolve: {e}"),
}),
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct ConfBucketType {
pub name: String,
pub read_consistency: String,
pub write_consistency: String,
#[serde(default)]
pub n_val: u8,
}
impl ConfBucketType {
pub fn read_level(&self) -> Result<ConsistencyLevel, ConfError> {
ConsistencyLevel::parse("read_consistency", &self.read_consistency)
}
pub fn write_level(&self) -> Result<ConsistencyLevel, ConfError> {
ConsistencyLevel::parse("write_consistency", &self.write_consistency)
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct ObservabilityConfig {
pub otlp_traces_endpoint: Option<String>,
pub otlp_logs_endpoint: Option<String>,
pub service_name: Option<String>,
pub traces_sampling: Option<f64>,
}
impl ConfPool {
#[must_use]
pub fn resolved_distribution(&self) -> Distribution {
match self.distribution {
None | Some(Distribution::Vnode) => Distribution::Vnode,
Some(Distribution::RandomSlicing) => Distribution::RandomSlicing,
Some(other) => {
tracing::warn!(
target: "dynomite::conf",
distribution = other.as_str(),
"distribution mode '{}' is a legacy alias and resolves to 'vnode'; \
update the YAML to either 'vnode' or 'random_slicing'",
other
);
Distribution::Vnode
}
}
}
}
impl ConfPool {
pub fn apply_defaults(&mut self) {
if self.dyn_seed_provider.is_none() {
self.dyn_seed_provider = Some(defaults::SEED_PROVIDER.to_string());
}
if self.hash.is_none() {
self.hash = Some(HashType::Murmur);
}
if self.timeout.is_none() {
self.timeout = Some(defaults::TIMEOUT_MS);
}
if self.backlog.is_none() {
self.backlog = Some(defaults::LISTEN_BACKLOG);
}
self.client_connections = Some(defaults::CLIENT_CONNECTIONS);
if self.data_store.is_none() {
self.data_store = Some(defaults::DATA_STORE);
}
if self.preconnect.is_none() {
self.preconnect = Some(defaults::PRECONNECT);
}
if self.auto_eject_hosts.is_none() {
self.auto_eject_hosts = Some(defaults::AUTO_EJECT_HOSTS);
}
if self.server_retry_timeout.is_none() {
self.server_retry_timeout = Some(defaults::SERVER_RETRY_TIMEOUT_MS);
}
if self.server_failure_limit.is_none() {
self.server_failure_limit = Some(defaults::SERVER_FAILURE_LIMIT);
}
if self.dyn_read_timeout.is_none() {
self.dyn_read_timeout = Some(defaults::DYN_READ_TIMEOUT_MS);
}
if self.dyn_write_timeout.is_none() {
self.dyn_write_timeout = Some(defaults::DYN_WRITE_TIMEOUT_MS);
}
if self.dyn_connections.is_none() {
self.dyn_connections = Some(defaults::DYN_CONNECTIONS);
}
if self.gos_interval.is_none() {
self.gos_interval = Some(defaults::GOS_INTERVAL_MS);
}
if self.conn_msg_rate.is_none() {
self.conn_msg_rate = Some(defaults::CONN_MSG_RATE);
}
if self.rack.is_none() {
self.rack = Some(defaults::RACK.to_string());
}
if self.datacenter.is_none() {
self.datacenter = Some(defaults::DC.to_string());
}
if self.secure_server_option.is_none() {
self.secure_server_option = Some(defaults::SECURE_SERVER_OPTION.to_string());
}
if self.read_consistency.is_none() {
self.read_consistency = Some(defaults::CONSISTENCY.to_string());
}
if self.write_consistency.is_none() {
self.write_consistency = Some(defaults::CONSISTENCY.to_string());
}
if self.stats_interval.is_none() {
self.stats_interval = Some(defaults::STATS_INTERVAL_MS);
}
if self.stats_listen.is_none() {
self.stats_listen = Some(
ConfListen::parse("stats_listen", defaults::STATS_PNAME)
.expect("invariant: STATS_PNAME constant is valid"),
);
}
if self.env.is_none() {
self.env = Some(defaults::ENV.to_string());
}
if self.pem_key_file.is_none() {
self.pem_key_file = Some(defaults::PEM_KEY_FILE.to_string());
}
if self.recon_key_file.is_none() {
self.recon_key_file = Some(defaults::RECON_KEY_FILE.to_string());
}
if self.recon_iv_file.is_none() {
self.recon_iv_file = Some(defaults::RECON_IV_FILE.to_string());
}
if self.recon_interval_seconds.is_none() {
self.recon_interval_seconds = Some(defaults::RECON_INTERVAL_SECONDS);
}
if self.datastore_connections.is_none() {
self.datastore_connections = Some(defaults::DATASTORE_CONNECTIONS);
}
if self.local_peer_connections.is_none() {
self.local_peer_connections = Some(defaults::LOCAL_PEER_CONNECTIONS);
}
if self.remote_peer_connections.is_none() {
self.remote_peer_connections = Some(defaults::REMOTE_PEER_CONNECTIONS);
}
if self.read_repairs_enabled.is_none() {
self.read_repairs_enabled = Some(false);
}
if self.enable_gossip.is_none() {
self.enable_gossip = Some(false);
}
self.apply_hinted_handoff_defaults();
}
fn apply_hinted_handoff_defaults(&mut self) {
if self.enable_hinted_handoff.is_none() {
self.enable_hinted_handoff = Some(defaults::ENABLE_HINTED_HANDOFF);
}
if self.hint_ttl_seconds.is_none() {
self.hint_ttl_seconds = Some(defaults::HINT_TTL_SECONDS);
}
if self.hint_store_max_bytes.is_none() {
self.hint_store_max_bytes = Some(defaults::HINT_STORE_MAX_BYTES);
}
if self.hint_drain_interval_ms.is_none() {
self.hint_drain_interval_ms = Some(defaults::HINT_DRAIN_INTERVAL_MS);
}
}
pub fn validate(&self, pool_name: &str) -> Result<(), ConfError> {
if pool_name.is_empty() {
return Err(ConfError::EmptyPoolName);
}
if self.listen.is_none() {
return Err(ConfError::MissingRequired("listen"));
}
self.validate_numeric_ranges()?;
self.validate_mbuf_size()?;
self.validate_max_msgs()?;
if let Some(n) = self.data_store {
let ds = DataStore::from_int(n)?;
if ds == DataStore::Noxu {
self.validate_noxu()?;
}
}
if let Some(tag) = &self.hash_tag {
if tag.chars().count() != 2 {
return Err(ConfError::BadHashTag(tag.clone()));
}
}
let secure = if let Some(s) = &self.secure_server_option {
SecureServerOption::parse(s)?
} else {
SecureServerOption::None
};
if let Some(s) = &self.read_consistency {
ConsistencyLevel::parse("read_consistency", s)?;
}
if let Some(s) = &self.write_consistency {
ConsistencyLevel::parse("write_consistency", s)?;
}
if secure != SecureServerOption::None {
match &self.pem_key_file {
Some(s) if !s.is_empty() => {}
_ => return Err(ConfError::MissingRequired("pem_key_file")),
}
}
if let Some(s) = &self.log_format {
crate::core::log::LogFormat::parse(s).map_err(|e| ConfError::BadServer {
field: "log_format",
value: s.clone(),
reason: e.to_string(),
})?;
}
self.validate_bucket_types()?;
self.validate_hinted_handoff()?;
self.validate_peer_tls()?;
if let Some(r) = &self.riak {
r.validate()?;
}
match &self.servers {
None => return Err(ConfError::MissingRequired("servers")),
Some(s) if s.is_empty() => return Err(ConfError::MissingRequired("servers")),
Some(s) if s.len() > 1 => {
return Err(ConfError::BadServer {
field: "servers",
value: s.len().to_string(),
reason: "expected exactly one datastore entry".to_string(),
});
}
Some(_) => {}
}
Ok(())
}
fn validate_numeric_ranges(&self) -> Result<(), ConfError> {
check_positive("timeout", self.timeout)?;
check_positive("backlog", self.backlog)?;
check_non_negative("client_connections", self.client_connections)?;
check_positive("server_retry_timeout", self.server_retry_timeout)?;
check_positive("server_failure_limit", self.server_failure_limit)?;
check_positive("dyn_read_timeout", self.dyn_read_timeout)?;
check_positive("dyn_write_timeout", self.dyn_write_timeout)?;
check_positive("gos_interval", self.gos_interval)?;
check_positive("stats_interval", self.stats_interval)?;
if let Some(n) = self.dyn_connections {
if n <= 0 {
return Err(ConfError::OutOfRange {
field: "dyn_connections",
value: n,
reason: "must be a positive non-zero number",
});
}
}
Ok(())
}
fn validate_mbuf_size(&self) -> Result<(), ConfError> {
let Some(n) = self.mbuf_size else {
return Ok(());
};
if n <= 0 {
return Err(ConfError::OutOfRange {
field: "mbuf_size",
value: n,
reason: "must be a positive number",
});
}
if !(defaults::MBUF_MIN_SIZE..=defaults::MBUF_MAX_SIZE).contains(&n) {
return Err(ConfError::OutOfRange {
field: "mbuf_size",
value: n,
reason: "must be between 512 and 512000 bytes",
});
}
if n % 16 != 0 {
return Err(ConfError::OutOfRange {
field: "mbuf_size",
value: n,
reason: "must be a multiple of 16",
});
}
Ok(())
}
fn validate_max_msgs(&self) -> Result<(), ConfError> {
let Some(n) = self.max_msgs else {
return Ok(());
};
if n <= 0 {
return Err(ConfError::OutOfRange {
field: "max_msgs",
value: n,
reason: "requires a non-zero number",
});
}
if !(defaults::ALLOC_MSGS_MIN..=defaults::ALLOC_MSGS_MAX).contains(&n) {
return Err(ConfError::OutOfRange {
field: "max_msgs",
value: n,
reason: "must be between 100000 and 1000000 messages",
});
}
Ok(())
}
fn validate_bucket_types(&self) -> Result<(), ConfError> {
use std::collections::BTreeSet;
let mut seen: BTreeSet<&str> = BTreeSet::new();
for bt in &self.bucket_types {
if bt.name.is_empty() {
return Err(ConfError::BadServer {
field: "bucket_types",
value: String::new(),
reason: "bucket-type name must not be empty".to_string(),
});
}
if !seen.insert(bt.name.as_str()) {
return Err(ConfError::BadServer {
field: "bucket_types",
value: bt.name.clone(),
reason: "duplicate bucket-type name".to_string(),
});
}
ConsistencyLevel::parse("read_consistency", &bt.read_consistency)?;
ConsistencyLevel::parse("write_consistency", &bt.write_consistency)?;
}
if let Some(name) = &self.default_bucket_type {
if !self.bucket_types.iter().any(|bt| &bt.name == name) {
return Err(ConfError::BadServer {
field: "default_bucket_type",
value: name.clone(),
reason: "references an undefined bucket-type name".to_string(),
});
}
}
Ok(())
}
fn validate_noxu(&self) -> Result<(), ConfError> {
if !crate::conf::is_noxu_supported() {
return Err(ConfError::BadNoxuConfig(
"noxu data_store requires dynomited built with --features riak",
));
}
match self.noxu_path.as_deref() {
Some(p) if !p.as_os_str().is_empty() => Ok(()),
_ => Err(ConfError::BadNoxuConfig(
"data_store: noxu requires a non-empty 'noxu_path:' directive",
)),
}
}
fn validate_hinted_handoff(&self) -> Result<(), ConfError> {
if self.enable_hinted_handoff != Some(true) {
return Ok(());
}
if let Some(ttl) = self.hint_ttl_seconds {
if ttl == 0 {
return Err(ConfError::BadServer {
field: "hint_ttl_seconds",
value: ttl.to_string(),
reason: "must be a positive number when enable_hinted_handoff is true"
.to_string(),
});
}
}
if let Some(cap) = self.hint_store_max_bytes {
if cap == 0 {
return Err(ConfError::BadServer {
field: "hint_store_max_bytes",
value: cap.to_string(),
reason: "must be a positive number when enable_hinted_handoff is true"
.to_string(),
});
}
}
if let Some(period) = self.hint_drain_interval_ms {
if period == 0 {
return Err(ConfError::BadServer {
field: "hint_drain_interval_ms",
value: period.to_string(),
reason: "must be a positive number when enable_hinted_handoff is true"
.to_string(),
});
}
}
Ok(())
}
fn validate_peer_tls(&self) -> Result<(), ConfError> {
validate_tls_pair(
"peer_tls_cert",
"peer_tls_key",
self.peer_tls_cert.as_deref(),
self.peer_tls_key.as_deref(),
)?;
if self.peer_tls_ca.is_some() && self.peer_tls_cert.is_none() {
return Err(ConfError::BadServer {
field: "peer_tls_ca",
value: self
.peer_tls_ca
.as_ref()
.map_or_else(String::new, |p| p.display().to_string()),
reason: "requires peer_tls_cert and peer_tls_key to also be set".into(),
});
}
for (dc, profile) in &self.peer_tls_profiles {
if dc.is_empty() {
return Err(ConfError::BadServer {
field: "peer_tls_profiles",
value: String::new(),
reason: "per-DC TLS profile name must not be empty".into(),
});
}
profile.validate(dc)?;
}
Ok(())
}
}
impl fmt::Display for ConfPool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match serde_yaml::to_string(self) {
Ok(s) => f.write_str(&s),
Err(_) => Err(fmt::Error),
}
}
}
fn check_positive(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
if let Some(n) = v {
if n <= 0 {
return Err(ConfError::OutOfRange {
field,
value: n,
reason: "must be a positive number",
});
}
}
Ok(())
}
fn check_non_negative(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
if let Some(n) = v {
if n < 0 {
return Err(ConfError::OutOfRange {
field,
value: n,
reason: "must be a non-negative number",
});
}
}
Ok(())
}
fn deserialize_data_store<'de, D>(de: D) -> Result<Option<i64>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{self, Visitor};
use std::fmt;
struct V;
impl<'de> Visitor<'de> for V {
type Value = Option<i64>;
fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("a data_store value: integer (0, 1, 2) or string (redis, memcache, noxu)")
}
fn visit_none<E: de::Error>(self) -> Result<Self::Value, E> {
Ok(None)
}
fn visit_unit<E: de::Error>(self) -> Result<Self::Value, E> {
Ok(None)
}
fn visit_some<D2: serde::Deserializer<'de>>(
self,
de: D2,
) -> Result<Self::Value, D2::Error> {
de.deserialize_any(V)
}
fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
DataStore::from_int(v)
.map(|d| Some(d.as_int()))
.map_err(|e| E::custom(e.to_string()))
}
fn visit_u64<E: de::Error>(self, v: u64) -> Result<Self::Value, E> {
let n = i64::try_from(v).map_err(|_| E::custom("data_store integer overflow"))?;
self.visit_i64(n)
}
fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
DataStore::from_name(v)
.map(|d| Some(d.as_int()))
.map_err(|_| {
E::custom(format!(
"data_store: unknown name '{v}'; expected one of: redis, memcache, noxu"
))
})
}
fn visit_string<E: de::Error>(self, v: String) -> Result<Self::Value, E> {
self.visit_str(&v)
}
}
de.deserialize_any(V)
}
#[cfg(test)]
mod tests {
use super::*;
fn pool() -> ConfPool {
ConfPool {
listen: Some(ConfListen::parse("listen", "127.0.0.1:8102").unwrap()),
servers: Some(Servers::from_vec(vec![ConfServer::parse(
"127.0.0.1:6379:1",
)
.unwrap()])),
tokens: Some(TokenList::parse("0").unwrap()),
..ConfPool::default()
}
}
#[test]
fn validate_minimal_post_finalize() {
let mut p = pool();
p.apply_defaults();
p.validate("dyn_o_mite").unwrap();
}
#[test]
fn missing_listen_rejected() {
let mut p = pool();
p.listen = None;
p.apply_defaults();
assert!(matches!(
p.validate("p"),
Err(ConfError::MissingRequired("listen"))
));
}
#[test]
fn out_of_range_mbuf_rejected() {
let mut p = pool();
p.mbuf_size = Some(127);
p.apply_defaults();
assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
}
#[test]
fn distribution_field_round_trips_through_yaml() {
let yaml = r"
p:
listen: 127.0.0.1:8102
dyn_listen: 127.0.0.1:8101
tokens: '0'
servers:
- 127.0.0.1:6379:1
data_store: 0
distribution: random_slicing
distribution_shadow: vnode
hash: murmur3_x64_64
";
let parsed: std::collections::BTreeMap<String, ConfPool> =
serde_yaml::from_str(yaml).unwrap();
let pool = parsed.get("p").unwrap();
assert_eq!(pool.distribution, Some(Distribution::RandomSlicing));
assert_eq!(pool.distribution_shadow, Some(Distribution::Vnode));
assert_eq!(pool.hash, Some(HashType::Murmur3X64_64));
assert_eq!(pool.resolved_distribution(), Distribution::RandomSlicing);
}
#[test]
fn distribution_legacy_alias_resolves_to_vnode() {
let mut p = pool();
p.distribution = Some(Distribution::Ketama);
assert_eq!(p.resolved_distribution(), Distribution::Vnode);
p.distribution = Some(Distribution::Modula);
assert_eq!(p.resolved_distribution(), Distribution::Vnode);
p.distribution = Some(Distribution::Random);
assert_eq!(p.resolved_distribution(), Distribution::Vnode);
}
#[test]
fn distribution_default_unset_is_vnode() {
let p = pool();
assert!(p.distribution.is_none());
assert_eq!(p.resolved_distribution(), Distribution::Vnode);
}
#[test]
fn mbuf_size_not_multiple_of_16_rejected() {
let mut p = pool();
p.mbuf_size = Some(513);
p.apply_defaults();
assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
}
#[test]
fn pem_required_when_secure() {
let mut p = pool();
p.secure_server_option = Some("datacenter".to_string());
p.pem_key_file = Some(String::new());
p.apply_defaults();
assert!(matches!(
p.validate("p"),
Err(ConfError::MissingRequired("pem_key_file"))
));
}
#[test]
fn data_store_out_of_range_rejected() {
let mut p = pool();
p.data_store = Some(7);
p.apply_defaults();
assert!(matches!(p.validate("p"), Err(ConfError::BadDataStore(7))));
}
static NOXU_FLAG_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
#[test]
fn data_store_noxu_requires_riak_feature() {
let _g = NOXU_FLAG_LOCK
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let prev = crate::conf::is_noxu_supported();
crate::conf::set_noxu_supported(false);
let mut p = pool();
p.data_store = Some(2);
p.noxu_path = Some("/tmp/test".into());
p.apply_defaults();
let err = p.validate("p");
crate::conf::set_noxu_supported(prev);
match err {
Err(ConfError::BadNoxuConfig(msg)) => {
assert!(msg.contains("--features riak"), "unexpected message: {msg}");
}
other => panic!("expected BadNoxuConfig, got {other:?}"),
}
}
#[test]
fn data_store_noxu_requires_path() {
let _g = NOXU_FLAG_LOCK
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let prev = crate::conf::is_noxu_supported();
crate::conf::set_noxu_supported(true);
let mut p = pool();
p.data_store = Some(2);
p.noxu_path = None;
p.apply_defaults();
let err = p.validate("p");
crate::conf::set_noxu_supported(prev);
match err {
Err(ConfError::BadNoxuConfig(msg)) => {
assert!(msg.contains("noxu_path"), "unexpected message: {msg}");
}
other => panic!("expected BadNoxuConfig, got {other:?}"),
}
}
#[test]
fn data_store_noxu_yaml_round_trip_string_form() {
let yaml = r"
listen: 127.0.0.1:8102
servers:
- 127.0.0.1:6379:1
tokens: '0'
data_store: noxu
noxu_path: /tmp/test
";
let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
assert_eq!(p.data_store, Some(2));
assert_eq!(
p.noxu_path.as_deref(),
Some(std::path::Path::new("/tmp/test"))
);
let dumped = serde_yaml::to_string(&p).unwrap();
let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
assert_eq!(p2.data_store, p.data_store);
assert_eq!(p2.noxu_path, p.noxu_path);
}
#[test]
fn data_store_yaml_int_form_still_works() {
let yaml = r"
listen: 127.0.0.1:8102
servers:
- 127.0.0.1:6379:1
tokens: '0'
data_store: 2
noxu_path: /tmp/test
";
let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
assert_eq!(p.data_store, Some(2));
}
#[test]
fn data_store_string_form_unknown_rejected() {
let yaml = r"
listen: 127.0.0.1:8102
servers:
- 127.0.0.1:6379:1
tokens: '0'
data_store: postgres
";
let err = serde_yaml::from_str::<ConfPool>(yaml).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("unknown name") || msg.contains("data_store"),
"unexpected message: {msg}"
);
}
#[test]
fn hash_tag_must_be_two_chars() {
let mut p = pool();
p.hash_tag = Some("abc".to_string());
p.apply_defaults();
assert!(matches!(p.validate("p"), Err(ConfError::BadHashTag(_))));
}
#[test]
fn empty_servers_rejected() {
let mut p = pool();
p.servers = Some(Servers::from_vec(vec![]));
p.apply_defaults();
assert!(matches!(
p.validate("p"),
Err(ConfError::MissingRequired("servers"))
));
}
#[test]
fn log_format_known_values_accepted() {
for value in ["default", "rfc5424", "rfc3164", "json", "ndjson", "DEFAULT"] {
let mut p = pool();
p.log_format = Some(value.to_string());
p.apply_defaults();
assert!(p.validate("p").is_ok(), "value {value:?} should validate");
}
}
#[test]
fn log_format_unknown_rejected() {
let mut p = pool();
p.log_format = Some("yaml".to_string());
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(
matches!(
err,
ConfError::BadServer {
field: "log_format",
..
}
),
"unexpected error: {err:?}"
);
}
#[test]
fn observability_block_round_trips() {
let yaml = r"
observability:
otlp_logs_endpoint: http://collector:4317
service_name: dynomited
listen: 127.0.0.1:8102
servers:
- 127.0.0.1:6379:1
tokens: '0'
";
let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
let obs = p.observability.as_ref().expect("observability set");
assert_eq!(
obs.otlp_logs_endpoint.as_deref(),
Some("http://collector:4317")
);
assert_eq!(obs.service_name.as_deref(), Some("dynomited"));
}
#[test]
fn bucket_types_round_trip() {
let yaml = r"
listen: 127.0.0.1:8102
servers:
- 127.0.0.1:6379:1
tokens: '0'
bucket_types:
- name: hot
read_consistency: DC_QUORUM
write_consistency: DC_EACH_SAFE_QUORUM
n_val: 3
- name: cold
read_consistency: DC_ONE
write_consistency: DC_ONE
n_val: 1
default_bucket_type: cold
";
let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
assert_eq!(p.bucket_types.len(), 2);
assert_eq!(p.bucket_types[0].name, "hot");
assert_eq!(p.bucket_types[0].n_val, 3);
assert_eq!(
p.bucket_types[0].read_level().unwrap(),
crate::conf::ConsistencyLevel::DcQuorum,
);
assert_eq!(p.default_bucket_type.as_deref(), Some("cold"));
let dumped = serde_yaml::to_string(&p).unwrap();
let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
assert_eq!(p2.bucket_types, p.bucket_types);
assert_eq!(p2.default_bucket_type, p.default_bucket_type);
}
#[test]
fn bucket_types_default_is_empty() {
let mut p = pool();
p.apply_defaults();
assert!(p.bucket_types.is_empty());
assert!(p.default_bucket_type.is_none());
assert!(p.validate("p").is_ok());
}
#[test]
fn duplicate_bucket_type_name_rejected() {
let mut p = pool();
p.bucket_types = vec![
ConfBucketType {
name: "a".into(),
read_consistency: "DC_ONE".into(),
write_consistency: "DC_ONE".into(),
n_val: 0,
},
ConfBucketType {
name: "a".into(),
read_consistency: "DC_ONE".into(),
write_consistency: "DC_ONE".into(),
n_val: 0,
},
];
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(
matches!(
err,
ConfError::BadServer {
field: "bucket_types",
..
}
),
"unexpected error: {err:?}",
);
}
#[test]
fn bucket_type_unknown_consistency_rejected() {
let mut p = pool();
p.bucket_types = vec![ConfBucketType {
name: "a".into(),
read_consistency: "DC_PURPLE".into(),
write_consistency: "DC_ONE".into(),
n_val: 0,
}];
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(matches!(err, ConfError::BadConsistency { .. }));
}
#[test]
fn unknown_default_bucket_type_rejected() {
let mut p = pool();
p.default_bucket_type = Some("missing".into());
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "default_bucket_type",
..
}
));
}
#[test]
fn hinted_handoff_default_off_with_canonical_constants() {
let mut p = pool();
p.apply_defaults();
assert_eq!(p.enable_hinted_handoff, Some(false));
assert_eq!(p.hint_ttl_seconds, Some(defaults::HINT_TTL_SECONDS));
assert_eq!(p.hint_store_max_bytes, Some(defaults::HINT_STORE_MAX_BYTES));
assert_eq!(
p.hint_drain_interval_ms,
Some(defaults::HINT_DRAIN_INTERVAL_MS)
);
assert!(p.validate("p").is_ok());
}
#[test]
fn hinted_handoff_yaml_round_trip() {
let yaml = r"
listen: 127.0.0.1:8102
servers:
- 127.0.0.1:6379:1
tokens: '0'
enable_hinted_handoff: true
hint_ttl_seconds: 7200
hint_store_max_bytes: 8388608
hint_drain_interval_ms: 5000
";
let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
assert_eq!(p.enable_hinted_handoff, Some(true));
assert_eq!(p.hint_ttl_seconds, Some(7200));
assert_eq!(p.hint_store_max_bytes, Some(8_388_608));
assert_eq!(p.hint_drain_interval_ms, Some(5_000));
let dumped = serde_yaml::to_string(&p).unwrap();
let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
assert_eq!(p2.enable_hinted_handoff, p.enable_hinted_handoff);
assert_eq!(p2.hint_ttl_seconds, p.hint_ttl_seconds);
assert_eq!(p2.hint_store_max_bytes, p.hint_store_max_bytes);
assert_eq!(p2.hint_drain_interval_ms, p.hint_drain_interval_ms);
}
#[test]
fn hinted_handoff_zero_ttl_rejected_when_enabled() {
let mut p = pool();
p.enable_hinted_handoff = Some(true);
p.hint_ttl_seconds = Some(0);
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "hint_ttl_seconds",
..
}
));
}
#[test]
fn hinted_handoff_zero_max_bytes_rejected_when_enabled() {
let mut p = pool();
p.enable_hinted_handoff = Some(true);
p.hint_store_max_bytes = Some(0);
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "hint_store_max_bytes",
..
}
));
}
#[test]
fn hinted_handoff_zero_values_ignored_when_disabled() {
let mut p = pool();
p.enable_hinted_handoff = Some(false);
p.hint_ttl_seconds = Some(0);
p.hint_store_max_bytes = Some(0);
p.hint_drain_interval_ms = Some(0);
p.apply_defaults();
assert!(p.validate("p").is_ok());
}
#[test]
fn riak_block_validates_when_unset() {
let mut p = pool();
p.riak = Some(ConfRiak::default());
p.apply_defaults();
assert!(p.validate("p").is_ok());
}
#[test]
fn riak_block_validates_with_addresses() {
let mut p = pool();
p.riak = Some(ConfRiak {
pbc_listen: Some("127.0.0.1:8087".into()),
http_listen: Some("127.0.0.1:8098".into()),
..ConfRiak::default()
});
p.apply_defaults();
assert!(p.validate("p").is_ok());
}
#[test]
fn riak_block_rejects_bad_pbc_addr() {
let mut p = pool();
p.riak = Some(ConfRiak {
pbc_listen: Some(String::new()),
..ConfRiak::default()
});
p.apply_defaults();
assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
}
#[test]
fn riak_block_rejects_segment_above_full_sweep() {
let mut p = pool();
p.riak = Some(ConfRiak {
aae_segment_interval_seconds: Some(120),
aae_full_sweep_interval_seconds: Some(60),
..ConfRiak::default()
});
p.apply_defaults();
assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
}
#[test]
fn riak_block_round_trips_through_yaml() {
let yaml = r"
p:
listen: 127.0.0.1:1
dyn_listen: 127.0.0.1:2
tokens: '0'
servers:
- 127.0.0.1:3:1
data_store: 0
riak:
pbc_listen: 127.0.0.1:8087
http_listen: 127.0.0.1:8098
aae_enabled: true
aae_full_sweep_interval_seconds: 3600
aae_segment_interval_seconds: 30
";
let cfg: std::collections::BTreeMap<String, ConfPool> = serde_yaml::from_str(yaml).unwrap();
let p = cfg.get("p").unwrap();
let r = p.riak.as_ref().unwrap();
assert_eq!(r.pbc_listen.as_deref(), Some("127.0.0.1:8087"));
assert_eq!(r.http_listen.as_deref(), Some("127.0.0.1:8098"));
assert_eq!(r.aae_enabled, Some(true));
assert_eq!(r.aae_full_sweep_interval_seconds, Some(3600));
assert_eq!(r.aae_segment_interval_seconds, Some(30));
}
#[test]
fn peer_tls_pair_unset_is_ok() {
let mut p = pool();
p.apply_defaults();
assert!(p.validate("p").is_ok(), "plaintext default must validate");
}
#[test]
fn peer_tls_pair_both_set_is_ok() {
let mut p = pool();
p.peer_tls_cert = Some(std::path::PathBuf::from("/etc/dynomite/peer.crt"));
p.peer_tls_key = Some(std::path::PathBuf::from("/etc/dynomite/peer.key"));
p.apply_defaults();
assert!(p.validate("p").is_ok());
}
#[test]
fn peer_tls_cert_without_key_rejected() {
let mut p = pool();
p.peer_tls_cert = Some(std::path::PathBuf::from("/x.crt"));
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(
matches!(
err,
ConfError::BadServer {
field: "peer_tls_cert",
..
}
),
"got {err:?}"
);
}
#[test]
fn peer_tls_key_without_cert_rejected() {
let mut p = pool();
p.peer_tls_key = Some(std::path::PathBuf::from("/x.key"));
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(
matches!(
err,
ConfError::BadServer {
field: "peer_tls_key",
..
}
),
"got {err:?}"
);
}
#[test]
fn peer_tls_ca_without_cert_rejected() {
let mut p = pool();
p.peer_tls_ca = Some(std::path::PathBuf::from("/x.ca"));
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(
matches!(
err,
ConfError::BadServer {
field: "peer_tls_ca",
..
}
),
"got {err:?}"
);
}
#[test]
fn riak_tls_cert_without_key_rejected() {
let mut p = pool();
p.riak = Some(ConfRiak {
pbc_listen: Some("127.0.0.1:8087".into()),
tls_cert: Some(std::path::PathBuf::from("/x.crt")),
..ConfRiak::default()
});
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(
matches!(
err,
ConfError::BadServer {
field: "tls_cert",
..
}
),
"got {err:?}"
);
}
#[test]
fn riak_tls_pair_both_set_is_ok() {
let mut p = pool();
p.riak = Some(ConfRiak {
pbc_listen: Some("127.0.0.1:8087".into()),
tls_cert: Some(std::path::PathBuf::from("/x.crt")),
tls_key: Some(std::path::PathBuf::from("/x.key")),
..ConfRiak::default()
});
p.apply_defaults();
assert!(p.validate("p").is_ok());
}
#[test]
fn riak_wasm_modules_yaml_round_trip() {
let dir = tempfile::tempdir().unwrap();
let m1 = dir.path().join("identity.wasm");
let m2 = dir.path().join("sum.wasm");
std::fs::write(&m1, b"\0asm\x01\0\0\0").unwrap();
std::fs::write(&m2, b"\0asm\x01\0\0\0").unwrap();
let yaml = format!(
r"
listen: 127.0.0.1:8102
servers:
- 127.0.0.1:6379:1
tokens: '0'
riak:
pbc_listen: 127.0.0.1:8087
wasm_modules:
- id: identity
path: {m1}
- id: sum
path: {m2}
",
m1 = m1.display(),
m2 = m2.display(),
);
let p: ConfPool = serde_yaml::from_str(&yaml).unwrap();
let r = p.riak.as_ref().unwrap();
let mods = r.wasm_modules.as_ref().unwrap();
assert_eq!(mods.len(), 2);
assert_eq!(mods[0].id, "identity");
assert_eq!(mods[0].path, m1);
assert_eq!(mods[1].id, "sum");
assert_eq!(mods[1].path, m2);
let dumped = serde_yaml::to_string(&p).unwrap();
let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
assert_eq!(p2.riak.unwrap().wasm_modules, r.wasm_modules);
}
#[test]
fn riak_wasm_modules_unique_ids_required() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("m.wasm");
std::fs::write(&path, b"\0").unwrap();
let r = ConfRiak {
wasm_modules: Some(vec![
ConfRiakWasmModule {
id: "m".into(),
path: path.clone(),
},
ConfRiakWasmModule {
id: "m".into(),
path: path.clone(),
},
]),
..ConfRiak::default()
};
let err = r.validate().unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "wasm_modules.id",
..
}
));
}
#[test]
fn riak_wasm_modules_path_must_exist() {
let r = ConfRiak {
wasm_modules: Some(vec![ConfRiakWasmModule {
id: "missing".into(),
path: std::path::PathBuf::from("/no/such/path/at/all.wasm"),
}]),
..ConfRiak::default()
};
let err = r.validate().unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "wasm_modules.path",
..
}
));
}
#[test]
fn riak_wasm_modules_empty_id_rejected() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("m.wasm");
std::fs::write(&path, b"\0").unwrap();
let r = ConfRiak {
wasm_modules: Some(vec![ConfRiakWasmModule {
id: String::new(),
path,
}]),
..ConfRiak::default()
};
let err = r.validate().unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "wasm_modules.id",
..
}
));
}
#[test]
fn peer_tls_profile_pair_unset_is_ok() {
let p = ConfTlsProfile::default();
assert!(p.validate("dc1").is_ok());
}
#[test]
fn peer_tls_profile_cert_without_key_rejected() {
let p = ConfTlsProfile {
cert: Some(std::path::PathBuf::from("/x.crt")),
..ConfTlsProfile::default()
};
let err = p.validate("dc1").unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "peer_tls_profiles.cert",
..
}
));
}
#[test]
fn peer_tls_profile_key_without_cert_rejected() {
let p = ConfTlsProfile {
key: Some(std::path::PathBuf::from("/x.key")),
..ConfTlsProfile::default()
};
let err = p.validate("dc1").unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "peer_tls_profiles.key",
..
}
));
}
#[test]
fn peer_tls_profile_ca_without_cert_rejected() {
let p = ConfTlsProfile {
ca: Some(std::path::PathBuf::from("/x.ca")),
..ConfTlsProfile::default()
};
let err = p.validate("dc1").unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "peer_tls_profiles.ca",
..
}
));
}
#[test]
fn peer_tls_profiles_empty_dc_name_rejected() {
let mut p = pool();
p.peer_tls_profiles.insert(
String::new(),
ConfTlsProfile {
cert: Some(std::path::PathBuf::from("/x.crt")),
key: Some(std::path::PathBuf::from("/x.key")),
ca: None,
},
);
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "peer_tls_profiles",
..
}
));
}
#[test]
fn peer_tls_profiles_per_dc_pair_validates() {
let mut p = pool();
p.peer_tls_profiles.insert(
"dc1".into(),
ConfTlsProfile {
cert: Some(std::path::PathBuf::from("/dc1.crt")),
key: Some(std::path::PathBuf::from("/dc1.key")),
ca: None,
},
);
p.apply_defaults();
assert!(p.validate("p").is_ok());
}
#[test]
fn peer_tls_profiles_per_dc_cert_without_key_rejected() {
let mut p = pool();
p.peer_tls_profiles.insert(
"dc1".into(),
ConfTlsProfile {
cert: Some(std::path::PathBuf::from("/dc1.crt")),
key: None,
ca: None,
},
);
p.apply_defaults();
let err = p.validate("p").unwrap_err();
assert!(matches!(
err,
ConfError::BadServer {
field: "peer_tls_profiles.cert",
..
}
));
}
#[test]
fn peer_tls_profiles_yaml_round_trip() {
let yaml = r"
listen: 127.0.0.1:8102
servers:
- 127.0.0.1:6379:1
tokens: '0'
peer_tls_profiles:
dc1:
cert: /etc/dynomite/dc1.pem
key: /etc/dynomite/dc1.key
ca: /etc/dynomite/dc1-ca.pem
dc2:
cert: /etc/dynomite/dc2.pem
key: /etc/dynomite/dc2.key
";
let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
assert_eq!(p.peer_tls_profiles.len(), 2);
assert_eq!(
p.peer_tls_profiles["dc1"].cert.as_deref(),
Some(std::path::Path::new("/etc/dynomite/dc1.pem"))
);
assert!(p.peer_tls_profiles["dc2"].ca.is_none());
let dumped = serde_yaml::to_string(&p).unwrap();
let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
assert_eq!(p2.peer_tls_profiles, p.peer_tls_profiles);
}
}