1use std::collections::BTreeMap;
10use std::fmt;
11use std::path::PathBuf;
12
13use serde::{Deserialize, Serialize};
14
15use super::endpoint::ConfListen;
16use super::enums::{
17 ConsistencyLevel, DataStore, Distribution, HashType, SecureServerOption, Transport,
18};
19use super::error::ConfError;
20use super::server::{ConfDynSeed, ConfServer};
21use super::tokens::TokenList;
22
/// Built-in fallback values for pool settings the configuration leaves unset.
///
/// Most constants are consumed by `ConfPool::apply_defaults`; the `MBUF_*` and
/// `ALLOC_MSGS_*` bounds are consumed by the pool's range validation.
pub mod defaults {
    /// Fallback for `timeout`.
    pub const TIMEOUT_MS: i64 = 5 * 1000;
    /// Fallback for `backlog`.
    pub const LISTEN_BACKLOG: i64 = 512;
    /// Fallback for `client_connections`.
    pub const CLIENT_CONNECTIONS: i64 = 0;
    /// Fallback integer code for `data_store`.
    pub const DATA_STORE: i64 = 0;
    /// Fallback for `preconnect`.
    pub const PRECONNECT: bool = false;
    /// Fallback for `auto_eject_hosts`.
    pub const AUTO_EJECT_HOSTS: bool = true;
    /// Fallback for `server_retry_timeout`.
    pub const SERVER_RETRY_TIMEOUT_MS: i64 = 10_000;
    /// Fallback for `server_failure_limit`.
    pub const SERVER_FAILURE_LIMIT: i64 = 3;
    /// Fallback for `dyn_read_timeout`.
    pub const DYN_READ_TIMEOUT_MS: i64 = 10 * 1000;
    /// Fallback for `dyn_write_timeout`.
    pub const DYN_WRITE_TIMEOUT_MS: i64 = 10 * 1000;
    /// Fallback for `dyn_connections`.
    pub const DYN_CONNECTIONS: i64 = 100;
    /// Fallback for `gos_interval`.
    pub const GOS_INTERVAL_MS: i64 = 30 * 1000;
    /// Fallback for `enable_hinted_handoff`.
    pub const ENABLE_HINTED_HANDOFF: bool = false;
    /// Fallback for `hint_ttl_seconds` (24 * 60 * 60 = 86 400).
    pub const HINT_TTL_SECONDS: u64 = 24 * 60 * 60;
    /// Fallback for `hint_store_max_bytes` (64 MiB).
    pub const HINT_STORE_MAX_BYTES: u64 = 64 << 20;
    /// Fallback for `hint_drain_interval_ms`.
    pub const HINT_DRAIN_INTERVAL_MS: u64 = 30 * 1000;
    /// Fallback for `conn_msg_rate`.
    pub const CONN_MSG_RATE: u32 = 50 * 1000;
    /// Fallback for `stats_interval`.
    pub const STATS_INTERVAL_MS: i64 = 30_000;
    /// Fallback for `stats_listen`, parsed with `ConfListen::parse`.
    pub const STATS_PNAME: &str = "0.0.0.0:22222";
    /// Fallback for `datastore_connections`.
    pub const DATASTORE_CONNECTIONS: u8 = 1;
    /// Fallback for `local_peer_connections`.
    pub const LOCAL_PEER_CONNECTIONS: u8 = 1;
    /// Fallback for `remote_peer_connections`.
    pub const REMOTE_PEER_CONNECTIONS: u8 = 1;
    /// Fallback for `rack`.
    pub const RACK: &str = "localrack";
    /// Fallback for `datacenter`.
    pub const DC: &str = "localdc";
    /// Fallback for `secure_server_option`.
    pub const SECURE_SERVER_OPTION: &str = "none";
    /// Fallback for both `read_consistency` and `write_consistency`.
    pub const CONSISTENCY: &str = "DC_ONE";
    /// Fallback for `dyn_seed_provider`.
    pub const SEED_PROVIDER: &str = "simple_provider";
    /// Fallback for `env`.
    pub const ENV: &str = "aws";
    /// Fallback for `pem_key_file`.
    pub const PEM_KEY_FILE: &str = "conf/dynomite.pem";
    /// Fallback for `recon_key_file`.
    pub const RECON_KEY_FILE: &str = "conf/recon_key.pem";
    /// Fallback for `recon_iv_file`.
    pub const RECON_IV_FILE: &str = "conf/recon_iv.pem";
    /// Fallback for `recon_interval_seconds` (5 * 60 = 300).
    pub const RECON_INTERVAL_SECONDS: u64 = 5 * 60;
    /// Smallest accepted `mbuf_size`.
    pub const MBUF_MIN_SIZE: i64 = 512;
    /// Largest accepted `mbuf_size`.
    pub const MBUF_MAX_SIZE: i64 = 512 * 1000;
    /// Smallest accepted `max_msgs`.
    pub const ALLOC_MSGS_MIN: i64 = 100 * 1000;
    /// Largest accepted `max_msgs`.
    pub const ALLOC_MSGS_MAX: i64 = 1000 * 1000;
}
110
/// Newtype around the list of `ConfServer` entries configured for a pool.
///
/// `#[serde(transparent)]` makes it serialize and deserialize exactly like the
/// inner `Vec<ConfServer>`.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Servers(pub(crate) Vec<ConfServer>);
126
127impl Servers {
128 pub fn from_vec(v: Vec<ConfServer>) -> Self {
139 Self(v)
140 }
141}
142
143impl Servers {
144 pub fn entries(&self) -> &[ConfServer] {
154 &self.0
155 }
156 pub fn len(&self) -> usize {
165 self.0.len()
166 }
167 pub fn is_empty(&self) -> bool {
176 self.0.is_empty()
177 }
178 pub fn datastore(&self) -> Option<&ConfServer> {
189 self.0.first()
190 }
191}
192
/// Configuration of a single pool as read from YAML.
///
/// Almost every knob is an `Option`, so "absent from the file" stays
/// distinguishable from an explicit value. `deny_unknown_fields` rejects
/// unrecognised keys, and the container-level `default` allows any key to be
/// omitted. `ConfPool::validate` performs the semantic checks described on the
/// individual fields.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct ConfPool {
    /// Listen endpoint. Required: `validate` reports `MissingRequired("listen")` without it.
    pub listen: Option<ConfListen>,
    /// Second listen endpoint; not validated in this file.
    pub dyn_listen: Option<ConfListen>,
    /// Stats listen endpoint.
    pub stats_listen: Option<ConfListen>,

    /// Hash function selection.
    pub hash: Option<HashType>,
    /// Hash tag; when set it must be exactly two characters long.
    pub hash_tag: Option<String>,

    /// Requested distribution mode; see `resolved_distribution` for how it is interpreted.
    #[serde(default)]
    pub distribution: Option<Distribution>,
    /// Shadow distribution mode; not interpreted in this file.
    #[serde(default)]
    pub distribution_shadow: Option<Distribution>,
    /// Not validated in this file.
    #[serde(default)]
    pub server_connections: Option<i64>,

    /// Must be positive when set.
    pub timeout: Option<i64>,
    /// Must be positive when set.
    pub backlog: Option<i64>,
    /// Must be non-negative when set.
    pub client_connections: Option<i64>,
    /// Datastore selector, held as its integer code. The custom deserializer
    /// accepts either an integer or a name.
    #[serde(default, deserialize_with = "deserialize_data_store")]
    pub data_store: Option<i64>,
    /// Must be set and non-empty when `data_store` resolves to `DataStore::Noxu`.
    #[serde(default)]
    pub noxu_path: Option<PathBuf>,
    pub preconnect: Option<bool>,
    /// Not validated in this file.
    #[serde(default)]
    pub redis_requirepass: Option<String>,
    pub auto_eject_hosts: Option<bool>,
    /// Must be positive when set.
    pub server_retry_timeout: Option<i64>,
    /// Must be positive when set.
    pub server_failure_limit: Option<i64>,

    /// Datastore entries. Required, and `validate` accepts exactly one entry.
    pub servers: Option<Servers>,

    /// Must be positive when set.
    pub dyn_read_timeout: Option<i64>,
    /// Must be positive when set.
    pub dyn_write_timeout: Option<i64>,
    pub dyn_seed_provider: Option<String>,
    pub dyn_seeds: Option<Vec<ConfDynSeed>>,
    pub dyn_port: Option<i64>,
    /// Must be positive when set.
    pub dyn_connections: Option<i64>,
    pub rack: Option<String>,
    pub tokens: Option<TokenList>,
    /// Must be positive when set.
    pub gos_interval: Option<i64>,
    /// Parsed with `SecureServerOption::parse`; any value other than `None`
    /// makes a non-empty `pem_key_file` mandatory.
    pub secure_server_option: Option<String>,
    pub pem_key_file: Option<String>,
    pub recon_key_file: Option<String>,
    pub recon_iv_file: Option<String>,
    #[serde(default)]
    pub recon_interval_seconds: Option<u64>,
    pub datacenter: Option<String>,
    pub env: Option<String>,
    pub conn_msg_rate: Option<u32>,
    /// Must parse with `ConsistencyLevel::parse` when set.
    pub read_consistency: Option<String>,
    /// Must parse with `ConsistencyLevel::parse` when set.
    pub write_consistency: Option<String>,
    /// Must be positive when set.
    pub stats_interval: Option<i64>,
    pub enable_gossip: Option<bool>,
    /// Must be set together with `peer_tls_key`.
    #[serde(default)]
    pub peer_tls_cert: Option<PathBuf>,
    /// Must be set together with `peer_tls_cert`.
    #[serde(default)]
    pub peer_tls_key: Option<PathBuf>,
    /// Only accepted when `peer_tls_cert` is also set.
    #[serde(default)]
    pub peer_tls_ca: Option<PathBuf>,
    /// TLS profiles keyed by a non-empty name (the validation messages call
    /// the key a per-DC profile name); each profile is checked with
    /// `ConfTlsProfile::validate`.
    #[serde(default)]
    pub peer_tls_profiles: BTreeMap<String, ConfTlsProfile>,
    /// When set: within `defaults::MBUF_MIN_SIZE..=defaults::MBUF_MAX_SIZE`
    /// and a multiple of 16.
    pub mbuf_size: Option<i64>,
    /// When set: within `defaults::ALLOC_MSGS_MIN..=defaults::ALLOC_MSGS_MAX`.
    pub max_msgs: Option<i64>,
    pub datastore_connections: Option<u8>,
    pub local_peer_connections: Option<u8>,
    pub remote_peer_connections: Option<u8>,
    pub read_repairs_enabled: Option<bool>,
    /// The `hint_*` knobs below are only validated when this is `Some(true)`.
    #[serde(default)]
    pub enable_hinted_handoff: Option<bool>,
    /// Must be non-zero when hinted handoff is enabled.
    #[serde(default)]
    pub hint_ttl_seconds: Option<u64>,
    /// Must be non-zero when hinted handoff is enabled.
    #[serde(default)]
    pub hint_store_max_bytes: Option<u64>,
    /// Must be non-zero when hinted handoff is enabled.
    #[serde(default)]
    pub hint_drain_interval_ms: Option<u64>,
    /// Must be accepted by `crate::core::log::LogFormat::parse` when set.
    pub log_format: Option<String>,

    #[serde(default)]
    pub observability: Option<ObservabilityConfig>,

    /// Bucket types; names must be non-empty and unique, and both consistency
    /// strings must parse.
    #[serde(default)]
    pub bucket_types: Vec<ConfBucketType>,

    /// When set, must name one of the entries in `bucket_types`.
    #[serde(default)]
    pub default_bucket_type: Option<String>,

    /// Riak section; checked with `ConfRiak::validate` when present.
    #[serde(default)]
    pub riak: Option<ConfRiak>,

    /// Transport selection; `Transport::Quic` requires both `quic_cert_file`
    /// and `quic_key_file`.
    #[serde(default)]
    pub transport: Option<Transport>,

    #[serde(default)]
    pub quic_cert_file: Option<PathBuf>,

    #[serde(default)]
    pub quic_key_file: Option<PathBuf>,
}
501
/// Riak-specific settings nested inside a pool.
///
/// Unknown keys are rejected and every key may be omitted. The constraints
/// noted on the fields are enforced by `ConfRiak::validate`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct ConfRiak {
    /// Listen address; must be non-empty and either parse as a socket address or resolve.
    pub pbc_listen: Option<String>,
    /// Listen address; same rules as `pbc_listen`.
    pub http_listen: Option<String>,
    pub aae_enabled: Option<bool>,
    /// Must be greater than zero when set.
    pub aae_full_sweep_interval_seconds: Option<u64>,
    /// Must be greater than zero and not exceed `aae_full_sweep_interval_seconds`.
    pub aae_segment_interval_seconds: Option<u64>,
    /// Must be set together with `tls_key`.
    #[serde(default)]
    pub tls_cert: Option<PathBuf>,
    /// Must be set together with `tls_cert`.
    #[serde(default)]
    pub tls_key: Option<PathBuf>,
    /// Only accepted when `tls_cert` is also set.
    #[serde(default)]
    pub tls_ca: Option<PathBuf>,
    /// Module list; ids must be non-empty and unique, and each path must be an existing file.
    #[serde(default)]
    pub wasm_modules: Option<Vec<ConfRiakWasmModule>>,
}
586
/// One entry of `ConfRiak::wasm_modules`.
///
/// Neither field carries a serde default, so both keys must be present when
/// an entry is deserialized; unknown keys are rejected.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct ConfRiakWasmModule {
    /// Identifier; `ConfRiak::validate` requires it to be non-empty and unique.
    pub id: String,
    /// Module location; `ConfRiak::validate` requires it to be an existing file.
    pub path: PathBuf,
}
611
/// A named TLS profile stored in `ConfPool::peer_tls_profiles`.
///
/// All keys are optional and unknown keys are rejected; see
/// `ConfTlsProfile::validate` for the combinations that are accepted.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct ConfTlsProfile {
    /// Certificate path; must be set together with `key`.
    pub cert: Option<PathBuf>,
    /// Key path; must be set together with `cert`.
    pub key: Option<PathBuf>,
    /// CA path; only accepted when `cert` is also set.
    pub ca: Option<PathBuf>,
}
651
652impl ConfTlsProfile {
653 pub fn validate(&self, dc: &str) -> Result<(), ConfError> {
679 match (self.cert.as_deref(), self.key.as_deref()) {
680 (Some(_), Some(_)) | (None, None) => {}
681 (Some(c), None) => {
682 return Err(ConfError::BadServer {
683 field: "peer_tls_profiles.cert",
684 value: c.display().to_string(),
685 reason: format!(
686 "peer_tls_profiles[{dc}].cert is set but .key is not; both must be set together"
687 ),
688 });
689 }
690 (None, Some(k)) => {
691 return Err(ConfError::BadServer {
692 field: "peer_tls_profiles.key",
693 value: k.display().to_string(),
694 reason: format!(
695 "peer_tls_profiles[{dc}].key is set but .cert is not; both must be set together"
696 ),
697 });
698 }
699 }
700 if self.ca.is_some() && self.cert.is_none() {
701 return Err(ConfError::BadServer {
702 field: "peer_tls_profiles.ca",
703 value: self
704 .ca
705 .as_ref()
706 .map_or_else(String::new, |p| p.display().to_string()),
707 reason: format!(
708 "peer_tls_profiles[{dc}].ca requires .cert and .key to also be set"
709 ),
710 });
711 }
712 Ok(())
713 }
714}
715
716impl ConfRiak {
717 pub fn validate(&self) -> Result<(), ConfError> {
736 if let Some(addr) = self.pbc_listen.as_deref() {
737 validate_riak_addr("pbc_listen", addr)?;
738 }
739 if let Some(addr) = self.http_listen.as_deref() {
740 validate_riak_addr("http_listen", addr)?;
741 }
742 if let Some(n) = self.aae_full_sweep_interval_seconds {
743 if n == 0 {
744 return Err(ConfError::BadServer {
745 field: "aae_full_sweep_interval_seconds",
746 value: n.to_string(),
747 reason: "must be > 0".into(),
748 });
749 }
750 }
751 if let Some(n) = self.aae_segment_interval_seconds {
752 if n == 0 {
753 return Err(ConfError::BadServer {
754 field: "aae_segment_interval_seconds",
755 value: n.to_string(),
756 reason: "must be > 0".into(),
757 });
758 }
759 }
760 if let (Some(seg), Some(full)) = (
761 self.aae_segment_interval_seconds,
762 self.aae_full_sweep_interval_seconds,
763 ) {
764 if seg > full {
765 return Err(ConfError::BadServer {
766 field: "aae_segment_interval_seconds",
767 value: seg.to_string(),
768 reason: format!("must be <= aae_full_sweep_interval_seconds ({full})"),
769 });
770 }
771 }
772 validate_tls_pair(
773 "tls_cert",
774 "tls_key",
775 self.tls_cert.as_deref(),
776 self.tls_key.as_deref(),
777 )?;
778 if self.tls_ca.is_some() && self.tls_cert.is_none() {
779 return Err(ConfError::BadServer {
780 field: "tls_ca",
781 value: self
782 .tls_ca
783 .as_ref()
784 .map_or_else(String::new, |p| p.display().to_string()),
785 reason: "requires tls_cert and tls_key to also be set".into(),
786 });
787 }
788 if let Some(modules) = self.wasm_modules.as_deref() {
789 let mut seen: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
790 for m in modules {
791 if m.id.is_empty() {
792 return Err(ConfError::BadServer {
793 field: "wasm_modules.id",
794 value: String::new(),
795 reason: "wasm module id must not be empty".into(),
796 });
797 }
798 if !seen.insert(m.id.as_str()) {
799 return Err(ConfError::BadServer {
800 field: "wasm_modules.id",
801 value: m.id.clone(),
802 reason: "wasm module ids must be unique".into(),
803 });
804 }
805 if !m.path.is_file() {
806 return Err(ConfError::BadServer {
807 field: "wasm_modules.path",
808 value: m.path.display().to_string(),
809 reason: format!("wasm module file not found for id '{}'", m.id),
810 });
811 }
812 }
813 }
814 Ok(())
815 }
816}
817
818fn validate_tls_pair(
822 cert_field: &'static str,
823 key_field: &'static str,
824 cert: Option<&std::path::Path>,
825 key: Option<&std::path::Path>,
826) -> Result<(), ConfError> {
827 match (cert, key) {
828 (Some(_), Some(_)) | (None, None) => Ok(()),
829 (Some(c), None) => Err(ConfError::BadServer {
830 field: cert_field,
831 value: c.display().to_string(),
832 reason: format!(
833 "{cert_field} is set but {key_field} is not; both must be set together"
834 ),
835 }),
836 (None, Some(k)) => Err(ConfError::BadServer {
837 field: key_field,
838 value: k.display().to_string(),
839 reason: format!(
840 "{key_field} is set but {cert_field} is not; both must be set together"
841 ),
842 }),
843 }
844}
845
846fn validate_riak_addr(field: &'static str, value: &str) -> Result<(), ConfError> {
847 use std::net::ToSocketAddrs;
848 if value.is_empty() {
849 return Err(ConfError::BadServer {
850 field,
851 value: value.to_string(),
852 reason: "riak listen address must not be empty".into(),
853 });
854 }
855 if value.parse::<std::net::SocketAddr>().is_ok() {
859 return Ok(());
860 }
861 match value.to_socket_addrs() {
862 Ok(mut iter) => {
863 if iter.next().is_some() {
864 Ok(())
865 } else {
866 Err(ConfError::BadServer {
867 field,
868 value: value.to_string(),
869 reason: "resolved to no addresses".into(),
870 })
871 }
872 }
873 Err(e) => Err(ConfError::BadServer {
874 field,
875 value: value.to_string(),
876 reason: format!("could not resolve: {e}"),
877 }),
878 }
879}
880
/// One entry of `ConfPool::bucket_types`.
///
/// `name`, `read_consistency` and `write_consistency` have no serde default,
/// so they must be present when deserializing; unknown keys are rejected.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct ConfBucketType {
    /// Bucket-type name; pool validation requires it to be non-empty and unique.
    pub name: String,
    /// Textual consistency level, parsed by `read_level`.
    pub read_consistency: String,
    /// Textual consistency level, parsed by `write_level`.
    pub write_consistency: String,
    /// Falls back to `u8::default()` (0) when the key is omitted.
    #[serde(default)]
    pub n_val: u8,
}
936
937impl ConfBucketType {
938 pub fn read_level(&self) -> Result<ConsistencyLevel, ConfError> {
953 ConsistencyLevel::parse("read_consistency", &self.read_consistency)
954 }
955
956 pub fn write_level(&self) -> Result<ConsistencyLevel, ConfError> {
971 ConsistencyLevel::parse("write_consistency", &self.write_consistency)
972 }
973}
974
/// Optional observability settings nested inside a pool.
///
/// Every key may be omitted and unknown keys are rejected. No validation of
/// these values happens in this file. `Eq` is not derived because
/// `traces_sampling` is an `f64`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct ObservabilityConfig {
    pub otlp_traces_endpoint: Option<String>,
    pub otlp_logs_endpoint: Option<String>,
    pub service_name: Option<String>,
    // NOTE(review): presumably a sampling ratio; the accepted range is not
    // enforced here — confirm against the consumer.
    pub traces_sampling: Option<f64>,
}
1014
1015impl ConfPool {
1016 #[must_use]
1036 pub fn resolved_distribution(&self) -> Distribution {
1037 match self.distribution {
1038 None | Some(Distribution::Vnode) => Distribution::Vnode,
1039 Some(Distribution::RandomSlicing) => Distribution::RandomSlicing,
1040 Some(other) => {
1041 tracing::warn!(
1042 target: "dynomite::conf",
1043 distribution = other.as_str(),
1044 "distribution mode '{}' is a legacy alias and resolves to 'vnode'; \
1045 update the YAML to either 'vnode' or 'random_slicing'",
1046 other
1047 );
1048 Distribution::Vnode
1049 }
1050 }
1051 }
1052}
1053
1054impl ConfPool {
1055 pub fn apply_defaults(&mut self) {
1067 if self.dyn_seed_provider.is_none() {
1068 self.dyn_seed_provider = Some(defaults::SEED_PROVIDER.to_string());
1069 }
1070 if self.hash.is_none() {
1071 self.hash = Some(HashType::Murmur);
1072 }
1073 if self.timeout.is_none() {
1074 self.timeout = Some(defaults::TIMEOUT_MS);
1075 }
1076 if self.backlog.is_none() {
1077 self.backlog = Some(defaults::LISTEN_BACKLOG);
1078 }
1079 self.client_connections = Some(defaults::CLIENT_CONNECTIONS);
1082 if self.data_store.is_none() {
1083 self.data_store = Some(defaults::DATA_STORE);
1084 }
1085 if self.preconnect.is_none() {
1086 self.preconnect = Some(defaults::PRECONNECT);
1087 }
1088 if self.auto_eject_hosts.is_none() {
1089 self.auto_eject_hosts = Some(defaults::AUTO_EJECT_HOSTS);
1090 }
1091 if self.server_retry_timeout.is_none() {
1092 self.server_retry_timeout = Some(defaults::SERVER_RETRY_TIMEOUT_MS);
1093 }
1094 if self.server_failure_limit.is_none() {
1095 self.server_failure_limit = Some(defaults::SERVER_FAILURE_LIMIT);
1096 }
1097 if self.dyn_read_timeout.is_none() {
1098 self.dyn_read_timeout = Some(defaults::DYN_READ_TIMEOUT_MS);
1099 }
1100 if self.dyn_write_timeout.is_none() {
1101 self.dyn_write_timeout = Some(defaults::DYN_WRITE_TIMEOUT_MS);
1102 }
1103 if self.dyn_connections.is_none() {
1104 self.dyn_connections = Some(defaults::DYN_CONNECTIONS);
1105 }
1106 if self.gos_interval.is_none() {
1107 self.gos_interval = Some(defaults::GOS_INTERVAL_MS);
1108 }
1109 if self.conn_msg_rate.is_none() {
1110 self.conn_msg_rate = Some(defaults::CONN_MSG_RATE);
1111 }
1112 if self.rack.is_none() {
1113 self.rack = Some(defaults::RACK.to_string());
1114 }
1115 if self.datacenter.is_none() {
1116 self.datacenter = Some(defaults::DC.to_string());
1117 }
1118 if self.secure_server_option.is_none() {
1119 self.secure_server_option = Some(defaults::SECURE_SERVER_OPTION.to_string());
1120 }
1121 if self.read_consistency.is_none() {
1122 self.read_consistency = Some(defaults::CONSISTENCY.to_string());
1123 }
1124 if self.write_consistency.is_none() {
1125 self.write_consistency = Some(defaults::CONSISTENCY.to_string());
1126 }
1127 if self.stats_interval.is_none() {
1128 self.stats_interval = Some(defaults::STATS_INTERVAL_MS);
1129 }
1130 if self.stats_listen.is_none() {
1131 self.stats_listen = Some(
1133 ConfListen::parse("stats_listen", defaults::STATS_PNAME)
1134 .expect("invariant: STATS_PNAME constant is valid"),
1135 );
1136 }
1137 if self.env.is_none() {
1138 self.env = Some(defaults::ENV.to_string());
1139 }
1140 if self.pem_key_file.is_none() {
1141 self.pem_key_file = Some(defaults::PEM_KEY_FILE.to_string());
1142 }
1143 if self.recon_key_file.is_none() {
1144 self.recon_key_file = Some(defaults::RECON_KEY_FILE.to_string());
1145 }
1146 if self.recon_iv_file.is_none() {
1147 self.recon_iv_file = Some(defaults::RECON_IV_FILE.to_string());
1148 }
1149 if self.recon_interval_seconds.is_none() {
1150 self.recon_interval_seconds = Some(defaults::RECON_INTERVAL_SECONDS);
1151 }
1152 if self.datastore_connections.is_none() {
1153 self.datastore_connections = Some(defaults::DATASTORE_CONNECTIONS);
1154 }
1155 if self.local_peer_connections.is_none() {
1156 self.local_peer_connections = Some(defaults::LOCAL_PEER_CONNECTIONS);
1157 }
1158 if self.remote_peer_connections.is_none() {
1159 self.remote_peer_connections = Some(defaults::REMOTE_PEER_CONNECTIONS);
1160 }
1161 if self.read_repairs_enabled.is_none() {
1162 self.read_repairs_enabled = Some(false);
1163 }
1164 if self.enable_gossip.is_none() {
1165 self.enable_gossip = Some(false);
1166 }
1167 self.apply_hinted_handoff_defaults();
1168 }
1169
1170 fn apply_hinted_handoff_defaults(&mut self) {
1176 if self.enable_hinted_handoff.is_none() {
1177 self.enable_hinted_handoff = Some(defaults::ENABLE_HINTED_HANDOFF);
1178 }
1179 if self.hint_ttl_seconds.is_none() {
1180 self.hint_ttl_seconds = Some(defaults::HINT_TTL_SECONDS);
1181 }
1182 if self.hint_store_max_bytes.is_none() {
1183 self.hint_store_max_bytes = Some(defaults::HINT_STORE_MAX_BYTES);
1184 }
1185 if self.hint_drain_interval_ms.is_none() {
1186 self.hint_drain_interval_ms = Some(defaults::HINT_DRAIN_INTERVAL_MS);
1187 }
1188 if self.transport.is_none() {
1189 self.transport = Some(Transport::default());
1190 }
1191 }
1192
1193 pub fn validate(&self, pool_name: &str) -> Result<(), ConfError> {
1210 if pool_name.is_empty() {
1211 return Err(ConfError::EmptyPoolName);
1212 }
1213
1214 if self.listen.is_none() {
1215 return Err(ConfError::MissingRequired("listen"));
1216 }
1217
1218 self.validate_numeric_ranges()?;
1219 self.validate_mbuf_size()?;
1220 self.validate_max_msgs()?;
1221
1222 if let Some(n) = self.data_store {
1223 let ds = DataStore::from_int(n)?;
1224 if ds == DataStore::Noxu {
1225 self.validate_noxu()?;
1226 }
1227 }
1228 if let Some(tag) = &self.hash_tag {
1229 if tag.chars().count() != 2 {
1230 return Err(ConfError::BadHashTag(tag.clone()));
1231 }
1232 }
1233
1234 let secure = if let Some(s) = &self.secure_server_option {
1235 SecureServerOption::parse(s)?
1236 } else {
1237 SecureServerOption::None
1238 };
1239 if let Some(s) = &self.read_consistency {
1240 ConsistencyLevel::parse("read_consistency", s)?;
1241 }
1242 if let Some(s) = &self.write_consistency {
1243 ConsistencyLevel::parse("write_consistency", s)?;
1244 }
1245 if secure != SecureServerOption::None {
1246 match &self.pem_key_file {
1247 Some(s) if !s.is_empty() => {}
1248 _ => return Err(ConfError::MissingRequired("pem_key_file")),
1249 }
1250 }
1251
1252 if let Some(s) = &self.log_format {
1253 crate::core::log::LogFormat::parse(s).map_err(|e| ConfError::BadServer {
1254 field: "log_format",
1255 value: s.clone(),
1256 reason: e.to_string(),
1257 })?;
1258 }
1259
1260 self.validate_bucket_types()?;
1261 self.validate_hinted_handoff()?;
1262 self.validate_peer_tls()?;
1263 self.validate_transport()?;
1264 if let Some(r) = &self.riak {
1265 r.validate()?;
1266 }
1267
1268 match &self.servers {
1269 None => return Err(ConfError::MissingRequired("servers")),
1270 Some(s) if s.is_empty() => return Err(ConfError::MissingRequired("servers")),
1271 Some(s) if s.len() > 1 => {
1272 return Err(ConfError::BadServer {
1273 field: "servers",
1274 value: s.len().to_string(),
1275 reason: "expected exactly one datastore entry".to_string(),
1276 });
1277 }
1278 Some(_) => {}
1279 }
1280
1281 Ok(())
1282 }
1283
1284 fn validate_numeric_ranges(&self) -> Result<(), ConfError> {
1285 check_positive("timeout", self.timeout)?;
1286 check_positive("backlog", self.backlog)?;
1287 check_non_negative("client_connections", self.client_connections)?;
1288 check_positive("server_retry_timeout", self.server_retry_timeout)?;
1289 check_positive("server_failure_limit", self.server_failure_limit)?;
1290 check_positive("dyn_read_timeout", self.dyn_read_timeout)?;
1291 check_positive("dyn_write_timeout", self.dyn_write_timeout)?;
1292 check_positive("gos_interval", self.gos_interval)?;
1293 check_positive("stats_interval", self.stats_interval)?;
1294
1295 if let Some(n) = self.dyn_connections {
1296 if n <= 0 {
1297 return Err(ConfError::OutOfRange {
1298 field: "dyn_connections",
1299 value: n,
1300 reason: "must be a positive non-zero number",
1301 });
1302 }
1303 }
1304 Ok(())
1305 }
1306
1307 fn validate_mbuf_size(&self) -> Result<(), ConfError> {
1308 let Some(n) = self.mbuf_size else {
1309 return Ok(());
1310 };
1311 if n <= 0 {
1312 return Err(ConfError::OutOfRange {
1313 field: "mbuf_size",
1314 value: n,
1315 reason: "must be a positive number",
1316 });
1317 }
1318 if !(defaults::MBUF_MIN_SIZE..=defaults::MBUF_MAX_SIZE).contains(&n) {
1319 return Err(ConfError::OutOfRange {
1320 field: "mbuf_size",
1321 value: n,
1322 reason: "must be between 512 and 512000 bytes",
1323 });
1324 }
1325 if n % 16 != 0 {
1326 return Err(ConfError::OutOfRange {
1327 field: "mbuf_size",
1328 value: n,
1329 reason: "must be a multiple of 16",
1330 });
1331 }
1332 Ok(())
1333 }
1334
1335 fn validate_max_msgs(&self) -> Result<(), ConfError> {
1336 let Some(n) = self.max_msgs else {
1337 return Ok(());
1338 };
1339 if n <= 0 {
1340 return Err(ConfError::OutOfRange {
1341 field: "max_msgs",
1342 value: n,
1343 reason: "requires a non-zero number",
1344 });
1345 }
1346 if !(defaults::ALLOC_MSGS_MIN..=defaults::ALLOC_MSGS_MAX).contains(&n) {
1347 return Err(ConfError::OutOfRange {
1348 field: "max_msgs",
1349 value: n,
1350 reason: "must be between 100000 and 1000000 messages",
1351 });
1352 }
1353 Ok(())
1354 }
1355
1356 fn validate_bucket_types(&self) -> Result<(), ConfError> {
1357 use std::collections::BTreeSet;
1358 let mut seen: BTreeSet<&str> = BTreeSet::new();
1359 for bt in &self.bucket_types {
1360 if bt.name.is_empty() {
1361 return Err(ConfError::BadServer {
1362 field: "bucket_types",
1363 value: String::new(),
1364 reason: "bucket-type name must not be empty".to_string(),
1365 });
1366 }
1367 if !seen.insert(bt.name.as_str()) {
1368 return Err(ConfError::BadServer {
1369 field: "bucket_types",
1370 value: bt.name.clone(),
1371 reason: "duplicate bucket-type name".to_string(),
1372 });
1373 }
1374 ConsistencyLevel::parse("read_consistency", &bt.read_consistency)?;
1375 ConsistencyLevel::parse("write_consistency", &bt.write_consistency)?;
1376 }
1377 if let Some(name) = &self.default_bucket_type {
1378 if !self.bucket_types.iter().any(|bt| &bt.name == name) {
1379 return Err(ConfError::BadServer {
1380 field: "default_bucket_type",
1381 value: name.clone(),
1382 reason: "references an undefined bucket-type name".to_string(),
1383 });
1384 }
1385 }
1386 Ok(())
1387 }
1388
1389 fn validate_noxu(&self) -> Result<(), ConfError> {
1406 if !crate::conf::is_noxu_supported() {
1407 return Err(ConfError::BadNoxuConfig(
1408 "noxu data_store requires dynomited built with --features riak",
1409 ));
1410 }
1411 match self.noxu_path.as_deref() {
1412 Some(p) if !p.as_os_str().is_empty() => Ok(()),
1413 _ => Err(ConfError::BadNoxuConfig(
1414 "data_store: noxu requires a non-empty 'noxu_path:' directive",
1415 )),
1416 }
1417 }
1418
1419 fn validate_hinted_handoff(&self) -> Result<(), ConfError> {
1420 if self.enable_hinted_handoff != Some(true) {
1421 return Ok(());
1422 }
1423 if let Some(ttl) = self.hint_ttl_seconds {
1424 if ttl == 0 {
1425 return Err(ConfError::BadServer {
1426 field: "hint_ttl_seconds",
1427 value: ttl.to_string(),
1428 reason: "must be a positive number when enable_hinted_handoff is true"
1429 .to_string(),
1430 });
1431 }
1432 }
1433 if let Some(cap) = self.hint_store_max_bytes {
1434 if cap == 0 {
1435 return Err(ConfError::BadServer {
1436 field: "hint_store_max_bytes",
1437 value: cap.to_string(),
1438 reason: "must be a positive number when enable_hinted_handoff is true"
1439 .to_string(),
1440 });
1441 }
1442 }
1443 if let Some(period) = self.hint_drain_interval_ms {
1444 if period == 0 {
1445 return Err(ConfError::BadServer {
1446 field: "hint_drain_interval_ms",
1447 value: period.to_string(),
1448 reason: "must be a positive number when enable_hinted_handoff is true"
1449 .to_string(),
1450 });
1451 }
1452 }
1453 Ok(())
1454 }
1455
1456 fn validate_peer_tls(&self) -> Result<(), ConfError> {
1466 validate_tls_pair(
1467 "peer_tls_cert",
1468 "peer_tls_key",
1469 self.peer_tls_cert.as_deref(),
1470 self.peer_tls_key.as_deref(),
1471 )?;
1472 if self.peer_tls_ca.is_some() && self.peer_tls_cert.is_none() {
1473 return Err(ConfError::BadServer {
1474 field: "peer_tls_ca",
1475 value: self
1476 .peer_tls_ca
1477 .as_ref()
1478 .map_or_else(String::new, |p| p.display().to_string()),
1479 reason: "requires peer_tls_cert and peer_tls_key to also be set".into(),
1480 });
1481 }
1482 for (dc, profile) in &self.peer_tls_profiles {
1483 if dc.is_empty() {
1484 return Err(ConfError::BadServer {
1485 field: "peer_tls_profiles",
1486 value: String::new(),
1487 reason: "per-DC TLS profile name must not be empty".into(),
1488 });
1489 }
1490 profile.validate(dc)?;
1491 }
1492 Ok(())
1493 }
1494
1495 fn validate_transport(&self) -> Result<(), ConfError> {
1506 let resolved = self.transport.unwrap_or_default();
1507 if resolved != Transport::Quic {
1508 return Ok(());
1509 }
1510 match (
1511 self.quic_cert_file.as_deref(),
1512 self.quic_key_file.as_deref(),
1513 ) {
1514 (Some(_), Some(_)) => Ok(()),
1515 (None, _) => Err(ConfError::BadServer {
1516 field: "quic_cert_file",
1517 value: String::new(),
1518 reason: "transport: quic requires quic_cert_file to be set".into(),
1519 }),
1520 (Some(_), None) => Err(ConfError::BadServer {
1521 field: "quic_key_file",
1522 value: String::new(),
1523 reason: "transport: quic requires quic_key_file to be set".into(),
1524 }),
1525 }
1526 }
1527}
1528
1529impl fmt::Display for ConfPool {
1530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1531 match serde_yaml::to_string(self) {
1535 Ok(s) => f.write_str(&s),
1536 Err(_) => Err(fmt::Error),
1537 }
1538 }
1539}
1540
1541fn check_positive(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
1542 if let Some(n) = v {
1543 if n <= 0 {
1544 return Err(ConfError::OutOfRange {
1545 field,
1546 value: n,
1547 reason: "must be a positive number",
1548 });
1549 }
1550 }
1551 Ok(())
1552}
1553
1554fn check_non_negative(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
1555 if let Some(n) = v {
1556 if n < 0 {
1557 return Err(ConfError::OutOfRange {
1558 field,
1559 value: n,
1560 reason: "must be a non-negative number",
1561 });
1562 }
1563 }
1564 Ok(())
1565}
1566
1567fn deserialize_data_store<'de, D>(de: D) -> Result<Option<i64>, D::Error>
1572where
1573 D: serde::Deserializer<'de>,
1574{
1575 use serde::de::{self, Visitor};
1576 use std::fmt;
1577
1578 struct V;
1579 impl<'de> Visitor<'de> for V {
1580 type Value = Option<i64>;
1581
1582 fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1583 f.write_str("a data_store value: integer (0, 1, 2) or string (redis, memcache, noxu)")
1584 }
1585
1586 fn visit_none<E: de::Error>(self) -> Result<Self::Value, E> {
1587 Ok(None)
1588 }
1589
1590 fn visit_unit<E: de::Error>(self) -> Result<Self::Value, E> {
1591 Ok(None)
1592 }
1593
1594 fn visit_some<D2: serde::Deserializer<'de>>(
1595 self,
1596 de: D2,
1597 ) -> Result<Self::Value, D2::Error> {
1598 de.deserialize_any(V)
1599 }
1600
1601 fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
1602 DataStore::from_int(v)
1603 .map(|d| Some(d.as_int()))
1604 .map_err(|e| E::custom(e.to_string()))
1605 }
1606
1607 fn visit_u64<E: de::Error>(self, v: u64) -> Result<Self::Value, E> {
1608 let n = i64::try_from(v).map_err(|_| E::custom("data_store integer overflow"))?;
1609 self.visit_i64(n)
1610 }
1611
1612 fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
1613 DataStore::from_name(v)
1614 .map(|d| Some(d.as_int()))
1615 .map_err(|_| {
1616 E::custom(format!(
1617 "data_store: unknown name '{v}'; expected one of: redis, memcache, noxu"
1618 ))
1619 })
1620 }
1621
1622 fn visit_string<E: de::Error>(self, v: String) -> Result<Self::Value, E> {
1623 self.visit_str(&v)
1624 }
1625 }
1626
1627 de.deserialize_any(V)
1628}
1629
1630#[cfg(test)]
1631mod tests {
1632 use super::*;
1633
1634 fn pool() -> ConfPool {
1635 ConfPool {
1636 listen: Some(ConfListen::parse("listen", "127.0.0.1:8102").unwrap()),
1637 servers: Some(Servers::from_vec(vec![ConfServer::parse(
1638 "127.0.0.1:6379:1",
1639 )
1640 .unwrap()])),
1641 tokens: Some(TokenList::parse("0").unwrap()),
1642 ..ConfPool::default()
1643 }
1644 }
1645
1646 #[test]
1647 fn validate_minimal_post_finalize() {
1648 let mut p = pool();
1649 p.apply_defaults();
1650 p.validate("dyn_o_mite").unwrap();
1651 }
1652
1653 #[test]
1654 fn missing_listen_rejected() {
1655 let mut p = pool();
1656 p.listen = None;
1657 p.apply_defaults();
1658 assert!(matches!(
1659 p.validate("p"),
1660 Err(ConfError::MissingRequired("listen"))
1661 ));
1662 }
1663
1664 #[test]
1665 fn out_of_range_mbuf_rejected() {
1666 let mut p = pool();
1667 p.mbuf_size = Some(127);
1668 p.apply_defaults();
1669 assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
1670 }
1671
    /// Parses a one-pool YAML document and checks that `distribution`,
    /// `distribution_shadow` and `hash` land in the expected enum variants,
    /// and that `resolved_distribution` keeps `RandomSlicing`.
    #[test]
    fn distribution_field_round_trips_through_yaml() {
        let yaml = r"
p:
 listen: 127.0.0.1:8102
 dyn_listen: 127.0.0.1:8101
 tokens: '0'
 servers:
 - 127.0.0.1:6379:1
 data_store: 0
 distribution: random_slicing
 distribution_shadow: vnode
 hash: murmur3_x64_64
";
        // The document is a map from pool name to pool settings.
        let parsed: std::collections::BTreeMap<String, ConfPool> =
            serde_yaml::from_str(yaml).unwrap();
        let pool = parsed.get("p").unwrap();
        assert_eq!(pool.distribution, Some(Distribution::RandomSlicing));
        assert_eq!(pool.distribution_shadow, Some(Distribution::Vnode));
        assert_eq!(pool.hash, Some(HashType::Murmur3X64_64));
        assert_eq!(pool.resolved_distribution(), Distribution::RandomSlicing);
    }
1694
1695 #[test]
1696 fn distribution_legacy_alias_resolves_to_vnode() {
1697 let mut p = pool();
1698 p.distribution = Some(Distribution::Ketama);
1699 assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1700 p.distribution = Some(Distribution::Modula);
1701 assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1702 p.distribution = Some(Distribution::Random);
1703 assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1704 }
1705
1706 #[test]
1707 fn distribution_default_unset_is_vnode() {
1708 let p = pool();
1709 assert!(p.distribution.is_none());
1710 assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1711 }
1712
1713 #[test]
1714 fn mbuf_size_not_multiple_of_16_rejected() {
1715 let mut p = pool();
1716 p.mbuf_size = Some(513);
1717 p.apply_defaults();
1718 assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
1719 }
1720
1721 #[test]
1722 fn pem_required_when_secure() {
1723 let mut p = pool();
1724 p.secure_server_option = Some("datacenter".to_string());
1725 p.pem_key_file = Some(String::new());
1726 p.apply_defaults();
1727 assert!(matches!(
1730 p.validate("p"),
1731 Err(ConfError::MissingRequired("pem_key_file"))
1732 ));
1733 }
1734
1735 #[test]
1736 fn data_store_out_of_range_rejected() {
1737 let mut p = pool();
1738 p.data_store = Some(7);
1739 p.apply_defaults();
1740 assert!(matches!(p.validate("p"), Err(ConfError::BadDataStore(7))));
1741 }
1742
    // Held by the tests that call `crate::conf::set_noxu_supported`, so those
    // tests cannot interleave while the flag is overridden.
    static NOXU_FLAG_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
1748
    /// With Noxu support switched off, selecting the Noxu datastore fails with
    /// a `BadNoxuConfig` message that mentions `--features riak`.
    #[test]
    fn data_store_noxu_requires_riak_feature() {
        // A poisoned lock is still usable here; the guard only serializes tests.
        let _g = NOXU_FLAG_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let prev = crate::conf::is_noxu_supported();
        crate::conf::set_noxu_supported(false);
        let mut p = pool();
        p.data_store = Some(2);
        p.noxu_path = Some("/tmp/test".into());
        p.apply_defaults();
        let err = p.validate("p");
        // Restore the flag before asserting so a failure cannot leak the override.
        crate::conf::set_noxu_supported(prev);
        match err {
            Err(ConfError::BadNoxuConfig(msg)) => {
                assert!(msg.contains("--features riak"), "unexpected message: {msg}");
            }
            other => panic!("expected BadNoxuConfig, got {other:?}"),
        }
    }
1771
1772 #[test]
1773 fn data_store_noxu_requires_path() {
1774 let _g = NOXU_FLAG_LOCK
1775 .lock()
1776 .unwrap_or_else(std::sync::PoisonError::into_inner);
1777 let prev = crate::conf::is_noxu_supported();
1778 crate::conf::set_noxu_supported(true);
1779 let mut p = pool();
1780 p.data_store = Some(2);
1781 p.noxu_path = None;
1782 p.apply_defaults();
1783 let err = p.validate("p");
1784 crate::conf::set_noxu_supported(prev);
1785 match err {
1786 Err(ConfError::BadNoxuConfig(msg)) => {
1787 assert!(msg.contains("noxu_path"), "unexpected message: {msg}");
1788 }
1789 other => panic!("expected BadNoxuConfig, got {other:?}"),
1790 }
1791 }
1792
1793 #[test]
1794 fn data_store_noxu_yaml_round_trip_string_form() {
1795 let yaml = r"
1798listen: 127.0.0.1:8102
1799servers:
1800- 127.0.0.1:6379:1
1801tokens: '0'
1802data_store: noxu
1803noxu_path: /tmp/test
1804";
1805 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1806 assert_eq!(p.data_store, Some(2));
1807 assert_eq!(
1808 p.noxu_path.as_deref(),
1809 Some(std::path::Path::new("/tmp/test"))
1810 );
1811 let dumped = serde_yaml::to_string(&p).unwrap();
1813 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1814 assert_eq!(p2.data_store, p.data_store);
1815 assert_eq!(p2.noxu_path, p.noxu_path);
1816 }
1817
1818 #[test]
1819 fn data_store_yaml_int_form_still_works() {
1820 let yaml = r"
1821listen: 127.0.0.1:8102
1822servers:
1823- 127.0.0.1:6379:1
1824tokens: '0'
1825data_store: 2
1826noxu_path: /tmp/test
1827";
1828 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1829 assert_eq!(p.data_store, Some(2));
1830 }
1831
1832 #[test]
1833 fn data_store_string_form_unknown_rejected() {
1834 let yaml = r"
1835listen: 127.0.0.1:8102
1836servers:
1837- 127.0.0.1:6379:1
1838tokens: '0'
1839data_store: postgres
1840";
1841 let err = serde_yaml::from_str::<ConfPool>(yaml).unwrap_err();
1842 let msg = err.to_string();
1843 assert!(
1844 msg.contains("unknown name") || msg.contains("data_store"),
1845 "unexpected message: {msg}"
1846 );
1847 }
1848
1849 #[test]
1850 fn hash_tag_must_be_two_chars() {
1851 let mut p = pool();
1852 p.hash_tag = Some("abc".to_string());
1853 p.apply_defaults();
1854 assert!(matches!(p.validate("p"), Err(ConfError::BadHashTag(_))));
1855 }
1856
1857 #[test]
1858 fn empty_servers_rejected() {
1859 let mut p = pool();
1860 p.servers = Some(Servers::from_vec(vec![]));
1861 p.apply_defaults();
1862 assert!(matches!(
1863 p.validate("p"),
1864 Err(ConfError::MissingRequired("servers"))
1865 ));
1866 }
1867
1868 #[test]
1869 fn log_format_known_values_accepted() {
1870 for value in ["default", "rfc5424", "rfc3164", "json", "ndjson", "DEFAULT"] {
1871 let mut p = pool();
1872 p.log_format = Some(value.to_string());
1873 p.apply_defaults();
1874 assert!(p.validate("p").is_ok(), "value {value:?} should validate");
1875 }
1876 }
1877
1878 #[test]
1879 fn log_format_unknown_rejected() {
1880 let mut p = pool();
1881 p.log_format = Some("yaml".to_string());
1882 p.apply_defaults();
1883 let err = p.validate("p").unwrap_err();
1884 assert!(
1885 matches!(
1886 err,
1887 ConfError::BadServer {
1888 field: "log_format",
1889 ..
1890 }
1891 ),
1892 "unexpected error: {err:?}"
1893 );
1894 }
1895
1896 #[test]
1897 fn observability_block_round_trips() {
1898 let yaml = r"
1899observability:
1900 otlp_logs_endpoint: http://collector:4317
1901 service_name: dynomited
1902listen: 127.0.0.1:8102
1903servers:
1904- 127.0.0.1:6379:1
1905tokens: '0'
1906";
1907 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1908 let obs = p.observability.as_ref().expect("observability set");
1909 assert_eq!(
1910 obs.otlp_logs_endpoint.as_deref(),
1911 Some("http://collector:4317")
1912 );
1913 assert_eq!(obs.service_name.as_deref(), Some("dynomited"));
1914 }
1915
1916 #[test]
1917 fn bucket_types_round_trip() {
1918 let yaml = r"
1919listen: 127.0.0.1:8102
1920servers:
1921- 127.0.0.1:6379:1
1922tokens: '0'
1923bucket_types:
1924- name: hot
1925 read_consistency: DC_QUORUM
1926 write_consistency: DC_EACH_SAFE_QUORUM
1927 n_val: 3
1928- name: cold
1929 read_consistency: DC_ONE
1930 write_consistency: DC_ONE
1931 n_val: 1
1932default_bucket_type: cold
1933";
1934 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1935 assert_eq!(p.bucket_types.len(), 2);
1936 assert_eq!(p.bucket_types[0].name, "hot");
1937 assert_eq!(p.bucket_types[0].n_val, 3);
1938 assert_eq!(
1939 p.bucket_types[0].read_level().unwrap(),
1940 crate::conf::ConsistencyLevel::DcQuorum,
1941 );
1942 assert_eq!(p.default_bucket_type.as_deref(), Some("cold"));
1943 let dumped = serde_yaml::to_string(&p).unwrap();
1945 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1946 assert_eq!(p2.bucket_types, p.bucket_types);
1947 assert_eq!(p2.default_bucket_type, p.default_bucket_type);
1948 }
1949
1950 #[test]
1951 fn bucket_types_default_is_empty() {
1952 let mut p = pool();
1953 p.apply_defaults();
1954 assert!(p.bucket_types.is_empty());
1955 assert!(p.default_bucket_type.is_none());
1956 assert!(p.validate("p").is_ok());
1957 }
1958
1959 #[test]
1960 fn duplicate_bucket_type_name_rejected() {
1961 let mut p = pool();
1962 p.bucket_types = vec![
1963 ConfBucketType {
1964 name: "a".into(),
1965 read_consistency: "DC_ONE".into(),
1966 write_consistency: "DC_ONE".into(),
1967 n_val: 0,
1968 },
1969 ConfBucketType {
1970 name: "a".into(),
1971 read_consistency: "DC_ONE".into(),
1972 write_consistency: "DC_ONE".into(),
1973 n_val: 0,
1974 },
1975 ];
1976 p.apply_defaults();
1977 let err = p.validate("p").unwrap_err();
1978 assert!(
1979 matches!(
1980 err,
1981 ConfError::BadServer {
1982 field: "bucket_types",
1983 ..
1984 }
1985 ),
1986 "unexpected error: {err:?}",
1987 );
1988 }
1989
1990 #[test]
1991 fn bucket_type_unknown_consistency_rejected() {
1992 let mut p = pool();
1993 p.bucket_types = vec![ConfBucketType {
1994 name: "a".into(),
1995 read_consistency: "DC_PURPLE".into(),
1996 write_consistency: "DC_ONE".into(),
1997 n_val: 0,
1998 }];
1999 p.apply_defaults();
2000 let err = p.validate("p").unwrap_err();
2001 assert!(matches!(err, ConfError::BadConsistency { .. }));
2002 }
2003
2004 #[test]
2005 fn unknown_default_bucket_type_rejected() {
2006 let mut p = pool();
2007 p.default_bucket_type = Some("missing".into());
2008 p.apply_defaults();
2009 let err = p.validate("p").unwrap_err();
2010 assert!(matches!(
2011 err,
2012 ConfError::BadServer {
2013 field: "default_bucket_type",
2014 ..
2015 }
2016 ));
2017 }
2018
2019 #[test]
2020 fn hinted_handoff_default_off_with_canonical_constants() {
2021 let mut p = pool();
2022 p.apply_defaults();
2023 assert_eq!(p.enable_hinted_handoff, Some(false));
2024 assert_eq!(p.hint_ttl_seconds, Some(defaults::HINT_TTL_SECONDS));
2025 assert_eq!(p.hint_store_max_bytes, Some(defaults::HINT_STORE_MAX_BYTES));
2026 assert_eq!(
2027 p.hint_drain_interval_ms,
2028 Some(defaults::HINT_DRAIN_INTERVAL_MS)
2029 );
2030 assert!(p.validate("p").is_ok());
2031 }
2032
2033 #[test]
2034 fn hinted_handoff_yaml_round_trip() {
2035 let yaml = r"
2036listen: 127.0.0.1:8102
2037servers:
2038- 127.0.0.1:6379:1
2039tokens: '0'
2040enable_hinted_handoff: true
2041hint_ttl_seconds: 7200
2042hint_store_max_bytes: 8388608
2043hint_drain_interval_ms: 5000
2044";
2045 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
2046 assert_eq!(p.enable_hinted_handoff, Some(true));
2047 assert_eq!(p.hint_ttl_seconds, Some(7200));
2048 assert_eq!(p.hint_store_max_bytes, Some(8_388_608));
2049 assert_eq!(p.hint_drain_interval_ms, Some(5_000));
2050 let dumped = serde_yaml::to_string(&p).unwrap();
2051 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2052 assert_eq!(p2.enable_hinted_handoff, p.enable_hinted_handoff);
2053 assert_eq!(p2.hint_ttl_seconds, p.hint_ttl_seconds);
2054 assert_eq!(p2.hint_store_max_bytes, p.hint_store_max_bytes);
2055 assert_eq!(p2.hint_drain_interval_ms, p.hint_drain_interval_ms);
2056 }
2057
2058 #[test]
2059 fn hinted_handoff_zero_ttl_rejected_when_enabled() {
2060 let mut p = pool();
2061 p.enable_hinted_handoff = Some(true);
2062 p.hint_ttl_seconds = Some(0);
2063 p.apply_defaults();
2064 let err = p.validate("p").unwrap_err();
2067 assert!(matches!(
2068 err,
2069 ConfError::BadServer {
2070 field: "hint_ttl_seconds",
2071 ..
2072 }
2073 ));
2074 }
2075
2076 #[test]
2077 fn hinted_handoff_zero_max_bytes_rejected_when_enabled() {
2078 let mut p = pool();
2079 p.enable_hinted_handoff = Some(true);
2080 p.hint_store_max_bytes = Some(0);
2081 p.apply_defaults();
2082 let err = p.validate("p").unwrap_err();
2083 assert!(matches!(
2084 err,
2085 ConfError::BadServer {
2086 field: "hint_store_max_bytes",
2087 ..
2088 }
2089 ));
2090 }
2091
2092 #[test]
2093 fn hinted_handoff_zero_values_ignored_when_disabled() {
2094 let mut p = pool();
2098 p.enable_hinted_handoff = Some(false);
2099 p.hint_ttl_seconds = Some(0);
2100 p.hint_store_max_bytes = Some(0);
2101 p.hint_drain_interval_ms = Some(0);
2102 p.apply_defaults();
2103 assert!(p.validate("p").is_ok());
2104 }
2105
2106 #[test]
2107 fn riak_block_validates_when_unset() {
2108 let mut p = pool();
2109 p.riak = Some(ConfRiak::default());
2110 p.apply_defaults();
2111 assert!(p.validate("p").is_ok());
2112 }
2113
2114 #[test]
2115 fn riak_block_validates_with_addresses() {
2116 let mut p = pool();
2117 p.riak = Some(ConfRiak {
2118 pbc_listen: Some("127.0.0.1:8087".into()),
2119 http_listen: Some("127.0.0.1:8098".into()),
2120 ..ConfRiak::default()
2121 });
2122 p.apply_defaults();
2123 assert!(p.validate("p").is_ok());
2124 }
2125
2126 #[test]
2127 fn riak_block_rejects_bad_pbc_addr() {
2128 let mut p = pool();
2129 p.riak = Some(ConfRiak {
2130 pbc_listen: Some(String::new()),
2131 ..ConfRiak::default()
2132 });
2133 p.apply_defaults();
2134 assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
2135 }
2136
2137 #[test]
2138 fn riak_block_rejects_segment_above_full_sweep() {
2139 let mut p = pool();
2140 p.riak = Some(ConfRiak {
2141 aae_segment_interval_seconds: Some(120),
2142 aae_full_sweep_interval_seconds: Some(60),
2143 ..ConfRiak::default()
2144 });
2145 p.apply_defaults();
2146 assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
2147 }
2148
2149 #[test]
2150 fn riak_block_round_trips_through_yaml() {
2151 let yaml = r"
2152p:
2153 listen: 127.0.0.1:1
2154 dyn_listen: 127.0.0.1:2
2155 tokens: '0'
2156 servers:
2157 - 127.0.0.1:3:1
2158 data_store: 0
2159 riak:
2160 pbc_listen: 127.0.0.1:8087
2161 http_listen: 127.0.0.1:8098
2162 aae_enabled: true
2163 aae_full_sweep_interval_seconds: 3600
2164 aae_segment_interval_seconds: 30
2165";
2166 let cfg: std::collections::BTreeMap<String, ConfPool> = serde_yaml::from_str(yaml).unwrap();
2167 let p = cfg.get("p").unwrap();
2168 let r = p.riak.as_ref().unwrap();
2169 assert_eq!(r.pbc_listen.as_deref(), Some("127.0.0.1:8087"));
2170 assert_eq!(r.http_listen.as_deref(), Some("127.0.0.1:8098"));
2171 assert_eq!(r.aae_enabled, Some(true));
2172 assert_eq!(r.aae_full_sweep_interval_seconds, Some(3600));
2173 assert_eq!(r.aae_segment_interval_seconds, Some(30));
2174 }
2175
2176 #[test]
2177 fn peer_tls_pair_unset_is_ok() {
2178 let mut p = pool();
2179 p.apply_defaults();
2180 assert!(p.validate("p").is_ok(), "plaintext default must validate");
2181 }
2182
2183 #[test]
2184 fn peer_tls_pair_both_set_is_ok() {
2185 let mut p = pool();
2186 p.peer_tls_cert = Some(std::path::PathBuf::from("/etc/dynomite/peer.crt"));
2187 p.peer_tls_key = Some(std::path::PathBuf::from("/etc/dynomite/peer.key"));
2188 p.apply_defaults();
2189 assert!(p.validate("p").is_ok());
2190 }
2191
2192 #[test]
2193 fn peer_tls_cert_without_key_rejected() {
2194 let mut p = pool();
2195 p.peer_tls_cert = Some(std::path::PathBuf::from("/x.crt"));
2196 p.apply_defaults();
2197 let err = p.validate("p").unwrap_err();
2198 assert!(
2199 matches!(
2200 err,
2201 ConfError::BadServer {
2202 field: "peer_tls_cert",
2203 ..
2204 }
2205 ),
2206 "got {err:?}"
2207 );
2208 }
2209
2210 #[test]
2211 fn peer_tls_key_without_cert_rejected() {
2212 let mut p = pool();
2213 p.peer_tls_key = Some(std::path::PathBuf::from("/x.key"));
2214 p.apply_defaults();
2215 let err = p.validate("p").unwrap_err();
2216 assert!(
2217 matches!(
2218 err,
2219 ConfError::BadServer {
2220 field: "peer_tls_key",
2221 ..
2222 }
2223 ),
2224 "got {err:?}"
2225 );
2226 }
2227
2228 #[test]
2229 fn peer_tls_ca_without_cert_rejected() {
2230 let mut p = pool();
2231 p.peer_tls_ca = Some(std::path::PathBuf::from("/x.ca"));
2232 p.apply_defaults();
2233 let err = p.validate("p").unwrap_err();
2234 assert!(
2235 matches!(
2236 err,
2237 ConfError::BadServer {
2238 field: "peer_tls_ca",
2239 ..
2240 }
2241 ),
2242 "got {err:?}"
2243 );
2244 }
2245
2246 #[test]
2247 fn riak_tls_cert_without_key_rejected() {
2248 let mut p = pool();
2249 p.riak = Some(ConfRiak {
2250 pbc_listen: Some("127.0.0.1:8087".into()),
2251 tls_cert: Some(std::path::PathBuf::from("/x.crt")),
2252 ..ConfRiak::default()
2253 });
2254 p.apply_defaults();
2255 let err = p.validate("p").unwrap_err();
2256 assert!(
2257 matches!(
2258 err,
2259 ConfError::BadServer {
2260 field: "tls_cert",
2261 ..
2262 }
2263 ),
2264 "got {err:?}"
2265 );
2266 }
2267
2268 #[test]
2269 fn riak_tls_pair_both_set_is_ok() {
2270 let mut p = pool();
2271 p.riak = Some(ConfRiak {
2272 pbc_listen: Some("127.0.0.1:8087".into()),
2273 tls_cert: Some(std::path::PathBuf::from("/x.crt")),
2274 tls_key: Some(std::path::PathBuf::from("/x.key")),
2275 ..ConfRiak::default()
2276 });
2277 p.apply_defaults();
2278 assert!(p.validate("p").is_ok());
2279 }
2280
2281 #[test]
2282 fn riak_wasm_modules_yaml_round_trip() {
2283 let dir = tempfile::tempdir().unwrap();
2284 let m1 = dir.path().join("identity.wasm");
2285 let m2 = dir.path().join("sum.wasm");
2286 std::fs::write(&m1, b"\0asm\x01\0\0\0").unwrap();
2287 std::fs::write(&m2, b"\0asm\x01\0\0\0").unwrap();
2288 let yaml = format!(
2289 r"
2290listen: 127.0.0.1:8102
2291servers:
2292- 127.0.0.1:6379:1
2293tokens: '0'
2294riak:
2295 pbc_listen: 127.0.0.1:8087
2296 wasm_modules:
2297 - id: identity
2298 path: {m1}
2299 - id: sum
2300 path: {m2}
2301",
2302 m1 = m1.display(),
2303 m2 = m2.display(),
2304 );
2305 let p: ConfPool = serde_yaml::from_str(&yaml).unwrap();
2306 let r = p.riak.as_ref().unwrap();
2307 let mods = r.wasm_modules.as_ref().unwrap();
2308 assert_eq!(mods.len(), 2);
2309 assert_eq!(mods[0].id, "identity");
2310 assert_eq!(mods[0].path, m1);
2311 assert_eq!(mods[1].id, "sum");
2312 assert_eq!(mods[1].path, m2);
2313 let dumped = serde_yaml::to_string(&p).unwrap();
2315 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2316 assert_eq!(p2.riak.unwrap().wasm_modules, r.wasm_modules);
2317 }
2318
2319 #[test]
2320 fn riak_wasm_modules_unique_ids_required() {
2321 let dir = tempfile::tempdir().unwrap();
2322 let path = dir.path().join("m.wasm");
2323 std::fs::write(&path, b"\0").unwrap();
2324 let r = ConfRiak {
2325 wasm_modules: Some(vec![
2326 ConfRiakWasmModule {
2327 id: "m".into(),
2328 path: path.clone(),
2329 },
2330 ConfRiakWasmModule {
2331 id: "m".into(),
2332 path: path.clone(),
2333 },
2334 ]),
2335 ..ConfRiak::default()
2336 };
2337 let err = r.validate().unwrap_err();
2338 assert!(matches!(
2339 err,
2340 ConfError::BadServer {
2341 field: "wasm_modules.id",
2342 ..
2343 }
2344 ));
2345 }
2346
2347 #[test]
2348 fn riak_wasm_modules_path_must_exist() {
2349 let r = ConfRiak {
2350 wasm_modules: Some(vec![ConfRiakWasmModule {
2351 id: "missing".into(),
2352 path: std::path::PathBuf::from("/no/such/path/at/all.wasm"),
2353 }]),
2354 ..ConfRiak::default()
2355 };
2356 let err = r.validate().unwrap_err();
2357 assert!(matches!(
2358 err,
2359 ConfError::BadServer {
2360 field: "wasm_modules.path",
2361 ..
2362 }
2363 ));
2364 }
2365
2366 #[test]
2367 fn riak_wasm_modules_empty_id_rejected() {
2368 let dir = tempfile::tempdir().unwrap();
2369 let path = dir.path().join("m.wasm");
2370 std::fs::write(&path, b"\0").unwrap();
2371 let r = ConfRiak {
2372 wasm_modules: Some(vec![ConfRiakWasmModule {
2373 id: String::new(),
2374 path,
2375 }]),
2376 ..ConfRiak::default()
2377 };
2378 let err = r.validate().unwrap_err();
2379 assert!(matches!(
2380 err,
2381 ConfError::BadServer {
2382 field: "wasm_modules.id",
2383 ..
2384 }
2385 ));
2386 }
2387
2388 #[test]
2389 fn peer_tls_profile_pair_unset_is_ok() {
2390 let p = ConfTlsProfile::default();
2391 assert!(p.validate("dc1").is_ok());
2392 }
2393
2394 #[test]
2395 fn peer_tls_profile_cert_without_key_rejected() {
2396 let p = ConfTlsProfile {
2397 cert: Some(std::path::PathBuf::from("/x.crt")),
2398 ..ConfTlsProfile::default()
2399 };
2400 let err = p.validate("dc1").unwrap_err();
2401 assert!(matches!(
2402 err,
2403 ConfError::BadServer {
2404 field: "peer_tls_profiles.cert",
2405 ..
2406 }
2407 ));
2408 }
2409
2410 #[test]
2411 fn peer_tls_profile_key_without_cert_rejected() {
2412 let p = ConfTlsProfile {
2413 key: Some(std::path::PathBuf::from("/x.key")),
2414 ..ConfTlsProfile::default()
2415 };
2416 let err = p.validate("dc1").unwrap_err();
2417 assert!(matches!(
2418 err,
2419 ConfError::BadServer {
2420 field: "peer_tls_profiles.key",
2421 ..
2422 }
2423 ));
2424 }
2425
2426 #[test]
2427 fn peer_tls_profile_ca_without_cert_rejected() {
2428 let p = ConfTlsProfile {
2429 ca: Some(std::path::PathBuf::from("/x.ca")),
2430 ..ConfTlsProfile::default()
2431 };
2432 let err = p.validate("dc1").unwrap_err();
2433 assert!(matches!(
2434 err,
2435 ConfError::BadServer {
2436 field: "peer_tls_profiles.ca",
2437 ..
2438 }
2439 ));
2440 }
2441
2442 #[test]
2443 fn peer_tls_profiles_empty_dc_name_rejected() {
2444 let mut p = pool();
2445 p.peer_tls_profiles.insert(
2446 String::new(),
2447 ConfTlsProfile {
2448 cert: Some(std::path::PathBuf::from("/x.crt")),
2449 key: Some(std::path::PathBuf::from("/x.key")),
2450 ca: None,
2451 },
2452 );
2453 p.apply_defaults();
2454 let err = p.validate("p").unwrap_err();
2455 assert!(matches!(
2456 err,
2457 ConfError::BadServer {
2458 field: "peer_tls_profiles",
2459 ..
2460 }
2461 ));
2462 }
2463
2464 #[test]
2465 fn peer_tls_profiles_per_dc_pair_validates() {
2466 let mut p = pool();
2467 p.peer_tls_profiles.insert(
2468 "dc1".into(),
2469 ConfTlsProfile {
2470 cert: Some(std::path::PathBuf::from("/dc1.crt")),
2471 key: Some(std::path::PathBuf::from("/dc1.key")),
2472 ca: None,
2473 },
2474 );
2475 p.apply_defaults();
2476 assert!(p.validate("p").is_ok());
2477 }
2478
2479 #[test]
2480 fn peer_tls_profiles_per_dc_cert_without_key_rejected() {
2481 let mut p = pool();
2482 p.peer_tls_profiles.insert(
2483 "dc1".into(),
2484 ConfTlsProfile {
2485 cert: Some(std::path::PathBuf::from("/dc1.crt")),
2486 key: None,
2487 ca: None,
2488 },
2489 );
2490 p.apply_defaults();
2491 let err = p.validate("p").unwrap_err();
2492 assert!(matches!(
2493 err,
2494 ConfError::BadServer {
2495 field: "peer_tls_profiles.cert",
2496 ..
2497 }
2498 ));
2499 }
2500
2501 #[test]
2502 fn peer_tls_profiles_yaml_round_trip() {
2503 let yaml = r"
2504listen: 127.0.0.1:8102
2505servers:
2506- 127.0.0.1:6379:1
2507tokens: '0'
2508peer_tls_profiles:
2509 dc1:
2510 cert: /etc/dynomite/dc1.pem
2511 key: /etc/dynomite/dc1.key
2512 ca: /etc/dynomite/dc1-ca.pem
2513 dc2:
2514 cert: /etc/dynomite/dc2.pem
2515 key: /etc/dynomite/dc2.key
2516";
2517 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
2518 assert_eq!(p.peer_tls_profiles.len(), 2);
2519 assert_eq!(
2520 p.peer_tls_profiles["dc1"].cert.as_deref(),
2521 Some(std::path::Path::new("/etc/dynomite/dc1.pem"))
2522 );
2523 assert!(p.peer_tls_profiles["dc2"].ca.is_none());
2524 let dumped = serde_yaml::to_string(&p).unwrap();
2525 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2526 assert_eq!(p2.peer_tls_profiles, p.peer_tls_profiles);
2527 }
2528
2529 #[test]
2530 fn transport_default_is_tcp_after_finalize() {
2531 let mut p = pool();
2532 p.apply_defaults();
2533 assert_eq!(p.transport, Some(Transport::Tcp));
2534 assert!(p.validate("p").is_ok());
2535 }
2536
2537 #[test]
2538 fn transport_quic_yaml_round_trip() {
2539 let yaml = r"
2540listen: 127.0.0.1:8102
2541servers:
2542- 127.0.0.1:6379:1
2543tokens: '0'
2544transport: quic
2545quic_cert_file: /tmp/test.crt
2546quic_key_file: /tmp/test.key
2547";
2548 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
2549 assert_eq!(p.transport, Some(Transport::Quic));
2550 assert_eq!(
2551 p.quic_cert_file.as_deref(),
2552 Some(std::path::Path::new("/tmp/test.crt"))
2553 );
2554 assert_eq!(
2555 p.quic_key_file.as_deref(),
2556 Some(std::path::Path::new("/tmp/test.key"))
2557 );
2558 let dumped = serde_yaml::to_string(&p).unwrap();
2559 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2560 assert_eq!(p2.transport, p.transport);
2561 assert_eq!(p2.quic_cert_file, p.quic_cert_file);
2562 assert_eq!(p2.quic_key_file, p.quic_key_file);
2563 }
2564
2565 #[test]
2566 fn transport_quic_requires_cert_and_key() {
2567 let mut p = pool();
2568 p.transport = Some(Transport::Quic);
2569 p.apply_defaults();
2570 let err = p.validate("p").unwrap_err();
2571 assert!(matches!(
2572 err,
2573 ConfError::BadServer {
2574 field: "quic_cert_file",
2575 ..
2576 }
2577 ));
2578 p.quic_cert_file = Some(std::path::PathBuf::from("/tmp/c.pem"));
2579 let err = p.validate("p").unwrap_err();
2580 assert!(matches!(
2581 err,
2582 ConfError::BadServer {
2583 field: "quic_key_file",
2584 ..
2585 }
2586 ));
2587 p.quic_key_file = Some(std::path::PathBuf::from("/tmp/k.pem"));
2588 assert!(p.validate("p").is_ok());
2589 }
2590
2591 #[test]
2592 fn transport_tcp_ignores_quic_files() {
2593 let mut p = pool();
2594 p.transport = Some(Transport::Tcp);
2595 p.quic_cert_file = Some(std::path::PathBuf::from("/tmp/c.pem"));
2599 p.apply_defaults();
2600 assert!(p.validate("p").is_ok());
2601 }
2602}