1use std::collections::BTreeMap;
10use std::fmt;
11use std::path::PathBuf;
12
13use serde::{Deserialize, Serialize};
14
15use super::endpoint::ConfListen;
16use super::enums::{ConsistencyLevel, DataStore, Distribution, HashType, SecureServerOption};
17use super::error::ConfError;
18use super::server::{ConfDynSeed, ConfServer};
19use super::tokens::TokenList;
20
21pub mod defaults {
24 pub const TIMEOUT_MS: i64 = 5_000;
26 pub const LISTEN_BACKLOG: i64 = 512;
28 pub const CLIENT_CONNECTIONS: i64 = 0;
30 pub const DATA_STORE: i64 = 0;
32 pub const PRECONNECT: bool = false;
39 pub const AUTO_EJECT_HOSTS: bool = true;
41 pub const SERVER_RETRY_TIMEOUT_MS: i64 = 10 * 1000;
43 pub const SERVER_FAILURE_LIMIT: i64 = 3;
45 pub const DYN_READ_TIMEOUT_MS: i64 = 10_000;
47 pub const DYN_WRITE_TIMEOUT_MS: i64 = 10_000;
49 pub const DYN_CONNECTIONS: i64 = 100;
51 pub const GOS_INTERVAL_MS: i64 = 30_000;
53 pub const ENABLE_HINTED_HANDOFF: bool = false;
56 pub const HINT_TTL_SECONDS: u64 = 86_400;
59 pub const HINT_STORE_MAX_BYTES: u64 = 64 * 1024 * 1024;
62 pub const HINT_DRAIN_INTERVAL_MS: u64 = 30_000;
65 pub const CONN_MSG_RATE: u32 = 50_000;
67 pub const STATS_INTERVAL_MS: i64 = 30 * 1000;
69 pub const STATS_PNAME: &str = "0.0.0.0:22222";
71 pub const DATASTORE_CONNECTIONS: u8 = 1;
73 pub const LOCAL_PEER_CONNECTIONS: u8 = 1;
75 pub const REMOTE_PEER_CONNECTIONS: u8 = 1;
77 pub const RACK: &str = "localrack";
79 pub const DC: &str = "localdc";
81 pub const SECURE_SERVER_OPTION: &str = "none";
83 pub const CONSISTENCY: &str = "DC_ONE";
85 pub const SEED_PROVIDER: &str = "simple_provider";
87 pub const ENV: &str = "aws";
89 pub const PEM_KEY_FILE: &str = "conf/dynomite.pem";
91 pub const RECON_KEY_FILE: &str = "conf/recon_key.pem";
93 pub const RECON_IV_FILE: &str = "conf/recon_iv.pem";
95 pub const RECON_INTERVAL_SECONDS: u64 = 300;
99 pub const MBUF_MIN_SIZE: i64 = 512;
101 pub const MBUF_MAX_SIZE: i64 = 512_000;
103 pub const ALLOC_MSGS_MIN: i64 = 100_000;
105 pub const ALLOC_MSGS_MAX: i64 = 1_000_000;
107}
108
109#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
122#[serde(transparent)]
123pub struct Servers(pub(crate) Vec<ConfServer>);
124
125impl Servers {
126 pub fn from_vec(v: Vec<ConfServer>) -> Self {
137 Self(v)
138 }
139}
140
141impl Servers {
142 pub fn entries(&self) -> &[ConfServer] {
152 &self.0
153 }
154 pub fn len(&self) -> usize {
163 self.0.len()
164 }
165 pub fn is_empty(&self) -> bool {
174 self.0.is_empty()
175 }
176 pub fn datastore(&self) -> Option<&ConfServer> {
187 self.0.first()
188 }
189}
190
191#[derive(Debug, Clone, Default, Serialize, Deserialize)]
204#[serde(deny_unknown_fields, default)]
205pub struct ConfPool {
206 pub listen: Option<ConfListen>,
208 pub dyn_listen: Option<ConfListen>,
210 pub stats_listen: Option<ConfListen>,
212
213 pub hash: Option<HashType>,
215 pub hash_tag: Option<String>,
217
218 #[serde(default)]
224 pub distribution: Option<Distribution>,
225 #[serde(default)]
232 pub distribution_shadow: Option<Distribution>,
233 #[serde(default)]
235 pub server_connections: Option<i64>,
236
237 pub timeout: Option<i64>,
239 pub backlog: Option<i64>,
241 pub client_connections: Option<i64>,
243 #[serde(default, deserialize_with = "deserialize_data_store")]
249 pub data_store: Option<i64>,
250 #[serde(default)]
256 pub noxu_path: Option<PathBuf>,
257 pub preconnect: Option<bool>,
259 #[serde(default)]
266 pub redis_requirepass: Option<String>,
267 pub auto_eject_hosts: Option<bool>,
269 pub server_retry_timeout: Option<i64>,
271 pub server_failure_limit: Option<i64>,
273
274 pub servers: Option<Servers>,
276
277 pub dyn_read_timeout: Option<i64>,
279 pub dyn_write_timeout: Option<i64>,
281 pub dyn_seed_provider: Option<String>,
283 pub dyn_seeds: Option<Vec<ConfDynSeed>>,
285 pub dyn_port: Option<i64>,
287 pub dyn_connections: Option<i64>,
289 pub rack: Option<String>,
291 pub tokens: Option<TokenList>,
293 pub gos_interval: Option<i64>,
295 pub secure_server_option: Option<String>,
297 pub pem_key_file: Option<String>,
299 pub recon_key_file: Option<String>,
301 pub recon_iv_file: Option<String>,
303 #[serde(default)]
312 pub recon_interval_seconds: Option<u64>,
313 pub datacenter: Option<String>,
315 pub env: Option<String>,
317 pub conn_msg_rate: Option<u32>,
319 pub read_consistency: Option<String>,
321 pub write_consistency: Option<String>,
323 pub stats_interval: Option<i64>,
325 pub enable_gossip: Option<bool>,
327 #[serde(default)]
334 pub peer_tls_cert: Option<PathBuf>,
335 #[serde(default)]
338 pub peer_tls_key: Option<PathBuf>,
339 #[serde(default)]
347 pub peer_tls_ca: Option<PathBuf>,
348 #[serde(default)]
368 pub peer_tls_profiles: BTreeMap<String, ConfTlsProfile>,
369 pub mbuf_size: Option<i64>,
371 pub max_msgs: Option<i64>,
373 pub datastore_connections: Option<u8>,
375 pub local_peer_connections: Option<u8>,
377 pub remote_peer_connections: Option<u8>,
379 pub read_repairs_enabled: Option<bool>,
381 #[serde(default)]
393 pub enable_hinted_handoff: Option<bool>,
394 #[serde(default)]
399 pub hint_ttl_seconds: Option<u64>,
400 #[serde(default)]
408 pub hint_store_max_bytes: Option<u64>,
409 #[serde(default)]
413 pub hint_drain_interval_ms: Option<u64>,
414 pub log_format: Option<String>,
422
423 #[serde(default)]
429 pub observability: Option<ObservabilityConfig>,
430
431 #[serde(default)]
445 pub bucket_types: Vec<ConfBucketType>,
446
447 #[serde(default)]
453 pub default_bucket_type: Option<String>,
454
455 #[serde(default)]
468 pub riak: Option<ConfRiak>,
469}
470
471#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
497#[serde(deny_unknown_fields, default)]
498pub struct ConfRiak {
499 pub pbc_listen: Option<String>,
503 pub http_listen: Option<String>,
507 pub aae_enabled: Option<bool>,
510 pub aae_full_sweep_interval_seconds: Option<u64>,
514 pub aae_segment_interval_seconds: Option<u64>,
518 #[serde(default)]
525 pub tls_cert: Option<PathBuf>,
526 #[serde(default)]
529 pub tls_key: Option<PathBuf>,
530 #[serde(default)]
536 pub tls_ca: Option<PathBuf>,
537 #[serde(default)]
553 pub wasm_modules: Option<Vec<ConfRiakWasmModule>>,
554}
555
556#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
571#[serde(deny_unknown_fields)]
572pub struct ConfRiakWasmModule {
573 pub id: String,
576 pub path: PathBuf,
579}
580
581#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
605#[serde(deny_unknown_fields, default)]
606pub struct ConfTlsProfile {
607 pub cert: Option<PathBuf>,
609 pub key: Option<PathBuf>,
611 pub ca: Option<PathBuf>,
619}
620
621impl ConfTlsProfile {
622 pub fn validate(&self, dc: &str) -> Result<(), ConfError> {
648 match (self.cert.as_deref(), self.key.as_deref()) {
649 (Some(_), Some(_)) | (None, None) => {}
650 (Some(c), None) => {
651 return Err(ConfError::BadServer {
652 field: "peer_tls_profiles.cert",
653 value: c.display().to_string(),
654 reason: format!(
655 "peer_tls_profiles[{dc}].cert is set but .key is not; both must be set together"
656 ),
657 });
658 }
659 (None, Some(k)) => {
660 return Err(ConfError::BadServer {
661 field: "peer_tls_profiles.key",
662 value: k.display().to_string(),
663 reason: format!(
664 "peer_tls_profiles[{dc}].key is set but .cert is not; both must be set together"
665 ),
666 });
667 }
668 }
669 if self.ca.is_some() && self.cert.is_none() {
670 return Err(ConfError::BadServer {
671 field: "peer_tls_profiles.ca",
672 value: self
673 .ca
674 .as_ref()
675 .map_or_else(String::new, |p| p.display().to_string()),
676 reason: format!(
677 "peer_tls_profiles[{dc}].ca requires .cert and .key to also be set"
678 ),
679 });
680 }
681 Ok(())
682 }
683}
684
685impl ConfRiak {
686 pub fn validate(&self) -> Result<(), ConfError> {
705 if let Some(addr) = self.pbc_listen.as_deref() {
706 validate_riak_addr("pbc_listen", addr)?;
707 }
708 if let Some(addr) = self.http_listen.as_deref() {
709 validate_riak_addr("http_listen", addr)?;
710 }
711 if let Some(n) = self.aae_full_sweep_interval_seconds {
712 if n == 0 {
713 return Err(ConfError::BadServer {
714 field: "aae_full_sweep_interval_seconds",
715 value: n.to_string(),
716 reason: "must be > 0".into(),
717 });
718 }
719 }
720 if let Some(n) = self.aae_segment_interval_seconds {
721 if n == 0 {
722 return Err(ConfError::BadServer {
723 field: "aae_segment_interval_seconds",
724 value: n.to_string(),
725 reason: "must be > 0".into(),
726 });
727 }
728 }
729 if let (Some(seg), Some(full)) = (
730 self.aae_segment_interval_seconds,
731 self.aae_full_sweep_interval_seconds,
732 ) {
733 if seg > full {
734 return Err(ConfError::BadServer {
735 field: "aae_segment_interval_seconds",
736 value: seg.to_string(),
737 reason: format!("must be <= aae_full_sweep_interval_seconds ({full})"),
738 });
739 }
740 }
741 validate_tls_pair(
742 "tls_cert",
743 "tls_key",
744 self.tls_cert.as_deref(),
745 self.tls_key.as_deref(),
746 )?;
747 if self.tls_ca.is_some() && self.tls_cert.is_none() {
748 return Err(ConfError::BadServer {
749 field: "tls_ca",
750 value: self
751 .tls_ca
752 .as_ref()
753 .map_or_else(String::new, |p| p.display().to_string()),
754 reason: "requires tls_cert and tls_key to also be set".into(),
755 });
756 }
757 if let Some(modules) = self.wasm_modules.as_deref() {
758 let mut seen: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
759 for m in modules {
760 if m.id.is_empty() {
761 return Err(ConfError::BadServer {
762 field: "wasm_modules.id",
763 value: String::new(),
764 reason: "wasm module id must not be empty".into(),
765 });
766 }
767 if !seen.insert(m.id.as_str()) {
768 return Err(ConfError::BadServer {
769 field: "wasm_modules.id",
770 value: m.id.clone(),
771 reason: "wasm module ids must be unique".into(),
772 });
773 }
774 if !m.path.is_file() {
775 return Err(ConfError::BadServer {
776 field: "wasm_modules.path",
777 value: m.path.display().to_string(),
778 reason: format!("wasm module file not found for id '{}'", m.id),
779 });
780 }
781 }
782 }
783 Ok(())
784 }
785}
786
787fn validate_tls_pair(
791 cert_field: &'static str,
792 key_field: &'static str,
793 cert: Option<&std::path::Path>,
794 key: Option<&std::path::Path>,
795) -> Result<(), ConfError> {
796 match (cert, key) {
797 (Some(_), Some(_)) | (None, None) => Ok(()),
798 (Some(c), None) => Err(ConfError::BadServer {
799 field: cert_field,
800 value: c.display().to_string(),
801 reason: format!(
802 "{cert_field} is set but {key_field} is not; both must be set together"
803 ),
804 }),
805 (None, Some(k)) => Err(ConfError::BadServer {
806 field: key_field,
807 value: k.display().to_string(),
808 reason: format!(
809 "{key_field} is set but {cert_field} is not; both must be set together"
810 ),
811 }),
812 }
813}
814
815fn validate_riak_addr(field: &'static str, value: &str) -> Result<(), ConfError> {
816 use std::net::ToSocketAddrs;
817 if value.is_empty() {
818 return Err(ConfError::BadServer {
819 field,
820 value: value.to_string(),
821 reason: "riak listen address must not be empty".into(),
822 });
823 }
824 if value.parse::<std::net::SocketAddr>().is_ok() {
828 return Ok(());
829 }
830 match value.to_socket_addrs() {
831 Ok(mut iter) => {
832 if iter.next().is_some() {
833 Ok(())
834 } else {
835 Err(ConfError::BadServer {
836 field,
837 value: value.to_string(),
838 reason: "resolved to no addresses".into(),
839 })
840 }
841 }
842 Err(e) => Err(ConfError::BadServer {
843 field,
844 value: value.to_string(),
845 reason: format!("could not resolve: {e}"),
846 }),
847 }
848}
849
850#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
882#[serde(deny_unknown_fields)]
883pub struct ConfBucketType {
884 pub name: String,
889 pub read_consistency: String,
895 pub write_consistency: String,
898 #[serde(default)]
903 pub n_val: u8,
904}
905
906impl ConfBucketType {
907 pub fn read_level(&self) -> Result<ConsistencyLevel, ConfError> {
922 ConsistencyLevel::parse("read_consistency", &self.read_consistency)
923 }
924
925 pub fn write_level(&self) -> Result<ConsistencyLevel, ConfError> {
940 ConsistencyLevel::parse("write_consistency", &self.write_consistency)
941 }
942}
943
944#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
963#[serde(deny_unknown_fields, default)]
964pub struct ObservabilityConfig {
965 pub otlp_traces_endpoint: Option<String>,
970 pub otlp_logs_endpoint: Option<String>,
976 pub service_name: Option<String>,
979 pub traces_sampling: Option<f64>,
982}
983
984impl ConfPool {
985 #[must_use]
1005 pub fn resolved_distribution(&self) -> Distribution {
1006 match self.distribution {
1007 None | Some(Distribution::Vnode) => Distribution::Vnode,
1008 Some(Distribution::RandomSlicing) => Distribution::RandomSlicing,
1009 Some(other) => {
1010 tracing::warn!(
1011 target: "dynomite::conf",
1012 distribution = other.as_str(),
1013 "distribution mode '{}' is a legacy alias and resolves to 'vnode'; \
1014 update the YAML to either 'vnode' or 'random_slicing'",
1015 other
1016 );
1017 Distribution::Vnode
1018 }
1019 }
1020 }
1021}
1022
1023impl ConfPool {
1024 pub fn apply_defaults(&mut self) {
1036 if self.dyn_seed_provider.is_none() {
1037 self.dyn_seed_provider = Some(defaults::SEED_PROVIDER.to_string());
1038 }
1039 if self.hash.is_none() {
1040 self.hash = Some(HashType::Murmur);
1041 }
1042 if self.timeout.is_none() {
1043 self.timeout = Some(defaults::TIMEOUT_MS);
1044 }
1045 if self.backlog.is_none() {
1046 self.backlog = Some(defaults::LISTEN_BACKLOG);
1047 }
1048 self.client_connections = Some(defaults::CLIENT_CONNECTIONS);
1051 if self.data_store.is_none() {
1052 self.data_store = Some(defaults::DATA_STORE);
1053 }
1054 if self.preconnect.is_none() {
1055 self.preconnect = Some(defaults::PRECONNECT);
1056 }
1057 if self.auto_eject_hosts.is_none() {
1058 self.auto_eject_hosts = Some(defaults::AUTO_EJECT_HOSTS);
1059 }
1060 if self.server_retry_timeout.is_none() {
1061 self.server_retry_timeout = Some(defaults::SERVER_RETRY_TIMEOUT_MS);
1062 }
1063 if self.server_failure_limit.is_none() {
1064 self.server_failure_limit = Some(defaults::SERVER_FAILURE_LIMIT);
1065 }
1066 if self.dyn_read_timeout.is_none() {
1067 self.dyn_read_timeout = Some(defaults::DYN_READ_TIMEOUT_MS);
1068 }
1069 if self.dyn_write_timeout.is_none() {
1070 self.dyn_write_timeout = Some(defaults::DYN_WRITE_TIMEOUT_MS);
1071 }
1072 if self.dyn_connections.is_none() {
1073 self.dyn_connections = Some(defaults::DYN_CONNECTIONS);
1074 }
1075 if self.gos_interval.is_none() {
1076 self.gos_interval = Some(defaults::GOS_INTERVAL_MS);
1077 }
1078 if self.conn_msg_rate.is_none() {
1079 self.conn_msg_rate = Some(defaults::CONN_MSG_RATE);
1080 }
1081 if self.rack.is_none() {
1082 self.rack = Some(defaults::RACK.to_string());
1083 }
1084 if self.datacenter.is_none() {
1085 self.datacenter = Some(defaults::DC.to_string());
1086 }
1087 if self.secure_server_option.is_none() {
1088 self.secure_server_option = Some(defaults::SECURE_SERVER_OPTION.to_string());
1089 }
1090 if self.read_consistency.is_none() {
1091 self.read_consistency = Some(defaults::CONSISTENCY.to_string());
1092 }
1093 if self.write_consistency.is_none() {
1094 self.write_consistency = Some(defaults::CONSISTENCY.to_string());
1095 }
1096 if self.stats_interval.is_none() {
1097 self.stats_interval = Some(defaults::STATS_INTERVAL_MS);
1098 }
1099 if self.stats_listen.is_none() {
1100 self.stats_listen = Some(
1102 ConfListen::parse("stats_listen", defaults::STATS_PNAME)
1103 .expect("invariant: STATS_PNAME constant is valid"),
1104 );
1105 }
1106 if self.env.is_none() {
1107 self.env = Some(defaults::ENV.to_string());
1108 }
1109 if self.pem_key_file.is_none() {
1110 self.pem_key_file = Some(defaults::PEM_KEY_FILE.to_string());
1111 }
1112 if self.recon_key_file.is_none() {
1113 self.recon_key_file = Some(defaults::RECON_KEY_FILE.to_string());
1114 }
1115 if self.recon_iv_file.is_none() {
1116 self.recon_iv_file = Some(defaults::RECON_IV_FILE.to_string());
1117 }
1118 if self.recon_interval_seconds.is_none() {
1119 self.recon_interval_seconds = Some(defaults::RECON_INTERVAL_SECONDS);
1120 }
1121 if self.datastore_connections.is_none() {
1122 self.datastore_connections = Some(defaults::DATASTORE_CONNECTIONS);
1123 }
1124 if self.local_peer_connections.is_none() {
1125 self.local_peer_connections = Some(defaults::LOCAL_PEER_CONNECTIONS);
1126 }
1127 if self.remote_peer_connections.is_none() {
1128 self.remote_peer_connections = Some(defaults::REMOTE_PEER_CONNECTIONS);
1129 }
1130 if self.read_repairs_enabled.is_none() {
1131 self.read_repairs_enabled = Some(false);
1132 }
1133 if self.enable_gossip.is_none() {
1134 self.enable_gossip = Some(false);
1135 }
1136 self.apply_hinted_handoff_defaults();
1137 }
1138
1139 fn apply_hinted_handoff_defaults(&mut self) {
1144 if self.enable_hinted_handoff.is_none() {
1145 self.enable_hinted_handoff = Some(defaults::ENABLE_HINTED_HANDOFF);
1146 }
1147 if self.hint_ttl_seconds.is_none() {
1148 self.hint_ttl_seconds = Some(defaults::HINT_TTL_SECONDS);
1149 }
1150 if self.hint_store_max_bytes.is_none() {
1151 self.hint_store_max_bytes = Some(defaults::HINT_STORE_MAX_BYTES);
1152 }
1153 if self.hint_drain_interval_ms.is_none() {
1154 self.hint_drain_interval_ms = Some(defaults::HINT_DRAIN_INTERVAL_MS);
1155 }
1156 }
1157
1158 pub fn validate(&self, pool_name: &str) -> Result<(), ConfError> {
1175 if pool_name.is_empty() {
1176 return Err(ConfError::EmptyPoolName);
1177 }
1178
1179 if self.listen.is_none() {
1180 return Err(ConfError::MissingRequired("listen"));
1181 }
1182
1183 self.validate_numeric_ranges()?;
1184 self.validate_mbuf_size()?;
1185 self.validate_max_msgs()?;
1186
1187 if let Some(n) = self.data_store {
1188 let ds = DataStore::from_int(n)?;
1189 if ds == DataStore::Noxu {
1190 self.validate_noxu()?;
1191 }
1192 }
1193 if let Some(tag) = &self.hash_tag {
1194 if tag.chars().count() != 2 {
1195 return Err(ConfError::BadHashTag(tag.clone()));
1196 }
1197 }
1198
1199 let secure = if let Some(s) = &self.secure_server_option {
1200 SecureServerOption::parse(s)?
1201 } else {
1202 SecureServerOption::None
1203 };
1204 if let Some(s) = &self.read_consistency {
1205 ConsistencyLevel::parse("read_consistency", s)?;
1206 }
1207 if let Some(s) = &self.write_consistency {
1208 ConsistencyLevel::parse("write_consistency", s)?;
1209 }
1210 if secure != SecureServerOption::None {
1211 match &self.pem_key_file {
1212 Some(s) if !s.is_empty() => {}
1213 _ => return Err(ConfError::MissingRequired("pem_key_file")),
1214 }
1215 }
1216
1217 if let Some(s) = &self.log_format {
1218 crate::core::log::LogFormat::parse(s).map_err(|e| ConfError::BadServer {
1219 field: "log_format",
1220 value: s.clone(),
1221 reason: e.to_string(),
1222 })?;
1223 }
1224
1225 self.validate_bucket_types()?;
1226 self.validate_hinted_handoff()?;
1227 self.validate_peer_tls()?;
1228 if let Some(r) = &self.riak {
1229 r.validate()?;
1230 }
1231
1232 match &self.servers {
1233 None => return Err(ConfError::MissingRequired("servers")),
1234 Some(s) if s.is_empty() => return Err(ConfError::MissingRequired("servers")),
1235 Some(s) if s.len() > 1 => {
1236 return Err(ConfError::BadServer {
1237 field: "servers",
1238 value: s.len().to_string(),
1239 reason: "expected exactly one datastore entry".to_string(),
1240 });
1241 }
1242 Some(_) => {}
1243 }
1244
1245 Ok(())
1246 }
1247
1248 fn validate_numeric_ranges(&self) -> Result<(), ConfError> {
1249 check_positive("timeout", self.timeout)?;
1250 check_positive("backlog", self.backlog)?;
1251 check_non_negative("client_connections", self.client_connections)?;
1252 check_positive("server_retry_timeout", self.server_retry_timeout)?;
1253 check_positive("server_failure_limit", self.server_failure_limit)?;
1254 check_positive("dyn_read_timeout", self.dyn_read_timeout)?;
1255 check_positive("dyn_write_timeout", self.dyn_write_timeout)?;
1256 check_positive("gos_interval", self.gos_interval)?;
1257 check_positive("stats_interval", self.stats_interval)?;
1258
1259 if let Some(n) = self.dyn_connections {
1260 if n <= 0 {
1261 return Err(ConfError::OutOfRange {
1262 field: "dyn_connections",
1263 value: n,
1264 reason: "must be a positive non-zero number",
1265 });
1266 }
1267 }
1268 Ok(())
1269 }
1270
1271 fn validate_mbuf_size(&self) -> Result<(), ConfError> {
1272 let Some(n) = self.mbuf_size else {
1273 return Ok(());
1274 };
1275 if n <= 0 {
1276 return Err(ConfError::OutOfRange {
1277 field: "mbuf_size",
1278 value: n,
1279 reason: "must be a positive number",
1280 });
1281 }
1282 if !(defaults::MBUF_MIN_SIZE..=defaults::MBUF_MAX_SIZE).contains(&n) {
1283 return Err(ConfError::OutOfRange {
1284 field: "mbuf_size",
1285 value: n,
1286 reason: "must be between 512 and 512000 bytes",
1287 });
1288 }
1289 if n % 16 != 0 {
1290 return Err(ConfError::OutOfRange {
1291 field: "mbuf_size",
1292 value: n,
1293 reason: "must be a multiple of 16",
1294 });
1295 }
1296 Ok(())
1297 }
1298
1299 fn validate_max_msgs(&self) -> Result<(), ConfError> {
1300 let Some(n) = self.max_msgs else {
1301 return Ok(());
1302 };
1303 if n <= 0 {
1304 return Err(ConfError::OutOfRange {
1305 field: "max_msgs",
1306 value: n,
1307 reason: "requires a non-zero number",
1308 });
1309 }
1310 if !(defaults::ALLOC_MSGS_MIN..=defaults::ALLOC_MSGS_MAX).contains(&n) {
1311 return Err(ConfError::OutOfRange {
1312 field: "max_msgs",
1313 value: n,
1314 reason: "must be between 100000 and 1000000 messages",
1315 });
1316 }
1317 Ok(())
1318 }
1319
1320 fn validate_bucket_types(&self) -> Result<(), ConfError> {
1321 use std::collections::BTreeSet;
1322 let mut seen: BTreeSet<&str> = BTreeSet::new();
1323 for bt in &self.bucket_types {
1324 if bt.name.is_empty() {
1325 return Err(ConfError::BadServer {
1326 field: "bucket_types",
1327 value: String::new(),
1328 reason: "bucket-type name must not be empty".to_string(),
1329 });
1330 }
1331 if !seen.insert(bt.name.as_str()) {
1332 return Err(ConfError::BadServer {
1333 field: "bucket_types",
1334 value: bt.name.clone(),
1335 reason: "duplicate bucket-type name".to_string(),
1336 });
1337 }
1338 ConsistencyLevel::parse("read_consistency", &bt.read_consistency)?;
1339 ConsistencyLevel::parse("write_consistency", &bt.write_consistency)?;
1340 }
1341 if let Some(name) = &self.default_bucket_type {
1342 if !self.bucket_types.iter().any(|bt| &bt.name == name) {
1343 return Err(ConfError::BadServer {
1344 field: "default_bucket_type",
1345 value: name.clone(),
1346 reason: "references an undefined bucket-type name".to_string(),
1347 });
1348 }
1349 }
1350 Ok(())
1351 }
1352
1353 fn validate_noxu(&self) -> Result<(), ConfError> {
1370 if !crate::conf::is_noxu_supported() {
1371 return Err(ConfError::BadNoxuConfig(
1372 "noxu data_store requires dynomited built with --features riak",
1373 ));
1374 }
1375 match self.noxu_path.as_deref() {
1376 Some(p) if !p.as_os_str().is_empty() => Ok(()),
1377 _ => Err(ConfError::BadNoxuConfig(
1378 "data_store: noxu requires a non-empty 'noxu_path:' directive",
1379 )),
1380 }
1381 }
1382
1383 fn validate_hinted_handoff(&self) -> Result<(), ConfError> {
1384 if self.enable_hinted_handoff != Some(true) {
1385 return Ok(());
1386 }
1387 if let Some(ttl) = self.hint_ttl_seconds {
1388 if ttl == 0 {
1389 return Err(ConfError::BadServer {
1390 field: "hint_ttl_seconds",
1391 value: ttl.to_string(),
1392 reason: "must be a positive number when enable_hinted_handoff is true"
1393 .to_string(),
1394 });
1395 }
1396 }
1397 if let Some(cap) = self.hint_store_max_bytes {
1398 if cap == 0 {
1399 return Err(ConfError::BadServer {
1400 field: "hint_store_max_bytes",
1401 value: cap.to_string(),
1402 reason: "must be a positive number when enable_hinted_handoff is true"
1403 .to_string(),
1404 });
1405 }
1406 }
1407 if let Some(period) = self.hint_drain_interval_ms {
1408 if period == 0 {
1409 return Err(ConfError::BadServer {
1410 field: "hint_drain_interval_ms",
1411 value: period.to_string(),
1412 reason: "must be a positive number when enable_hinted_handoff is true"
1413 .to_string(),
1414 });
1415 }
1416 }
1417 Ok(())
1418 }
1419
1420 fn validate_peer_tls(&self) -> Result<(), ConfError> {
1430 validate_tls_pair(
1431 "peer_tls_cert",
1432 "peer_tls_key",
1433 self.peer_tls_cert.as_deref(),
1434 self.peer_tls_key.as_deref(),
1435 )?;
1436 if self.peer_tls_ca.is_some() && self.peer_tls_cert.is_none() {
1437 return Err(ConfError::BadServer {
1438 field: "peer_tls_ca",
1439 value: self
1440 .peer_tls_ca
1441 .as_ref()
1442 .map_or_else(String::new, |p| p.display().to_string()),
1443 reason: "requires peer_tls_cert and peer_tls_key to also be set".into(),
1444 });
1445 }
1446 for (dc, profile) in &self.peer_tls_profiles {
1447 if dc.is_empty() {
1448 return Err(ConfError::BadServer {
1449 field: "peer_tls_profiles",
1450 value: String::new(),
1451 reason: "per-DC TLS profile name must not be empty".into(),
1452 });
1453 }
1454 profile.validate(dc)?;
1455 }
1456 Ok(())
1457 }
1458}
1459
1460impl fmt::Display for ConfPool {
1461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1462 match serde_yaml::to_string(self) {
1466 Ok(s) => f.write_str(&s),
1467 Err(_) => Err(fmt::Error),
1468 }
1469 }
1470}
1471
1472fn check_positive(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
1473 if let Some(n) = v {
1474 if n <= 0 {
1475 return Err(ConfError::OutOfRange {
1476 field,
1477 value: n,
1478 reason: "must be a positive number",
1479 });
1480 }
1481 }
1482 Ok(())
1483}
1484
1485fn check_non_negative(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
1486 if let Some(n) = v {
1487 if n < 0 {
1488 return Err(ConfError::OutOfRange {
1489 field,
1490 value: n,
1491 reason: "must be a non-negative number",
1492 });
1493 }
1494 }
1495 Ok(())
1496}
1497
1498fn deserialize_data_store<'de, D>(de: D) -> Result<Option<i64>, D::Error>
1503where
1504 D: serde::Deserializer<'de>,
1505{
1506 use serde::de::{self, Visitor};
1507 use std::fmt;
1508
1509 struct V;
1510 impl<'de> Visitor<'de> for V {
1511 type Value = Option<i64>;
1512
1513 fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1514 f.write_str("a data_store value: integer (0, 1, 2) or string (redis, memcache, noxu)")
1515 }
1516
1517 fn visit_none<E: de::Error>(self) -> Result<Self::Value, E> {
1518 Ok(None)
1519 }
1520
1521 fn visit_unit<E: de::Error>(self) -> Result<Self::Value, E> {
1522 Ok(None)
1523 }
1524
1525 fn visit_some<D2: serde::Deserializer<'de>>(
1526 self,
1527 de: D2,
1528 ) -> Result<Self::Value, D2::Error> {
1529 de.deserialize_any(V)
1530 }
1531
1532 fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
1533 DataStore::from_int(v)
1534 .map(|d| Some(d.as_int()))
1535 .map_err(|e| E::custom(e.to_string()))
1536 }
1537
1538 fn visit_u64<E: de::Error>(self, v: u64) -> Result<Self::Value, E> {
1539 let n = i64::try_from(v).map_err(|_| E::custom("data_store integer overflow"))?;
1540 self.visit_i64(n)
1541 }
1542
1543 fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
1544 DataStore::from_name(v)
1545 .map(|d| Some(d.as_int()))
1546 .map_err(|_| {
1547 E::custom(format!(
1548 "data_store: unknown name '{v}'; expected one of: redis, memcache, noxu"
1549 ))
1550 })
1551 }
1552
1553 fn visit_string<E: de::Error>(self, v: String) -> Result<Self::Value, E> {
1554 self.visit_str(&v)
1555 }
1556 }
1557
1558 de.deserialize_any(V)
1559}
1560
1561#[cfg(test)]
1562mod tests {
1563 use super::*;
1564
1565 fn pool() -> ConfPool {
1566 ConfPool {
1567 listen: Some(ConfListen::parse("listen", "127.0.0.1:8102").unwrap()),
1568 servers: Some(Servers::from_vec(vec![ConfServer::parse(
1569 "127.0.0.1:6379:1",
1570 )
1571 .unwrap()])),
1572 tokens: Some(TokenList::parse("0").unwrap()),
1573 ..ConfPool::default()
1574 }
1575 }
1576
1577 #[test]
1578 fn validate_minimal_post_finalize() {
1579 let mut p = pool();
1580 p.apply_defaults();
1581 p.validate("dyn_o_mite").unwrap();
1582 }
1583
1584 #[test]
1585 fn missing_listen_rejected() {
1586 let mut p = pool();
1587 p.listen = None;
1588 p.apply_defaults();
1589 assert!(matches!(
1590 p.validate("p"),
1591 Err(ConfError::MissingRequired("listen"))
1592 ));
1593 }
1594
1595 #[test]
1596 fn out_of_range_mbuf_rejected() {
1597 let mut p = pool();
1598 p.mbuf_size = Some(127);
1599 p.apply_defaults();
1600 assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
1601 }
1602
1603 #[test]
1604 fn distribution_field_round_trips_through_yaml() {
1605 let yaml = r"
1606p:
1607 listen: 127.0.0.1:8102
1608 dyn_listen: 127.0.0.1:8101
1609 tokens: '0'
1610 servers:
1611 - 127.0.0.1:6379:1
1612 data_store: 0
1613 distribution: random_slicing
1614 distribution_shadow: vnode
1615 hash: murmur3_x64_64
1616";
1617 let parsed: std::collections::BTreeMap<String, ConfPool> =
1618 serde_yaml::from_str(yaml).unwrap();
1619 let pool = parsed.get("p").unwrap();
1620 assert_eq!(pool.distribution, Some(Distribution::RandomSlicing));
1621 assert_eq!(pool.distribution_shadow, Some(Distribution::Vnode));
1622 assert_eq!(pool.hash, Some(HashType::Murmur3X64_64));
1623 assert_eq!(pool.resolved_distribution(), Distribution::RandomSlicing);
1624 }
1625
1626 #[test]
1627 fn distribution_legacy_alias_resolves_to_vnode() {
1628 let mut p = pool();
1629 p.distribution = Some(Distribution::Ketama);
1630 assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1631 p.distribution = Some(Distribution::Modula);
1632 assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1633 p.distribution = Some(Distribution::Random);
1634 assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1635 }
1636
1637 #[test]
1638 fn distribution_default_unset_is_vnode() {
1639 let p = pool();
1640 assert!(p.distribution.is_none());
1641 assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1642 }
1643
1644 #[test]
1645 fn mbuf_size_not_multiple_of_16_rejected() {
1646 let mut p = pool();
1647 p.mbuf_size = Some(513);
1648 p.apply_defaults();
1649 assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
1650 }
1651
1652 #[test]
1653 fn pem_required_when_secure() {
1654 let mut p = pool();
1655 p.secure_server_option = Some("datacenter".to_string());
1656 p.pem_key_file = Some(String::new());
1657 p.apply_defaults();
1658 assert!(matches!(
1661 p.validate("p"),
1662 Err(ConfError::MissingRequired("pem_key_file"))
1663 ));
1664 }
1665
1666 #[test]
1667 fn data_store_out_of_range_rejected() {
1668 let mut p = pool();
1669 p.data_store = Some(7);
1670 p.apply_defaults();
1671 assert!(matches!(p.validate("p"), Err(ConfError::BadDataStore(7))));
1672 }
1673
1674 static NOXU_FLAG_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
1679
1680 #[test]
1681 fn data_store_noxu_requires_riak_feature() {
1682 let _g = NOXU_FLAG_LOCK
1683 .lock()
1684 .unwrap_or_else(std::sync::PoisonError::into_inner);
1685 let prev = crate::conf::is_noxu_supported();
1688 crate::conf::set_noxu_supported(false);
1689 let mut p = pool();
1690 p.data_store = Some(2);
1691 p.noxu_path = Some("/tmp/test".into());
1692 p.apply_defaults();
1693 let err = p.validate("p");
1694 crate::conf::set_noxu_supported(prev);
1695 match err {
1696 Err(ConfError::BadNoxuConfig(msg)) => {
1697 assert!(msg.contains("--features riak"), "unexpected message: {msg}");
1698 }
1699 other => panic!("expected BadNoxuConfig, got {other:?}"),
1700 }
1701 }
1702
1703 #[test]
1704 fn data_store_noxu_requires_path() {
1705 let _g = NOXU_FLAG_LOCK
1706 .lock()
1707 .unwrap_or_else(std::sync::PoisonError::into_inner);
1708 let prev = crate::conf::is_noxu_supported();
1709 crate::conf::set_noxu_supported(true);
1710 let mut p = pool();
1711 p.data_store = Some(2);
1712 p.noxu_path = None;
1713 p.apply_defaults();
1714 let err = p.validate("p");
1715 crate::conf::set_noxu_supported(prev);
1716 match err {
1717 Err(ConfError::BadNoxuConfig(msg)) => {
1718 assert!(msg.contains("noxu_path"), "unexpected message: {msg}");
1719 }
1720 other => panic!("expected BadNoxuConfig, got {other:?}"),
1721 }
1722 }
1723
1724 #[test]
1725 fn data_store_noxu_yaml_round_trip_string_form() {
1726 let yaml = r"
1729listen: 127.0.0.1:8102
1730servers:
1731- 127.0.0.1:6379:1
1732tokens: '0'
1733data_store: noxu
1734noxu_path: /tmp/test
1735";
1736 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1737 assert_eq!(p.data_store, Some(2));
1738 assert_eq!(
1739 p.noxu_path.as_deref(),
1740 Some(std::path::Path::new("/tmp/test"))
1741 );
1742 let dumped = serde_yaml::to_string(&p).unwrap();
1744 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1745 assert_eq!(p2.data_store, p.data_store);
1746 assert_eq!(p2.noxu_path, p.noxu_path);
1747 }
1748
1749 #[test]
1750 fn data_store_yaml_int_form_still_works() {
1751 let yaml = r"
1752listen: 127.0.0.1:8102
1753servers:
1754- 127.0.0.1:6379:1
1755tokens: '0'
1756data_store: 2
1757noxu_path: /tmp/test
1758";
1759 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1760 assert_eq!(p.data_store, Some(2));
1761 }
1762
1763 #[test]
1764 fn data_store_string_form_unknown_rejected() {
1765 let yaml = r"
1766listen: 127.0.0.1:8102
1767servers:
1768- 127.0.0.1:6379:1
1769tokens: '0'
1770data_store: postgres
1771";
1772 let err = serde_yaml::from_str::<ConfPool>(yaml).unwrap_err();
1773 let msg = err.to_string();
1774 assert!(
1775 msg.contains("unknown name") || msg.contains("data_store"),
1776 "unexpected message: {msg}"
1777 );
1778 }
1779
1780 #[test]
1781 fn hash_tag_must_be_two_chars() {
1782 let mut p = pool();
1783 p.hash_tag = Some("abc".to_string());
1784 p.apply_defaults();
1785 assert!(matches!(p.validate("p"), Err(ConfError::BadHashTag(_))));
1786 }
1787
1788 #[test]
1789 fn empty_servers_rejected() {
1790 let mut p = pool();
1791 p.servers = Some(Servers::from_vec(vec![]));
1792 p.apply_defaults();
1793 assert!(matches!(
1794 p.validate("p"),
1795 Err(ConfError::MissingRequired("servers"))
1796 ));
1797 }
1798
1799 #[test]
1800 fn log_format_known_values_accepted() {
1801 for value in ["default", "rfc5424", "rfc3164", "json", "ndjson", "DEFAULT"] {
1802 let mut p = pool();
1803 p.log_format = Some(value.to_string());
1804 p.apply_defaults();
1805 assert!(p.validate("p").is_ok(), "value {value:?} should validate");
1806 }
1807 }
1808
1809 #[test]
1810 fn log_format_unknown_rejected() {
1811 let mut p = pool();
1812 p.log_format = Some("yaml".to_string());
1813 p.apply_defaults();
1814 let err = p.validate("p").unwrap_err();
1815 assert!(
1816 matches!(
1817 err,
1818 ConfError::BadServer {
1819 field: "log_format",
1820 ..
1821 }
1822 ),
1823 "unexpected error: {err:?}"
1824 );
1825 }
1826
1827 #[test]
1828 fn observability_block_round_trips() {
1829 let yaml = r"
1830observability:
1831 otlp_logs_endpoint: http://collector:4317
1832 service_name: dynomited
1833listen: 127.0.0.1:8102
1834servers:
1835- 127.0.0.1:6379:1
1836tokens: '0'
1837";
1838 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1839 let obs = p.observability.as_ref().expect("observability set");
1840 assert_eq!(
1841 obs.otlp_logs_endpoint.as_deref(),
1842 Some("http://collector:4317")
1843 );
1844 assert_eq!(obs.service_name.as_deref(), Some("dynomited"));
1845 }
1846
1847 #[test]
1848 fn bucket_types_round_trip() {
1849 let yaml = r"
1850listen: 127.0.0.1:8102
1851servers:
1852- 127.0.0.1:6379:1
1853tokens: '0'
1854bucket_types:
1855- name: hot
1856 read_consistency: DC_QUORUM
1857 write_consistency: DC_EACH_SAFE_QUORUM
1858 n_val: 3
1859- name: cold
1860 read_consistency: DC_ONE
1861 write_consistency: DC_ONE
1862 n_val: 1
1863default_bucket_type: cold
1864";
1865 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1866 assert_eq!(p.bucket_types.len(), 2);
1867 assert_eq!(p.bucket_types[0].name, "hot");
1868 assert_eq!(p.bucket_types[0].n_val, 3);
1869 assert_eq!(
1870 p.bucket_types[0].read_level().unwrap(),
1871 crate::conf::ConsistencyLevel::DcQuorum,
1872 );
1873 assert_eq!(p.default_bucket_type.as_deref(), Some("cold"));
1874 let dumped = serde_yaml::to_string(&p).unwrap();
1876 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1877 assert_eq!(p2.bucket_types, p.bucket_types);
1878 assert_eq!(p2.default_bucket_type, p.default_bucket_type);
1879 }
1880
1881 #[test]
1882 fn bucket_types_default_is_empty() {
1883 let mut p = pool();
1884 p.apply_defaults();
1885 assert!(p.bucket_types.is_empty());
1886 assert!(p.default_bucket_type.is_none());
1887 assert!(p.validate("p").is_ok());
1888 }
1889
1890 #[test]
1891 fn duplicate_bucket_type_name_rejected() {
1892 let mut p = pool();
1893 p.bucket_types = vec![
1894 ConfBucketType {
1895 name: "a".into(),
1896 read_consistency: "DC_ONE".into(),
1897 write_consistency: "DC_ONE".into(),
1898 n_val: 0,
1899 },
1900 ConfBucketType {
1901 name: "a".into(),
1902 read_consistency: "DC_ONE".into(),
1903 write_consistency: "DC_ONE".into(),
1904 n_val: 0,
1905 },
1906 ];
1907 p.apply_defaults();
1908 let err = p.validate("p").unwrap_err();
1909 assert!(
1910 matches!(
1911 err,
1912 ConfError::BadServer {
1913 field: "bucket_types",
1914 ..
1915 }
1916 ),
1917 "unexpected error: {err:?}",
1918 );
1919 }
1920
1921 #[test]
1922 fn bucket_type_unknown_consistency_rejected() {
1923 let mut p = pool();
1924 p.bucket_types = vec![ConfBucketType {
1925 name: "a".into(),
1926 read_consistency: "DC_PURPLE".into(),
1927 write_consistency: "DC_ONE".into(),
1928 n_val: 0,
1929 }];
1930 p.apply_defaults();
1931 let err = p.validate("p").unwrap_err();
1932 assert!(matches!(err, ConfError::BadConsistency { .. }));
1933 }
1934
1935 #[test]
1936 fn unknown_default_bucket_type_rejected() {
1937 let mut p = pool();
1938 p.default_bucket_type = Some("missing".into());
1939 p.apply_defaults();
1940 let err = p.validate("p").unwrap_err();
1941 assert!(matches!(
1942 err,
1943 ConfError::BadServer {
1944 field: "default_bucket_type",
1945 ..
1946 }
1947 ));
1948 }
1949
1950 #[test]
1951 fn hinted_handoff_default_off_with_canonical_constants() {
1952 let mut p = pool();
1953 p.apply_defaults();
1954 assert_eq!(p.enable_hinted_handoff, Some(false));
1955 assert_eq!(p.hint_ttl_seconds, Some(defaults::HINT_TTL_SECONDS));
1956 assert_eq!(p.hint_store_max_bytes, Some(defaults::HINT_STORE_MAX_BYTES));
1957 assert_eq!(
1958 p.hint_drain_interval_ms,
1959 Some(defaults::HINT_DRAIN_INTERVAL_MS)
1960 );
1961 assert!(p.validate("p").is_ok());
1962 }
1963
1964 #[test]
1965 fn hinted_handoff_yaml_round_trip() {
1966 let yaml = r"
1967listen: 127.0.0.1:8102
1968servers:
1969- 127.0.0.1:6379:1
1970tokens: '0'
1971enable_hinted_handoff: true
1972hint_ttl_seconds: 7200
1973hint_store_max_bytes: 8388608
1974hint_drain_interval_ms: 5000
1975";
1976 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1977 assert_eq!(p.enable_hinted_handoff, Some(true));
1978 assert_eq!(p.hint_ttl_seconds, Some(7200));
1979 assert_eq!(p.hint_store_max_bytes, Some(8_388_608));
1980 assert_eq!(p.hint_drain_interval_ms, Some(5_000));
1981 let dumped = serde_yaml::to_string(&p).unwrap();
1982 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1983 assert_eq!(p2.enable_hinted_handoff, p.enable_hinted_handoff);
1984 assert_eq!(p2.hint_ttl_seconds, p.hint_ttl_seconds);
1985 assert_eq!(p2.hint_store_max_bytes, p.hint_store_max_bytes);
1986 assert_eq!(p2.hint_drain_interval_ms, p.hint_drain_interval_ms);
1987 }
1988
1989 #[test]
1990 fn hinted_handoff_zero_ttl_rejected_when_enabled() {
1991 let mut p = pool();
1992 p.enable_hinted_handoff = Some(true);
1993 p.hint_ttl_seconds = Some(0);
1994 p.apply_defaults();
1995 let err = p.validate("p").unwrap_err();
1998 assert!(matches!(
1999 err,
2000 ConfError::BadServer {
2001 field: "hint_ttl_seconds",
2002 ..
2003 }
2004 ));
2005 }
2006
2007 #[test]
2008 fn hinted_handoff_zero_max_bytes_rejected_when_enabled() {
2009 let mut p = pool();
2010 p.enable_hinted_handoff = Some(true);
2011 p.hint_store_max_bytes = Some(0);
2012 p.apply_defaults();
2013 let err = p.validate("p").unwrap_err();
2014 assert!(matches!(
2015 err,
2016 ConfError::BadServer {
2017 field: "hint_store_max_bytes",
2018 ..
2019 }
2020 ));
2021 }
2022
2023 #[test]
2024 fn hinted_handoff_zero_values_ignored_when_disabled() {
2025 let mut p = pool();
2029 p.enable_hinted_handoff = Some(false);
2030 p.hint_ttl_seconds = Some(0);
2031 p.hint_store_max_bytes = Some(0);
2032 p.hint_drain_interval_ms = Some(0);
2033 p.apply_defaults();
2034 assert!(p.validate("p").is_ok());
2035 }
2036
2037 #[test]
2038 fn riak_block_validates_when_unset() {
2039 let mut p = pool();
2040 p.riak = Some(ConfRiak::default());
2041 p.apply_defaults();
2042 assert!(p.validate("p").is_ok());
2043 }
2044
2045 #[test]
2046 fn riak_block_validates_with_addresses() {
2047 let mut p = pool();
2048 p.riak = Some(ConfRiak {
2049 pbc_listen: Some("127.0.0.1:8087".into()),
2050 http_listen: Some("127.0.0.1:8098".into()),
2051 ..ConfRiak::default()
2052 });
2053 p.apply_defaults();
2054 assert!(p.validate("p").is_ok());
2055 }
2056
2057 #[test]
2058 fn riak_block_rejects_bad_pbc_addr() {
2059 let mut p = pool();
2060 p.riak = Some(ConfRiak {
2061 pbc_listen: Some(String::new()),
2062 ..ConfRiak::default()
2063 });
2064 p.apply_defaults();
2065 assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
2066 }
2067
2068 #[test]
2069 fn riak_block_rejects_segment_above_full_sweep() {
2070 let mut p = pool();
2071 p.riak = Some(ConfRiak {
2072 aae_segment_interval_seconds: Some(120),
2073 aae_full_sweep_interval_seconds: Some(60),
2074 ..ConfRiak::default()
2075 });
2076 p.apply_defaults();
2077 assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
2078 }
2079
2080 #[test]
2081 fn riak_block_round_trips_through_yaml() {
2082 let yaml = r"
2083p:
2084 listen: 127.0.0.1:1
2085 dyn_listen: 127.0.0.1:2
2086 tokens: '0'
2087 servers:
2088 - 127.0.0.1:3:1
2089 data_store: 0
2090 riak:
2091 pbc_listen: 127.0.0.1:8087
2092 http_listen: 127.0.0.1:8098
2093 aae_enabled: true
2094 aae_full_sweep_interval_seconds: 3600
2095 aae_segment_interval_seconds: 30
2096";
2097 let cfg: std::collections::BTreeMap<String, ConfPool> = serde_yaml::from_str(yaml).unwrap();
2098 let p = cfg.get("p").unwrap();
2099 let r = p.riak.as_ref().unwrap();
2100 assert_eq!(r.pbc_listen.as_deref(), Some("127.0.0.1:8087"));
2101 assert_eq!(r.http_listen.as_deref(), Some("127.0.0.1:8098"));
2102 assert_eq!(r.aae_enabled, Some(true));
2103 assert_eq!(r.aae_full_sweep_interval_seconds, Some(3600));
2104 assert_eq!(r.aae_segment_interval_seconds, Some(30));
2105 }
2106
2107 #[test]
2108 fn peer_tls_pair_unset_is_ok() {
2109 let mut p = pool();
2110 p.apply_defaults();
2111 assert!(p.validate("p").is_ok(), "plaintext default must validate");
2112 }
2113
2114 #[test]
2115 fn peer_tls_pair_both_set_is_ok() {
2116 let mut p = pool();
2117 p.peer_tls_cert = Some(std::path::PathBuf::from("/etc/dynomite/peer.crt"));
2118 p.peer_tls_key = Some(std::path::PathBuf::from("/etc/dynomite/peer.key"));
2119 p.apply_defaults();
2120 assert!(p.validate("p").is_ok());
2121 }
2122
2123 #[test]
2124 fn peer_tls_cert_without_key_rejected() {
2125 let mut p = pool();
2126 p.peer_tls_cert = Some(std::path::PathBuf::from("/x.crt"));
2127 p.apply_defaults();
2128 let err = p.validate("p").unwrap_err();
2129 assert!(
2130 matches!(
2131 err,
2132 ConfError::BadServer {
2133 field: "peer_tls_cert",
2134 ..
2135 }
2136 ),
2137 "got {err:?}"
2138 );
2139 }
2140
2141 #[test]
2142 fn peer_tls_key_without_cert_rejected() {
2143 let mut p = pool();
2144 p.peer_tls_key = Some(std::path::PathBuf::from("/x.key"));
2145 p.apply_defaults();
2146 let err = p.validate("p").unwrap_err();
2147 assert!(
2148 matches!(
2149 err,
2150 ConfError::BadServer {
2151 field: "peer_tls_key",
2152 ..
2153 }
2154 ),
2155 "got {err:?}"
2156 );
2157 }
2158
2159 #[test]
2160 fn peer_tls_ca_without_cert_rejected() {
2161 let mut p = pool();
2162 p.peer_tls_ca = Some(std::path::PathBuf::from("/x.ca"));
2163 p.apply_defaults();
2164 let err = p.validate("p").unwrap_err();
2165 assert!(
2166 matches!(
2167 err,
2168 ConfError::BadServer {
2169 field: "peer_tls_ca",
2170 ..
2171 }
2172 ),
2173 "got {err:?}"
2174 );
2175 }
2176
2177 #[test]
2178 fn riak_tls_cert_without_key_rejected() {
2179 let mut p = pool();
2180 p.riak = Some(ConfRiak {
2181 pbc_listen: Some("127.0.0.1:8087".into()),
2182 tls_cert: Some(std::path::PathBuf::from("/x.crt")),
2183 ..ConfRiak::default()
2184 });
2185 p.apply_defaults();
2186 let err = p.validate("p").unwrap_err();
2187 assert!(
2188 matches!(
2189 err,
2190 ConfError::BadServer {
2191 field: "tls_cert",
2192 ..
2193 }
2194 ),
2195 "got {err:?}"
2196 );
2197 }
2198
2199 #[test]
2200 fn riak_tls_pair_both_set_is_ok() {
2201 let mut p = pool();
2202 p.riak = Some(ConfRiak {
2203 pbc_listen: Some("127.0.0.1:8087".into()),
2204 tls_cert: Some(std::path::PathBuf::from("/x.crt")),
2205 tls_key: Some(std::path::PathBuf::from("/x.key")),
2206 ..ConfRiak::default()
2207 });
2208 p.apply_defaults();
2209 assert!(p.validate("p").is_ok());
2210 }
2211
2212 #[test]
2213 fn riak_wasm_modules_yaml_round_trip() {
2214 let dir = tempfile::tempdir().unwrap();
2215 let m1 = dir.path().join("identity.wasm");
2216 let m2 = dir.path().join("sum.wasm");
2217 std::fs::write(&m1, b"\0asm\x01\0\0\0").unwrap();
2218 std::fs::write(&m2, b"\0asm\x01\0\0\0").unwrap();
2219 let yaml = format!(
2220 r"
2221listen: 127.0.0.1:8102
2222servers:
2223- 127.0.0.1:6379:1
2224tokens: '0'
2225riak:
2226 pbc_listen: 127.0.0.1:8087
2227 wasm_modules:
2228 - id: identity
2229 path: {m1}
2230 - id: sum
2231 path: {m2}
2232",
2233 m1 = m1.display(),
2234 m2 = m2.display(),
2235 );
2236 let p: ConfPool = serde_yaml::from_str(&yaml).unwrap();
2237 let r = p.riak.as_ref().unwrap();
2238 let mods = r.wasm_modules.as_ref().unwrap();
2239 assert_eq!(mods.len(), 2);
2240 assert_eq!(mods[0].id, "identity");
2241 assert_eq!(mods[0].path, m1);
2242 assert_eq!(mods[1].id, "sum");
2243 assert_eq!(mods[1].path, m2);
2244 let dumped = serde_yaml::to_string(&p).unwrap();
2246 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2247 assert_eq!(p2.riak.unwrap().wasm_modules, r.wasm_modules);
2248 }
2249
2250 #[test]
2251 fn riak_wasm_modules_unique_ids_required() {
2252 let dir = tempfile::tempdir().unwrap();
2253 let path = dir.path().join("m.wasm");
2254 std::fs::write(&path, b"\0").unwrap();
2255 let r = ConfRiak {
2256 wasm_modules: Some(vec![
2257 ConfRiakWasmModule {
2258 id: "m".into(),
2259 path: path.clone(),
2260 },
2261 ConfRiakWasmModule {
2262 id: "m".into(),
2263 path: path.clone(),
2264 },
2265 ]),
2266 ..ConfRiak::default()
2267 };
2268 let err = r.validate().unwrap_err();
2269 assert!(matches!(
2270 err,
2271 ConfError::BadServer {
2272 field: "wasm_modules.id",
2273 ..
2274 }
2275 ));
2276 }
2277
2278 #[test]
2279 fn riak_wasm_modules_path_must_exist() {
2280 let r = ConfRiak {
2281 wasm_modules: Some(vec![ConfRiakWasmModule {
2282 id: "missing".into(),
2283 path: std::path::PathBuf::from("/no/such/path/at/all.wasm"),
2284 }]),
2285 ..ConfRiak::default()
2286 };
2287 let err = r.validate().unwrap_err();
2288 assert!(matches!(
2289 err,
2290 ConfError::BadServer {
2291 field: "wasm_modules.path",
2292 ..
2293 }
2294 ));
2295 }
2296
2297 #[test]
2298 fn riak_wasm_modules_empty_id_rejected() {
2299 let dir = tempfile::tempdir().unwrap();
2300 let path = dir.path().join("m.wasm");
2301 std::fs::write(&path, b"\0").unwrap();
2302 let r = ConfRiak {
2303 wasm_modules: Some(vec![ConfRiakWasmModule {
2304 id: String::new(),
2305 path,
2306 }]),
2307 ..ConfRiak::default()
2308 };
2309 let err = r.validate().unwrap_err();
2310 assert!(matches!(
2311 err,
2312 ConfError::BadServer {
2313 field: "wasm_modules.id",
2314 ..
2315 }
2316 ));
2317 }
2318
2319 #[test]
2320 fn peer_tls_profile_pair_unset_is_ok() {
2321 let p = ConfTlsProfile::default();
2322 assert!(p.validate("dc1").is_ok());
2323 }
2324
2325 #[test]
2326 fn peer_tls_profile_cert_without_key_rejected() {
2327 let p = ConfTlsProfile {
2328 cert: Some(std::path::PathBuf::from("/x.crt")),
2329 ..ConfTlsProfile::default()
2330 };
2331 let err = p.validate("dc1").unwrap_err();
2332 assert!(matches!(
2333 err,
2334 ConfError::BadServer {
2335 field: "peer_tls_profiles.cert",
2336 ..
2337 }
2338 ));
2339 }
2340
2341 #[test]
2342 fn peer_tls_profile_key_without_cert_rejected() {
2343 let p = ConfTlsProfile {
2344 key: Some(std::path::PathBuf::from("/x.key")),
2345 ..ConfTlsProfile::default()
2346 };
2347 let err = p.validate("dc1").unwrap_err();
2348 assert!(matches!(
2349 err,
2350 ConfError::BadServer {
2351 field: "peer_tls_profiles.key",
2352 ..
2353 }
2354 ));
2355 }
2356
2357 #[test]
2358 fn peer_tls_profile_ca_without_cert_rejected() {
2359 let p = ConfTlsProfile {
2360 ca: Some(std::path::PathBuf::from("/x.ca")),
2361 ..ConfTlsProfile::default()
2362 };
2363 let err = p.validate("dc1").unwrap_err();
2364 assert!(matches!(
2365 err,
2366 ConfError::BadServer {
2367 field: "peer_tls_profiles.ca",
2368 ..
2369 }
2370 ));
2371 }
2372
2373 #[test]
2374 fn peer_tls_profiles_empty_dc_name_rejected() {
2375 let mut p = pool();
2376 p.peer_tls_profiles.insert(
2377 String::new(),
2378 ConfTlsProfile {
2379 cert: Some(std::path::PathBuf::from("/x.crt")),
2380 key: Some(std::path::PathBuf::from("/x.key")),
2381 ca: None,
2382 },
2383 );
2384 p.apply_defaults();
2385 let err = p.validate("p").unwrap_err();
2386 assert!(matches!(
2387 err,
2388 ConfError::BadServer {
2389 field: "peer_tls_profiles",
2390 ..
2391 }
2392 ));
2393 }
2394
2395 #[test]
2396 fn peer_tls_profiles_per_dc_pair_validates() {
2397 let mut p = pool();
2398 p.peer_tls_profiles.insert(
2399 "dc1".into(),
2400 ConfTlsProfile {
2401 cert: Some(std::path::PathBuf::from("/dc1.crt")),
2402 key: Some(std::path::PathBuf::from("/dc1.key")),
2403 ca: None,
2404 },
2405 );
2406 p.apply_defaults();
2407 assert!(p.validate("p").is_ok());
2408 }
2409
2410 #[test]
2411 fn peer_tls_profiles_per_dc_cert_without_key_rejected() {
2412 let mut p = pool();
2413 p.peer_tls_profiles.insert(
2414 "dc1".into(),
2415 ConfTlsProfile {
2416 cert: Some(std::path::PathBuf::from("/dc1.crt")),
2417 key: None,
2418 ca: None,
2419 },
2420 );
2421 p.apply_defaults();
2422 let err = p.validate("p").unwrap_err();
2423 assert!(matches!(
2424 err,
2425 ConfError::BadServer {
2426 field: "peer_tls_profiles.cert",
2427 ..
2428 }
2429 ));
2430 }
2431
2432 #[test]
2433 fn peer_tls_profiles_yaml_round_trip() {
2434 let yaml = r"
2435listen: 127.0.0.1:8102
2436servers:
2437- 127.0.0.1:6379:1
2438tokens: '0'
2439peer_tls_profiles:
2440 dc1:
2441 cert: /etc/dynomite/dc1.pem
2442 key: /etc/dynomite/dc1.key
2443 ca: /etc/dynomite/dc1-ca.pem
2444 dc2:
2445 cert: /etc/dynomite/dc2.pem
2446 key: /etc/dynomite/dc2.key
2447";
2448 let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
2449 assert_eq!(p.peer_tls_profiles.len(), 2);
2450 assert_eq!(
2451 p.peer_tls_profiles["dc1"].cert.as_deref(),
2452 Some(std::path::Path::new("/etc/dynomite/dc1.pem"))
2453 );
2454 assert!(p.peer_tls_profiles["dc2"].ca.is_none());
2455 let dumped = serde_yaml::to_string(&p).unwrap();
2456 let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2457 assert_eq!(p2.peer_tls_profiles, p.peer_tls_profiles);
2458 }
2459}