Skip to main content

dynomite/conf/
pool.rs

1//! Pool body schema, default application, and validation.
2//!
3//! The [`ConfPool`] struct mirrors `struct conf_pool` from the C
4//! reference. Every field whose C counterpart starts as
5//! `CONF_UNSET_NUM` / `CONF_UNSET_BOOL` / `CONF_UNSET_HASH` is wrapped
6//! in [`Option`]; [`ConfPool::apply_defaults`] later fills in the
7//! sentinel-driven defaults.
8
9use std::collections::BTreeMap;
10use std::fmt;
11use std::path::PathBuf;
12
13use serde::{Deserialize, Serialize};
14
15use super::endpoint::ConfListen;
16use super::enums::{ConsistencyLevel, DataStore, Distribution, HashType, SecureServerOption};
17use super::error::ConfError;
18use super::server::{ConfDynSeed, ConfServer};
19use super::tokens::TokenList;
20
21/// Default configuration constants. Mirrors the `CONF_DEFAULT_*` macros
22/// in the C reference.
23pub mod defaults {
24    /// Default request timeout in milliseconds.
25    pub const TIMEOUT_MS: i64 = 5_000;
26    /// Default `listen()` backlog.
27    pub const LISTEN_BACKLOG: i64 = 512;
28    /// Default `client_connections:` value (0 = unlimited).
29    pub const CLIENT_CONNECTIONS: i64 = 0;
30    /// Default `data_store:` value (0 = redis).
31    pub const DATA_STORE: i64 = 0;
32    /// Default `preconnect:` value.
33    ///
34    /// The C reference defaults `preconnect` to false: clients
35    /// can connect to dynomited before the local datastore is
36    /// reachable. The lazy connect avoids a hard dependency on
37    /// boot ordering. We match that default here.
38    pub const PRECONNECT: bool = false;
39    /// Default `auto_eject_hosts:` value.
40    pub const AUTO_EJECT_HOSTS: bool = true;
41    /// Default `server_retry_timeout:` (ms).
42    pub const SERVER_RETRY_TIMEOUT_MS: i64 = 10 * 1000;
43    /// Default `server_failure_limit:`.
44    pub const SERVER_FAILURE_LIMIT: i64 = 3;
45    /// Default `dyn_read_timeout:` (ms).
46    pub const DYN_READ_TIMEOUT_MS: i64 = 10_000;
47    /// Default `dyn_write_timeout:` (ms).
48    pub const DYN_WRITE_TIMEOUT_MS: i64 = 10_000;
49    /// Default `dyn_connections:`.
50    pub const DYN_CONNECTIONS: i64 = 100;
51    /// Default `gos_interval:` (ms).
52    pub const GOS_INTERVAL_MS: i64 = 30_000;
53    /// Default `enable_hinted_handoff:` value. The feature is
54    /// off by default until operators opt in.
55    pub const ENABLE_HINTED_HANDOFF: bool = false;
56    /// Default `hint_ttl_seconds:` (24h). Hints older than this
57    /// are dropped during expiry sweeps.
58    pub const HINT_TTL_SECONDS: u64 = 86_400;
59    /// Default `hint_store_max_bytes:` (64 MiB) cap on the
60    /// node-local in-memory hint store.
61    pub const HINT_STORE_MAX_BYTES: u64 = 64 * 1024 * 1024;
62    /// Default `hint_drain_interval_ms:` between hint drainer
63    /// sweeps.
64    pub const HINT_DRAIN_INTERVAL_MS: u64 = 30_000;
65    /// Default per-connection message rate.
66    pub const CONN_MSG_RATE: u32 = 50_000;
67    /// Default `stats_interval:` (ms).
68    pub const STATS_INTERVAL_MS: i64 = 30 * 1000;
69    /// Default stats listener address.
70    pub const STATS_PNAME: &str = "0.0.0.0:22222";
71    /// Default datastore-side connection count.
72    pub const DATASTORE_CONNECTIONS: u8 = 1;
73    /// Default local-peer connection count.
74    pub const LOCAL_PEER_CONNECTIONS: u8 = 1;
75    /// Default remote-peer connection count.
76    pub const REMOTE_PEER_CONNECTIONS: u8 = 1;
77    /// Default rack name.
78    pub const RACK: &str = "localrack";
79    /// Default datacenter name.
80    pub const DC: &str = "localdc";
81    /// Default `secure_server_option:` value.
82    pub const SECURE_SERVER_OPTION: &str = "none";
83    /// Default `read_consistency:` / `write_consistency:`.
84    pub const CONSISTENCY: &str = "DC_ONE";
85    /// Default `dyn_seed_provider:`.
86    pub const SEED_PROVIDER: &str = "simple_provider";
87    /// Default `env:` (cloud environment marker).
88    pub const ENV: &str = "aws";
89    /// Default PEM key file path.
90    pub const PEM_KEY_FILE: &str = "conf/dynomite.pem";
91    /// Default reconciliation key file path.
92    pub const RECON_KEY_FILE: &str = "conf/recon_key.pem";
93    /// Default reconciliation IV file path.
94    pub const RECON_IV_FILE: &str = "conf/recon_iv.pem";
95    /// Default cadence (in seconds) of the entropy reconciliation
96    /// run loop. Mirrors the brief's five-minute default; ignored
97    /// when the entropy task is not enabled.
98    pub const RECON_INTERVAL_SECONDS: u64 = 300;
99    /// Smallest valid `mbuf_size:`.
100    pub const MBUF_MIN_SIZE: i64 = 512;
101    /// Largest valid `mbuf_size:`.
102    pub const MBUF_MAX_SIZE: i64 = 512_000;
103    /// Smallest valid `max_msgs:`.
104    pub const ALLOC_MSGS_MIN: i64 = 100_000;
105    /// Largest valid `max_msgs:`.
106    pub const ALLOC_MSGS_MAX: i64 = 1_000_000;
107}
108
109/// Wrapper for the `servers:` field that enforces the invariant
110/// of "exactly one datastore" without losing the YAML list shape.
111///
112/// # Examples
113///
114/// ```
115/// use dynomite::conf::{ConfServer, Servers};
116/// let s = Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()]);
117/// assert_eq!(s.len(), 1);
118/// assert!(!s.is_empty());
119/// assert!(s.datastore().is_some());
120/// ```
121#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
122#[serde(transparent)]
123pub struct Servers(pub(crate) Vec<ConfServer>);
124
125impl Servers {
126    /// Construct from an explicit list. Validation enforces a length
127    /// of one when called via `Config::validate`.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// use dynomite::conf::{ConfServer, Servers};
133    /// let s = Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()]);
134    /// assert_eq!(s.len(), 1);
135    /// ```
136    pub fn from_vec(v: Vec<ConfServer>) -> Self {
137        Self(v)
138    }
139}
140
141impl Servers {
142    /// Borrow the entries.
143    ///
144    /// # Examples
145    ///
146    /// ```
147    /// use dynomite::conf::{ConfServer, Servers};
148    /// let s = Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()]);
149    /// assert_eq!(s.entries().len(), 1);
150    /// ```
151    pub fn entries(&self) -> &[ConfServer] {
152        &self.0
153    }
154    /// Number of entries.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// use dynomite::conf::Servers;
160    /// assert_eq!(Servers::default().len(), 0);
161    /// ```
162    pub fn len(&self) -> usize {
163        self.0.len()
164    }
165    /// Whether the list is empty.
166    ///
167    /// # Examples
168    ///
169    /// ```
170    /// use dynomite::conf::Servers;
171    /// assert!(Servers::default().is_empty());
172    /// ```
173    pub fn is_empty(&self) -> bool {
174        self.0.is_empty()
175    }
176    /// The single datastore (returns the first entry, if any).
177    ///
178    /// # Examples
179    ///
180    /// ```
181    /// use dynomite::conf::{ConfServer, Servers};
182    /// let s = Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()]);
183    /// assert!(s.datastore().is_some());
184    /// assert!(Servers::default().datastore().is_none());
185    /// ```
186    pub fn datastore(&self) -> Option<&ConfServer> {
187        self.0.first()
188    }
189}
190
191/// Pool configuration body. One per top-level YAML pool name.
192///
193/// # Examples
194///
195/// ```
196/// use dynomite::conf::{ConfPool, ConfListen};
197/// let mut p = ConfPool::default();
198/// assert!(p.listen.is_none());
199/// p.listen = Some(ConfListen::parse("listen", "127.0.0.1:8102").unwrap());
200/// p.apply_defaults();
201/// assert_eq!(p.timeout, Some(5_000));
202/// ```
203#[derive(Debug, Clone, Default, Serialize, Deserialize)]
204#[serde(deny_unknown_fields, default)]
205pub struct ConfPool {
206    /// `listen:` - client-facing listener address.
207    pub listen: Option<ConfListen>,
208    /// `dyn_listen:` - peer-facing listener address.
209    pub dyn_listen: Option<ConfListen>,
210    /// `stats_listen:` - HTTP stats endpoint.
211    pub stats_listen: Option<ConfListen>,
212
213    /// `hash:` - hash function name.
214    pub hash: Option<HashType>,
215    /// `hash_tag:` - two-character delimiter pair.
216    pub hash_tag: Option<String>,
217
218    /// `distribution:` - distribution algorithm. Defaults to
219    /// [`Distribution::Vnode`]. Setting one of the legacy
220    /// `ketama` / `modula` / `random` values is accepted but
221    /// emits a deprecation warning at config-load time and
222    /// collapses to `vnode` at runtime.
223    #[serde(default)]
224    pub distribution: Option<Distribution>,
225    /// `distribution_shadow:` - optional shadow distribution
226    /// computed alongside the live one. When set, the
227    /// dispatcher routes via [`Self::distribution`] but also
228    /// computes the shadow route for every key and bumps a
229    /// counter when the two disagree. Used to validate a
230    /// migration before flipping the live distribution.
231    #[serde(default)]
232    pub distribution_shadow: Option<Distribution>,
233    /// `server_connections:` - deprecated; recorded for warning but ignored.
234    #[serde(default)]
235    pub server_connections: Option<i64>,
236
237    /// `timeout:` - request timeout in milliseconds.
238    pub timeout: Option<i64>,
239    /// `backlog:` - listen backlog.
240    pub backlog: Option<i64>,
241    /// `client_connections:` - max client connections.
242    pub client_connections: Option<i64>,
243    /// `data_store:` - 0 = redis, 1 = memcache, 2 = noxu.
244    /// Operators may also write the textual form (`redis`,
245    /// `memcache`, `noxu`); the deserializer normalises both
246    /// shapes to the integer code that the rest of the engine
247    /// consumes.
248    #[serde(default, deserialize_with = "deserialize_data_store")]
249    pub data_store: Option<i64>,
250    /// `noxu_path:` - filesystem directory the in-process Noxu
251    /// DB datastore opens its environment at. Required when
252    /// `data_store: noxu` is selected; ignored otherwise. The
253    /// directory must be writable; an existing environment is
254    /// reused, otherwise one is created.
255    #[serde(default)]
256    pub noxu_path: Option<PathBuf>,
257    /// `preconnect:` - eagerly establish connections at startup.
258    pub preconnect: Option<bool>,
259    /// `redis_requirepass:` - optional password sent as `AUTH <pw>`
260    /// on every backend connection right after the TCP handshake.
261    /// Mirrors the Redis server option of the same name. Leave
262    /// unset to disable. Memcache backends are not authenticated
263    /// (`AUTH` is Redis-specific; memcache binary SASL is not
264    /// implemented).
265    #[serde(default)]
266    pub redis_requirepass: Option<String>,
267    /// `auto_eject_hosts:` - automatically eject failing peers.
268    pub auto_eject_hosts: Option<bool>,
269    /// `server_retry_timeout:` - retry interval for ejected servers (ms).
270    pub server_retry_timeout: Option<i64>,
271    /// `server_failure_limit:` - consecutive failures before eject.
272    pub server_failure_limit: Option<i64>,
273
274    /// `servers:` - the (single-element) datastore list.
275    pub servers: Option<Servers>,
276
277    /// `dyn_read_timeout:` - inter-node read timeout (ms).
278    pub dyn_read_timeout: Option<i64>,
279    /// `dyn_write_timeout:` - inter-node write timeout (ms).
280    pub dyn_write_timeout: Option<i64>,
281    /// `dyn_seed_provider:` - seeds backend.
282    pub dyn_seed_provider: Option<String>,
283    /// `dyn_seeds:` - peer dynomite nodes.
284    pub dyn_seeds: Option<Vec<ConfDynSeed>>,
285    /// `dyn_port:` - default peer port.
286    pub dyn_port: Option<i64>,
287    /// `dyn_connections:` - per-peer connection count.
288    pub dyn_connections: Option<i64>,
289    /// `rack:` - this node's rack.
290    pub rack: Option<String>,
291    /// `tokens:` - this node's tokens.
292    pub tokens: Option<TokenList>,
293    /// `gos_interval:` - gossip period (ms).
294    pub gos_interval: Option<i64>,
295    /// `secure_server_option:` - inter-node TLS mode.
296    pub secure_server_option: Option<String>,
297    /// `pem_key_file:` - path to the PEM private key.
298    pub pem_key_file: Option<String>,
299    /// `recon_key_file:` - reconciliation key path.
300    pub recon_key_file: Option<String>,
301    /// `recon_iv_file:` - reconciliation IV path.
302    pub recon_iv_file: Option<String>,
303    /// `recon_interval_seconds:` - period (in seconds) of the
304    /// background entropy reconciliation cycle.
305    ///
306    /// Ignored when [`Self::recon_key_file`] is unset or when the
307    /// configured key file cannot be opened at startup. When the
308    /// entropy task is enabled the default cadence is 300 seconds
309    /// (five minutes); operators can override it via this YAML
310    /// directive.
311    #[serde(default)]
312    pub recon_interval_seconds: Option<u64>,
313    /// `datacenter:` - this node's datacenter.
314    pub datacenter: Option<String>,
315    /// `env:` - cloud environment marker.
316    pub env: Option<String>,
317    /// `conn_msg_rate:` - per-connection message rate cap.
318    pub conn_msg_rate: Option<u32>,
319    /// `read_consistency:` - quorum policy for reads.
320    pub read_consistency: Option<String>,
321    /// `write_consistency:` - quorum policy for writes.
322    pub write_consistency: Option<String>,
323    /// `stats_interval:` - stats aggregation period (ms).
324    pub stats_interval: Option<i64>,
325    /// `enable_gossip:` - enable / disable gossip thread.
326    pub enable_gossip: Option<bool>,
327    /// `peer_tls_cert:` - PEM certificate path for the dnode
328    /// listener and outbound dnode connections. When both this
329    /// field and [`Self::peer_tls_key`] are set the peer plane
330    /// runs over TLS; when both are absent the peer plane runs
331    /// in plaintext (the historical behaviour). Setting one
332    /// without the other is rejected at validation time.
333    #[serde(default)]
334    pub peer_tls_cert: Option<PathBuf>,
335    /// `peer_tls_key:` - PEM private-key path matching
336    /// [`Self::peer_tls_cert`].
337    #[serde(default)]
338    pub peer_tls_key: Option<PathBuf>,
339    /// `peer_tls_ca:` - optional PEM CA bundle. When set, the
340    /// dnode listener requires every inbound peer to present a
341    /// certificate signed by a CA from this bundle (mutual TLS).
342    /// When unset, the listener still terminates TLS but does
343    /// not request a client certificate. The outbound side uses
344    /// this bundle as its trust anchor; when unset, the bundled
345    /// `webpki_roots` Mozilla bundle is used.
346    #[serde(default)]
347    pub peer_tls_ca: Option<PathBuf>,
348    /// `peer_tls_profiles:` - per-DC TLS material lookup. Each
349    /// entry is keyed by the target peer's datacenter name; the
350    /// value is a [`ConfTlsProfile`] giving the cert / key / CA
351    /// triple to use when negotiating peer-plane TLS to or from a
352    /// peer in that DC. When the inbound listener accepts a
353    /// connection it picks the cert by SNI hostname
354    /// (`dc-<dc-name>.dynomite.local`); the outbound peer
355    /// supervisor dials with the same SNI hostname so the remote
356    /// listener can route the handshake.
357    ///
358    /// When this map is empty (the default), the legacy
359    /// `peer_tls_cert` / `peer_tls_key` / `peer_tls_ca` triple is
360    /// the only profile in use, applied to every peer regardless
361    /// of DC. When the map is non-empty, each entry takes
362    /// precedence over the legacy fields for matching DCs; the
363    /// legacy fields become the implicit "default" profile used
364    /// for any DC without an explicit entry. When neither the
365    /// map nor the legacy fields are set, the peer plane is
366    /// plaintext.
367    #[serde(default)]
368    pub peer_tls_profiles: BTreeMap<String, ConfTlsProfile>,
369    /// `mbuf_size:` - mbuf chunk size in bytes.
370    pub mbuf_size: Option<i64>,
371    /// `max_msgs:` - allocated message buffer size.
372    pub max_msgs: Option<i64>,
373    /// `datastore_connections:` - count of connections to the datastore.
374    pub datastore_connections: Option<u8>,
375    /// `local_peer_connections:` - count of connections to local-DC peers.
376    pub local_peer_connections: Option<u8>,
377    /// `remote_peer_connections:` - count of connections to remote peers.
378    pub remote_peer_connections: Option<u8>,
379    /// `read_repairs_enabled:` - enable read-repair on quorum mismatch.
380    pub read_repairs_enabled: Option<bool>,
381    /// `enable_hinted_handoff:` - when true, writes whose target
382    /// peer is in [`crate::cluster::peer::PeerState::Down`] (or
383    /// whose outbound channel is closed / full) are stored in a
384    /// node-local hint queue and counted toward the consistency
385    /// threshold; a background drainer ships the hints to the
386    /// peer once it returns to
387    /// [`crate::cluster::peer::PeerState::Normal`]. When false
388    /// (the default) the dispatcher behaviour is unchanged: a
389    /// Down or unreachable target is silently skipped and the
390    /// request fails with `DynomiteNoQuorumAchieved` if the
391    /// remaining targets cannot satisfy the consistency level.
392    #[serde(default)]
393    pub enable_hinted_handoff: Option<bool>,
394    /// `hint_ttl_seconds:` - per-hint expiry. Hints older than
395    /// this many seconds are dropped during periodic sweeps to
396    /// bound the in-memory store. Defaults to 86400 (24 hours).
397    /// Ignored when `enable_hinted_handoff` is false.
398    #[serde(default)]
399    pub hint_ttl_seconds: Option<u64>,
400    /// `hint_store_max_bytes:` - upper bound on the in-memory
401    /// hint store. Once the store reaches this many bytes,
402    /// further enqueues fail with
403    /// [`crate::cluster::hints::HintStoreError::OverCapacity`]
404    /// and the dispatcher falls back to its non-handoff error
405    /// path. Defaults to 64 MiB. Ignored when
406    /// `enable_hinted_handoff` is false.
407    #[serde(default)]
408    pub hint_store_max_bytes: Option<u64>,
409    /// `hint_drain_interval_ms:` - period of the background hint
410    /// drainer sweep. Defaults to 30000 ms (30 seconds). Ignored
411    /// when `enable_hinted_handoff` is false.
412    #[serde(default)]
413    pub hint_drain_interval_ms: Option<u64>,
414    /// `log_format:` - selectable shape for tracing output.
415    ///
416    /// Accepted values are `default`, `rfc5424`, `rfc3164`, `json`,
417    /// and `ndjson` (alias of `json`). When unset, the historical
418    /// default text format is used. Parsing is performed at
419    /// log-installation time by [`crate::core::log::LogFormat::parse`];
420    /// invalid values fail the `dynomited --test-conf` gate.
421    pub log_format: Option<String>,
422
423    /// `observability:` - opt-in observability knobs (distributed
424    /// tracing OTLP exporter today; the OTLP log appender is
425    /// scoped but not yet wired). Absent / null disables every
426    /// observability surface, preserving today's silent default.
427    /// See [`ObservabilityConfig`].
428    #[serde(default)]
429    pub observability: Option<ObservabilityConfig>,
430
431    /// `bucket_types:` - per-bucket routing-property bundles.
432    ///
433    /// Each entry is a [`ConfBucketType`] keyed by name. The
434    /// dispatcher extracts the bucket name from the request key
435    /// (the prefix before the first `/`; see
436    /// [`crate::proto::redis::bucket_name`]) and, when a matching
437    /// entry exists, swaps in that type's `read_consistency`,
438    /// `write_consistency`, and `n_val` for the lifetime of that
439    /// request. Pool-level fields stay the fallback for keys
440    /// without a slash, for keys whose bucket prefix is unknown,
441    /// and for any field the bucket-type stanza leaves at its
442    /// default. Empty list disables the feature; entries must have
443    /// unique names.
444    #[serde(default)]
445    pub bucket_types: Vec<ConfBucketType>,
446
447    /// `default_bucket_type:` - name of the bucket type to apply
448    /// when the request key has no slash (or has an empty prefix).
449    /// Must reference an entry of `bucket_types` when set; unset
450    /// (the default) falls all the way back to pool-level
451    /// consistency.
452    #[serde(default)]
453    pub default_bucket_type: Option<String>,
454
455    /// `riak:` - optional Riak-mode listener / AAE configuration.
456    ///
457    /// Consumed only when `dynomited` is built with the
458    /// `--features riak` Cargo feature: the binary then
459    /// instantiates a Protocol Buffers Client (PBC) listener,
460    /// an HTTP gateway, and (optionally) the active anti-entropy
461    /// scheduler against the supplied addresses. When the field
462    /// is absent (or every inner option is unset), the binary
463    /// behaves identically to a Redis / Memcache deployment.
464    /// The block is parsed unconditionally so YAML files
465    /// authored against the Riak-enabled binary still validate
466    /// under the default build.
467    #[serde(default)]
468    pub riak: Option<ConfRiak>,
469}
470
471/// Optional Riak-mode listener / AAE knobs.
472///
473/// Every field is optional. The PBC and HTTP listeners are
474/// independent: setting one without the other is supported.
475/// When `aae_enabled` is `true` the active anti-entropy
476/// scheduler is spawned; the cadence knobs default to the
477/// values shipped by `dyniak::aae::config`.
478///
479/// # Examples
480///
481/// ```
482/// use dynomite::conf::ConfRiak;
483/// let r = ConfRiak {
484///     pbc_listen: Some("127.0.0.1:8087".into()),
485///     http_listen: Some("127.0.0.1:8098".into()),
486///     aae_enabled: Some(false),
487///     aae_full_sweep_interval_seconds: None,
488///     aae_segment_interval_seconds: None,
489///     tls_cert: None,
490///     tls_key: None,
491///     tls_ca: None,
492///     wasm_modules: None,
493/// };
494/// assert!(r.validate().is_ok());
495/// ```
496#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
497#[serde(deny_unknown_fields, default)]
498pub struct ConfRiak {
499    /// Address the Riak Protocol Buffers Client listener binds
500    /// to (`host:port`). When unset, the PBC listener is not
501    /// started.
502    pub pbc_listen: Option<String>,
503    /// Address the Riak HTTP gateway listener binds to
504    /// (`host:port`). When unset, the HTTP gateway is not
505    /// started.
506    pub http_listen: Option<String>,
507    /// When `true`, the Riak active anti-entropy scheduler is
508    /// spawned alongside the listeners. Default: `false`.
509    pub aae_enabled: Option<bool>,
510    /// Override for the AAE full-sweep cadence, in seconds.
511    /// When unset, `dyniak::aae::config::DEFAULT_FULL_SWEEP_SECONDS`
512    /// (24h) is used.
513    pub aae_full_sweep_interval_seconds: Option<u64>,
514    /// Override for the AAE per-segment exchange cadence, in
515    /// seconds. When unset, `dyniak::aae::config::DEFAULT_SEGMENT_SECONDS`
516    /// (60s) is used.
517    pub aae_segment_interval_seconds: Option<u64>,
518    /// `tls_cert:` - PEM certificate path for the Riak PBC and
519    /// HTTP listeners. When both `tls_cert` and `tls_key` are
520    /// set, both Riak listeners terminate TLS; when both are
521    /// absent, both listeners run in plaintext (the historical
522    /// behaviour). Setting one without the other is rejected at
523    /// validation time.
524    #[serde(default)]
525    pub tls_cert: Option<PathBuf>,
526    /// `tls_key:` - PEM private-key path matching
527    /// [`Self::tls_cert`].
528    #[serde(default)]
529    pub tls_key: Option<PathBuf>,
530    /// `tls_ca:` - optional PEM CA bundle for mutual TLS on the
531    /// Riak listeners. When set, every inbound client must
532    /// present a certificate signed by a CA from this bundle.
533    /// When unset, the listeners terminate TLS without
534    /// requesting a client certificate.
535    #[serde(default)]
536    pub tls_ca: Option<PathBuf>,
537    /// `wasm_modules:` - optional list of Wasm modules to
538    /// register with the MapReduce executor at startup. Each
539    /// entry pairs a logical `id` with the on-disk `path` of a
540    /// Wasm binary (`.wasm`) or WAT text (`.wat`) file. When
541    /// `dynomited` is built with the `wasm` Cargo feature it
542    /// loads every entry through
543    /// [`dyniak::mapreduce::wasm::load_modules_from_config`]
544    /// and exposes the resulting store on the executor; without
545    /// the feature the field is parsed and validated but the
546    /// loader is never called (the runtime returns the typed
547    /// `WasmNotImplemented` error if a `Phase::WasmModule` is
548    /// submitted).
549    ///
550    /// Validation: every `id` must be unique and every `path`
551    /// must point at an existing file at validation time.
552    #[serde(default)]
553    pub wasm_modules: Option<Vec<ConfRiakWasmModule>>,
554}
555
556/// One Wasm module entry inside a [`ConfRiak::wasm_modules`]
557/// list.
558///
559/// # Examples
560///
561/// ```
562/// use std::path::PathBuf;
563/// use dynomite::conf::ConfRiakWasmModule;
564/// let m = ConfRiakWasmModule {
565///     id: "identity".into(),
566///     path: PathBuf::from("/etc/dynomited/wasm/identity.wasm"),
567/// };
568/// assert_eq!(m.id, "identity");
569/// ```
570#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
571#[serde(deny_unknown_fields)]
572pub struct ConfRiakWasmModule {
573    /// Logical module identifier referenced from a
574    /// `Phase::WasmModule { module_id }` MapReduce phase.
575    pub id: String,
576    /// Filesystem path to the Wasm binary (`.wasm`) or WAT
577    /// text (`.wat`) file. Read once at startup.
578    pub path: PathBuf,
579}
580
581/// One per-DC TLS profile inside [`ConfPool::peer_tls_profiles`].
582///
583/// Each profile names a PEM cert / private-key pair and an
584/// optional CA bundle. The map's key is the datacenter name
585/// (matching the value an operator sets in `dyn_seeds:` for
586/// peers in that DC); when a peer's DC has no entry, the
587/// connection falls back to the legacy `peer_tls_*` fields
588/// (treated as the implicit "default" profile). When neither a
589/// per-DC entry nor the default fields are set, the connection
590/// is plaintext.
591///
592/// # Examples
593///
594/// ```
595/// use std::path::PathBuf;
596/// use dynomite::conf::ConfTlsProfile;
597/// let p = ConfTlsProfile {
598///     cert: Some(PathBuf::from("/etc/dynomite/dc1.pem")),
599///     key: Some(PathBuf::from("/etc/dynomite/dc1.key")),
600///     ca: Some(PathBuf::from("/etc/dynomite/dc1-ca.pem")),
601/// };
602/// assert!(p.validate("dc1").is_ok());
603/// ```
604#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
605#[serde(deny_unknown_fields, default)]
606pub struct ConfTlsProfile {
607    /// PEM certificate path. Must be set together with [`Self::key`].
608    pub cert: Option<PathBuf>,
609    /// PEM private-key path matching [`Self::cert`].
610    pub key: Option<PathBuf>,
611    /// Optional PEM CA bundle. When set, peer-plane connections
612    /// using this profile pin the bundle as their trust anchor
613    /// (and the listener requires inbound peers to present a
614    /// certificate signed by a CA in the bundle for mTLS). When
615    /// unset, the listener does not request a client certificate
616    /// and the outbound side falls back to the bundled
617    /// `webpki_roots` Mozilla anchors.
618    pub ca: Option<PathBuf>,
619}
620
621impl ConfTlsProfile {
622    /// Validate that the profile is internally consistent.
623    ///
624    /// `cert` and `key` must both be set or both be unset; a
625    /// `ca` requires the cert / key pair. The `dc` argument is
626    /// the map key the profile lives under and is included in
627    /// any error message so an operator can identify the
628    /// offending entry.
629    ///
630    /// # Errors
631    /// Returns [`ConfError::BadServer`] when the cert / key
632    /// pair is mismatched, or when `ca` is set without the cert
633    /// / key pair.
634    ///
635    /// # Examples
636    ///
637    /// ```
638    /// use std::path::PathBuf;
639    /// use dynomite::conf::ConfTlsProfile;
640    /// let p = ConfTlsProfile {
641    ///     cert: Some(PathBuf::from("/etc/x.pem")),
642    ///     key: None,
643    ///     ca: None,
644    /// };
645    /// assert!(p.validate("dc1").is_err());
646    /// ```
647    pub fn validate(&self, dc: &str) -> Result<(), ConfError> {
648        match (self.cert.as_deref(), self.key.as_deref()) {
649            (Some(_), Some(_)) | (None, None) => {}
650            (Some(c), None) => {
651                return Err(ConfError::BadServer {
652                    field: "peer_tls_profiles.cert",
653                    value: c.display().to_string(),
654                    reason: format!(
655                        "peer_tls_profiles[{dc}].cert is set but .key is not; both must be set together"
656                    ),
657                });
658            }
659            (None, Some(k)) => {
660                return Err(ConfError::BadServer {
661                    field: "peer_tls_profiles.key",
662                    value: k.display().to_string(),
663                    reason: format!(
664                        "peer_tls_profiles[{dc}].key is set but .cert is not; both must be set together"
665                    ),
666                });
667            }
668        }
669        if self.ca.is_some() && self.cert.is_none() {
670            return Err(ConfError::BadServer {
671                field: "peer_tls_profiles.ca",
672                value: self
673                    .ca
674                    .as_ref()
675                    .map_or_else(String::new, |p| p.display().to_string()),
676                reason: format!(
677                    "peer_tls_profiles[{dc}].ca requires .cert and .key to also be set"
678                ),
679            });
680        }
681        Ok(())
682    }
683}
684
685impl ConfRiak {
686    /// Validate the cross-field invariants of the Riak block.
687    ///
688    /// # Errors
689    /// Returns a [`ConfError::BadServer`] when an address fails
690    /// to parse as a `host:port` socket address, when an AAE
691    /// cadence is zero, or when `aae_segment_interval_seconds`
692    /// exceeds `aae_full_sweep_interval_seconds`.
693    ///
694    /// # Examples
695    ///
696    /// ```
697    /// use dynomite::conf::ConfRiak;
698    /// let r = ConfRiak {
699    ///     pbc_listen: Some("not-a-socket-addr".into()),
700    ///     ..ConfRiak::default()
701    /// };
702    /// assert!(r.validate().is_err());
703    /// ```
704    pub fn validate(&self) -> Result<(), ConfError> {
705        if let Some(addr) = self.pbc_listen.as_deref() {
706            validate_riak_addr("pbc_listen", addr)?;
707        }
708        if let Some(addr) = self.http_listen.as_deref() {
709            validate_riak_addr("http_listen", addr)?;
710        }
711        if let Some(n) = self.aae_full_sweep_interval_seconds {
712            if n == 0 {
713                return Err(ConfError::BadServer {
714                    field: "aae_full_sweep_interval_seconds",
715                    value: n.to_string(),
716                    reason: "must be > 0".into(),
717                });
718            }
719        }
720        if let Some(n) = self.aae_segment_interval_seconds {
721            if n == 0 {
722                return Err(ConfError::BadServer {
723                    field: "aae_segment_interval_seconds",
724                    value: n.to_string(),
725                    reason: "must be > 0".into(),
726                });
727            }
728        }
729        if let (Some(seg), Some(full)) = (
730            self.aae_segment_interval_seconds,
731            self.aae_full_sweep_interval_seconds,
732        ) {
733            if seg > full {
734                return Err(ConfError::BadServer {
735                    field: "aae_segment_interval_seconds",
736                    value: seg.to_string(),
737                    reason: format!("must be <= aae_full_sweep_interval_seconds ({full})"),
738                });
739            }
740        }
741        validate_tls_pair(
742            "tls_cert",
743            "tls_key",
744            self.tls_cert.as_deref(),
745            self.tls_key.as_deref(),
746        )?;
747        if self.tls_ca.is_some() && self.tls_cert.is_none() {
748            return Err(ConfError::BadServer {
749                field: "tls_ca",
750                value: self
751                    .tls_ca
752                    .as_ref()
753                    .map_or_else(String::new, |p| p.display().to_string()),
754                reason: "requires tls_cert and tls_key to also be set".into(),
755            });
756        }
757        if let Some(modules) = self.wasm_modules.as_deref() {
758            let mut seen: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
759            for m in modules {
760                if m.id.is_empty() {
761                    return Err(ConfError::BadServer {
762                        field: "wasm_modules.id",
763                        value: String::new(),
764                        reason: "wasm module id must not be empty".into(),
765                    });
766                }
767                if !seen.insert(m.id.as_str()) {
768                    return Err(ConfError::BadServer {
769                        field: "wasm_modules.id",
770                        value: m.id.clone(),
771                        reason: "wasm module ids must be unique".into(),
772                    });
773                }
774                if !m.path.is_file() {
775                    return Err(ConfError::BadServer {
776                        field: "wasm_modules.path",
777                        value: m.path.display().to_string(),
778                        reason: format!("wasm module file not found for id '{}'", m.id),
779                    });
780                }
781            }
782        }
783        Ok(())
784    }
785}
786
787/// Cross-check a `(cert, key)` TLS pair: both must be `Some` or
788/// both must be `None`. The `cert_field` and `key_field` static
789/// strings name the YAML keys for the error message.
790fn validate_tls_pair(
791    cert_field: &'static str,
792    key_field: &'static str,
793    cert: Option<&std::path::Path>,
794    key: Option<&std::path::Path>,
795) -> Result<(), ConfError> {
796    match (cert, key) {
797        (Some(_), Some(_)) | (None, None) => Ok(()),
798        (Some(c), None) => Err(ConfError::BadServer {
799            field: cert_field,
800            value: c.display().to_string(),
801            reason: format!(
802                "{cert_field} is set but {key_field} is not; both must be set together"
803            ),
804        }),
805        (None, Some(k)) => Err(ConfError::BadServer {
806            field: key_field,
807            value: k.display().to_string(),
808            reason: format!(
809                "{key_field} is set but {cert_field} is not; both must be set together"
810            ),
811        }),
812    }
813}
814
815fn validate_riak_addr(field: &'static str, value: &str) -> Result<(), ConfError> {
816    use std::net::ToSocketAddrs;
817    if value.is_empty() {
818        return Err(ConfError::BadServer {
819            field,
820            value: value.to_string(),
821            reason: "riak listen address must not be empty".into(),
822        });
823    }
824    // Accept anything that resolves; mirrors how the rest of
825    // the engine validates `listen:` strings (parse first,
826    // resolve as a fallback).
827    if value.parse::<std::net::SocketAddr>().is_ok() {
828        return Ok(());
829    }
830    match value.to_socket_addrs() {
831        Ok(mut iter) => {
832            if iter.next().is_some() {
833                Ok(())
834            } else {
835                Err(ConfError::BadServer {
836                    field,
837                    value: value.to_string(),
838                    reason: "resolved to no addresses".into(),
839                })
840            }
841        }
842        Err(e) => Err(ConfError::BadServer {
843            field,
844            value: value.to_string(),
845            reason: format!("could not resolve: {e}"),
846        }),
847    }
848}
849
850/// Routing-property bundle attached to a key bucket.
851///
852/// Bucket types let operators give different key classes
853/// different SLAs without running multiple pools. Cache-style
854/// keys can pin to `DC_ONE` while transactional keys sit on
855/// `DC_EACH_SAFE_QUORUM`; the same dynomited binary serves both.
856///
857/// `n_val` caps the replica fan-out for the lifetime of one
858/// request: when the topology offers more replicas than `n_val`,
859/// the dispatcher takes the first `n_val` (the existing rack /
860/// DC ordering puts preferred replicas first). A value of `0`
861/// means "no cap" and is treated identically to omitting the
862/// field.
863///
864/// # Examples
865///
866/// ```
867/// use dynomite::conf::{ConfBucketType, ConsistencyLevel};
868/// let bt = ConfBucketType {
869///     name: "sessions".into(),
870///     read_consistency: "DC_QUORUM".into(),
871///     write_consistency: "DC_EACH_SAFE_QUORUM".into(),
872///     n_val: 3,
873/// };
874/// assert_eq!(bt.name, "sessions");
875/// assert_eq!(
876///     ConsistencyLevel::parse("read_consistency", &bt.read_consistency).unwrap(),
877///     ConsistencyLevel::DcQuorum,
878/// );
879/// assert_eq!(bt.n_val, 3);
880/// ```
881#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
882#[serde(deny_unknown_fields)]
883pub struct ConfBucketType {
884    /// Bucket name. Compared verbatim against the bytes returned
885    /// by [`crate::proto::redis::bucket_name`]; no normalisation
886    /// is performed. Names must be unique within a pool and must
887    /// not be empty.
888    pub name: String,
889    /// Read-side consistency level for keys in this bucket.
890    /// Stored as a string so the YAML round-trip is
891    /// human-readable; parsed via
892    /// [`ConsistencyLevel::parse`](crate::conf::ConsistencyLevel::parse)
893    /// during validation.
894    pub read_consistency: String,
895    /// Write-side consistency level for keys in this bucket.
896    /// See [`ConfBucketType::read_consistency`].
897    pub write_consistency: String,
898    /// Replication-factor cap. The dispatcher trims its replica
899    /// fan-out to at most this many targets. `0` means "no cap";
900    /// any positive value caps the fan-out to the leading
901    /// `n_val` peers (rack-local first).
902    #[serde(default)]
903    pub n_val: u8,
904}
905
906impl ConfBucketType {
907    /// Parse [`Self::read_consistency`] into the typed enum.
908    ///
909    /// # Examples
910    ///
911    /// ```
912    /// use dynomite::conf::{ConfBucketType, ConsistencyLevel};
913    /// let bt = ConfBucketType {
914    ///     name: "s".into(),
915    ///     read_consistency: "DC_QUORUM".into(),
916    ///     write_consistency: "DC_ONE".into(),
917    ///     n_val: 0,
918    /// };
919    /// assert_eq!(bt.read_level().unwrap(), ConsistencyLevel::DcQuorum);
920    /// ```
921    pub fn read_level(&self) -> Result<ConsistencyLevel, ConfError> {
922        ConsistencyLevel::parse("read_consistency", &self.read_consistency)
923    }
924
925    /// Parse [`Self::write_consistency`] into the typed enum.
926    ///
927    /// # Examples
928    ///
929    /// ```
930    /// use dynomite::conf::{ConfBucketType, ConsistencyLevel};
931    /// let bt = ConfBucketType {
932    ///     name: "s".into(),
933    ///     read_consistency: "DC_ONE".into(),
934    ///     write_consistency: "DC_SAFE_QUORUM".into(),
935    ///     n_val: 0,
936    /// };
937    /// assert_eq!(bt.write_level().unwrap(), ConsistencyLevel::DcSafeQuorum);
938    /// ```
939    pub fn write_level(&self) -> Result<ConsistencyLevel, ConfError> {
940        ConsistencyLevel::parse("write_consistency", &self.write_consistency)
941    }
942}
943
944/// Opt-in observability configuration.
945///
946/// Absent or null disables every observability surface. Covers
947/// distributed-tracing today; an OTLP log-appender bridge using
948/// the same fields is scoped as a follow-up.
949///
950/// # Examples
951///
952/// ```
953/// use dynomite::conf::ObservabilityConfig;
954/// let cfg = ObservabilityConfig {
955///     otlp_traces_endpoint: Some("http://collector:4317".into()),
956///     otlp_logs_endpoint: None,
957///     service_name: Some("dynomited".into()),
958///     traces_sampling: Some(0.1),
959/// };
960/// assert!(cfg.otlp_traces_endpoint.is_some());
961/// ```
962#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
963#[serde(deny_unknown_fields, default)]
964pub struct ObservabilityConfig {
965    /// OTLP gRPC endpoint for distributed traces (e.g.
966    /// `http://localhost:4317`). When `None` the binary skips
967    /// the OTel SDK install entirely; tracing keeps using the
968    /// configured `tracing-subscriber` log layer only.
969    pub otlp_traces_endpoint: Option<String>,
970    /// OTLP gRPC endpoint for log records. When set the binary
971    /// installs an `opentelemetry-appender-tracing` bridge
972    /// alongside the local log writer. The wiring is scoped as
973    /// a follow-up; the field is parsed today so YAML files
974    /// authored against the eventual implementation validate.
975    pub otlp_logs_endpoint: Option<String>,
976    /// Service name attached to every emitted span / log record.
977    /// Defaults to `"dynomited"` when unset.
978    pub service_name: Option<String>,
979    /// Trace sampling ratio in `[0.0, 1.0]`. `1.0` records every
980    /// trace, `0.0` records none. Defaults to `1.0` when unset.
981    pub traces_sampling: Option<f64>,
982}
983
984impl ConfPool {
985    /// Resolve the configured [`Distribution`] for the engine.
986    ///
987    /// Folds the legacy `ketama` / `modula` / `random` aliases
988    /// down to [`Distribution::Vnode`] (the only algorithm those
989    /// names ever resolved to in the Rust port). Emits a
990    /// `tracing::warn!` for the legacy aliases the first time
991    /// the field is read so the operator notices the
992    /// deprecation.
993    ///
994    /// # Examples
995    ///
996    /// ```
997    /// use dynomite::conf::{ConfPool, Distribution};
998    /// let p = ConfPool {
999    ///     distribution: Some(Distribution::RandomSlicing),
1000    ///     ..ConfPool::default()
1001    /// };
1002    /// assert_eq!(p.resolved_distribution(), Distribution::RandomSlicing);
1003    /// ```
1004    #[must_use]
1005    pub fn resolved_distribution(&self) -> Distribution {
1006        match self.distribution {
1007            None | Some(Distribution::Vnode) => Distribution::Vnode,
1008            Some(Distribution::RandomSlicing) => Distribution::RandomSlicing,
1009            Some(other) => {
1010                tracing::warn!(
1011                    target: "dynomite::conf",
1012                    distribution = other.as_str(),
1013                    "distribution mode '{}' is a legacy alias and resolves to 'vnode'; \
1014                     update the YAML to either 'vnode' or 'random_slicing'",
1015                    other
1016                );
1017                Distribution::Vnode
1018            }
1019        }
1020    }
1021}
1022
1023impl ConfPool {
1024    /// Apply defaults to any field still left `None` after parsing.
1025    ///
1026    /// # Examples
1027    ///
1028    /// ```
1029    /// use dynomite::conf::ConfPool;
1030    /// let mut p = ConfPool::default();
1031    /// p.apply_defaults();
1032    /// assert_eq!(p.timeout, Some(5_000));
1033    /// assert_eq!(p.rack.as_deref(), Some("localrack"));
1034    /// ```
1035    pub fn apply_defaults(&mut self) {
1036        if self.dyn_seed_provider.is_none() {
1037            self.dyn_seed_provider = Some(defaults::SEED_PROVIDER.to_string());
1038        }
1039        if self.hash.is_none() {
1040            self.hash = Some(HashType::Murmur);
1041        }
1042        if self.timeout.is_none() {
1043            self.timeout = Some(defaults::TIMEOUT_MS);
1044        }
1045        if self.backlog.is_none() {
1046            self.backlog = Some(defaults::LISTEN_BACKLOG);
1047        }
1048        // client_connections is unconditionally reset to its default
1049        // by the C validator; we mirror that exactly.
1050        self.client_connections = Some(defaults::CLIENT_CONNECTIONS);
1051        if self.data_store.is_none() {
1052            self.data_store = Some(defaults::DATA_STORE);
1053        }
1054        if self.preconnect.is_none() {
1055            self.preconnect = Some(defaults::PRECONNECT);
1056        }
1057        if self.auto_eject_hosts.is_none() {
1058            self.auto_eject_hosts = Some(defaults::AUTO_EJECT_HOSTS);
1059        }
1060        if self.server_retry_timeout.is_none() {
1061            self.server_retry_timeout = Some(defaults::SERVER_RETRY_TIMEOUT_MS);
1062        }
1063        if self.server_failure_limit.is_none() {
1064            self.server_failure_limit = Some(defaults::SERVER_FAILURE_LIMIT);
1065        }
1066        if self.dyn_read_timeout.is_none() {
1067            self.dyn_read_timeout = Some(defaults::DYN_READ_TIMEOUT_MS);
1068        }
1069        if self.dyn_write_timeout.is_none() {
1070            self.dyn_write_timeout = Some(defaults::DYN_WRITE_TIMEOUT_MS);
1071        }
1072        if self.dyn_connections.is_none() {
1073            self.dyn_connections = Some(defaults::DYN_CONNECTIONS);
1074        }
1075        if self.gos_interval.is_none() {
1076            self.gos_interval = Some(defaults::GOS_INTERVAL_MS);
1077        }
1078        if self.conn_msg_rate.is_none() {
1079            self.conn_msg_rate = Some(defaults::CONN_MSG_RATE);
1080        }
1081        if self.rack.is_none() {
1082            self.rack = Some(defaults::RACK.to_string());
1083        }
1084        if self.datacenter.is_none() {
1085            self.datacenter = Some(defaults::DC.to_string());
1086        }
1087        if self.secure_server_option.is_none() {
1088            self.secure_server_option = Some(defaults::SECURE_SERVER_OPTION.to_string());
1089        }
1090        if self.read_consistency.is_none() {
1091            self.read_consistency = Some(defaults::CONSISTENCY.to_string());
1092        }
1093        if self.write_consistency.is_none() {
1094            self.write_consistency = Some(defaults::CONSISTENCY.to_string());
1095        }
1096        if self.stats_interval.is_none() {
1097            self.stats_interval = Some(defaults::STATS_INTERVAL_MS);
1098        }
1099        if self.stats_listen.is_none() {
1100            // Safe: the constant is a hard-coded valid pname.
1101            self.stats_listen = Some(
1102                ConfListen::parse("stats_listen", defaults::STATS_PNAME)
1103                    .expect("invariant: STATS_PNAME constant is valid"),
1104            );
1105        }
1106        if self.env.is_none() {
1107            self.env = Some(defaults::ENV.to_string());
1108        }
1109        if self.pem_key_file.is_none() {
1110            self.pem_key_file = Some(defaults::PEM_KEY_FILE.to_string());
1111        }
1112        if self.recon_key_file.is_none() {
1113            self.recon_key_file = Some(defaults::RECON_KEY_FILE.to_string());
1114        }
1115        if self.recon_iv_file.is_none() {
1116            self.recon_iv_file = Some(defaults::RECON_IV_FILE.to_string());
1117        }
1118        if self.recon_interval_seconds.is_none() {
1119            self.recon_interval_seconds = Some(defaults::RECON_INTERVAL_SECONDS);
1120        }
1121        if self.datastore_connections.is_none() {
1122            self.datastore_connections = Some(defaults::DATASTORE_CONNECTIONS);
1123        }
1124        if self.local_peer_connections.is_none() {
1125            self.local_peer_connections = Some(defaults::LOCAL_PEER_CONNECTIONS);
1126        }
1127        if self.remote_peer_connections.is_none() {
1128            self.remote_peer_connections = Some(defaults::REMOTE_PEER_CONNECTIONS);
1129        }
1130        if self.read_repairs_enabled.is_none() {
1131            self.read_repairs_enabled = Some(false);
1132        }
1133        if self.enable_gossip.is_none() {
1134            self.enable_gossip = Some(false);
1135        }
1136        self.apply_hinted_handoff_defaults();
1137    }
1138
1139    /// Fill in the hinted-handoff knobs. Factored out of
1140    /// [`Self::apply_defaults`] to keep the parent method under
1141    /// the project's per-function line budget while still
1142    /// covering every hinted-handoff key with a default.
1143    fn apply_hinted_handoff_defaults(&mut self) {
1144        if self.enable_hinted_handoff.is_none() {
1145            self.enable_hinted_handoff = Some(defaults::ENABLE_HINTED_HANDOFF);
1146        }
1147        if self.hint_ttl_seconds.is_none() {
1148            self.hint_ttl_seconds = Some(defaults::HINT_TTL_SECONDS);
1149        }
1150        if self.hint_store_max_bytes.is_none() {
1151            self.hint_store_max_bytes = Some(defaults::HINT_STORE_MAX_BYTES);
1152        }
1153        if self.hint_drain_interval_ms.is_none() {
1154            self.hint_drain_interval_ms = Some(defaults::HINT_DRAIN_INTERVAL_MS);
1155        }
1156    }
1157
1158    /// Run the full validation pass against the (presumably finalized)
1159    /// pool body.
1160    ///
1161    /// # Examples
1162    ///
1163    /// ```
1164    /// use dynomite::conf::{ConfListen, ConfPool, ConfServer, Servers, TokenList};
1165    /// let mut p = ConfPool {
1166    ///     listen: Some(ConfListen::parse("listen", "127.0.0.1:8102").unwrap()),
1167    ///     servers: Some(Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()])),
1168    ///     tokens: Some(TokenList::parse("0").unwrap()),
1169    ///     ..ConfPool::default()
1170    /// };
1171    /// p.apply_defaults();
1172    /// assert!(p.validate("dyn_o_mite").is_ok());
1173    /// ```
1174    pub fn validate(&self, pool_name: &str) -> Result<(), ConfError> {
1175        if pool_name.is_empty() {
1176            return Err(ConfError::EmptyPoolName);
1177        }
1178
1179        if self.listen.is_none() {
1180            return Err(ConfError::MissingRequired("listen"));
1181        }
1182
1183        self.validate_numeric_ranges()?;
1184        self.validate_mbuf_size()?;
1185        self.validate_max_msgs()?;
1186
1187        if let Some(n) = self.data_store {
1188            let ds = DataStore::from_int(n)?;
1189            if ds == DataStore::Noxu {
1190                self.validate_noxu()?;
1191            }
1192        }
1193        if let Some(tag) = &self.hash_tag {
1194            if tag.chars().count() != 2 {
1195                return Err(ConfError::BadHashTag(tag.clone()));
1196            }
1197        }
1198
1199        let secure = if let Some(s) = &self.secure_server_option {
1200            SecureServerOption::parse(s)?
1201        } else {
1202            SecureServerOption::None
1203        };
1204        if let Some(s) = &self.read_consistency {
1205            ConsistencyLevel::parse("read_consistency", s)?;
1206        }
1207        if let Some(s) = &self.write_consistency {
1208            ConsistencyLevel::parse("write_consistency", s)?;
1209        }
1210        if secure != SecureServerOption::None {
1211            match &self.pem_key_file {
1212                Some(s) if !s.is_empty() => {}
1213                _ => return Err(ConfError::MissingRequired("pem_key_file")),
1214            }
1215        }
1216
1217        if let Some(s) = &self.log_format {
1218            crate::core::log::LogFormat::parse(s).map_err(|e| ConfError::BadServer {
1219                field: "log_format",
1220                value: s.clone(),
1221                reason: e.to_string(),
1222            })?;
1223        }
1224
1225        self.validate_bucket_types()?;
1226        self.validate_hinted_handoff()?;
1227        self.validate_peer_tls()?;
1228        if let Some(r) = &self.riak {
1229            r.validate()?;
1230        }
1231
1232        match &self.servers {
1233            None => return Err(ConfError::MissingRequired("servers")),
1234            Some(s) if s.is_empty() => return Err(ConfError::MissingRequired("servers")),
1235            Some(s) if s.len() > 1 => {
1236                return Err(ConfError::BadServer {
1237                    field: "servers",
1238                    value: s.len().to_string(),
1239                    reason: "expected exactly one datastore entry".to_string(),
1240                });
1241            }
1242            Some(_) => {}
1243        }
1244
1245        Ok(())
1246    }
1247
1248    fn validate_numeric_ranges(&self) -> Result<(), ConfError> {
1249        check_positive("timeout", self.timeout)?;
1250        check_positive("backlog", self.backlog)?;
1251        check_non_negative("client_connections", self.client_connections)?;
1252        check_positive("server_retry_timeout", self.server_retry_timeout)?;
1253        check_positive("server_failure_limit", self.server_failure_limit)?;
1254        check_positive("dyn_read_timeout", self.dyn_read_timeout)?;
1255        check_positive("dyn_write_timeout", self.dyn_write_timeout)?;
1256        check_positive("gos_interval", self.gos_interval)?;
1257        check_positive("stats_interval", self.stats_interval)?;
1258
1259        if let Some(n) = self.dyn_connections {
1260            if n <= 0 {
1261                return Err(ConfError::OutOfRange {
1262                    field: "dyn_connections",
1263                    value: n,
1264                    reason: "must be a positive non-zero number",
1265                });
1266            }
1267        }
1268        Ok(())
1269    }
1270
1271    fn validate_mbuf_size(&self) -> Result<(), ConfError> {
1272        let Some(n) = self.mbuf_size else {
1273            return Ok(());
1274        };
1275        if n <= 0 {
1276            return Err(ConfError::OutOfRange {
1277                field: "mbuf_size",
1278                value: n,
1279                reason: "must be a positive number",
1280            });
1281        }
1282        if !(defaults::MBUF_MIN_SIZE..=defaults::MBUF_MAX_SIZE).contains(&n) {
1283            return Err(ConfError::OutOfRange {
1284                field: "mbuf_size",
1285                value: n,
1286                reason: "must be between 512 and 512000 bytes",
1287            });
1288        }
1289        if n % 16 != 0 {
1290            return Err(ConfError::OutOfRange {
1291                field: "mbuf_size",
1292                value: n,
1293                reason: "must be a multiple of 16",
1294            });
1295        }
1296        Ok(())
1297    }
1298
1299    fn validate_max_msgs(&self) -> Result<(), ConfError> {
1300        let Some(n) = self.max_msgs else {
1301            return Ok(());
1302        };
1303        if n <= 0 {
1304            return Err(ConfError::OutOfRange {
1305                field: "max_msgs",
1306                value: n,
1307                reason: "requires a non-zero number",
1308            });
1309        }
1310        if !(defaults::ALLOC_MSGS_MIN..=defaults::ALLOC_MSGS_MAX).contains(&n) {
1311            return Err(ConfError::OutOfRange {
1312                field: "max_msgs",
1313                value: n,
1314                reason: "must be between 100000 and 1000000 messages",
1315            });
1316        }
1317        Ok(())
1318    }
1319
1320    fn validate_bucket_types(&self) -> Result<(), ConfError> {
1321        use std::collections::BTreeSet;
1322        let mut seen: BTreeSet<&str> = BTreeSet::new();
1323        for bt in &self.bucket_types {
1324            if bt.name.is_empty() {
1325                return Err(ConfError::BadServer {
1326                    field: "bucket_types",
1327                    value: String::new(),
1328                    reason: "bucket-type name must not be empty".to_string(),
1329                });
1330            }
1331            if !seen.insert(bt.name.as_str()) {
1332                return Err(ConfError::BadServer {
1333                    field: "bucket_types",
1334                    value: bt.name.clone(),
1335                    reason: "duplicate bucket-type name".to_string(),
1336                });
1337            }
1338            ConsistencyLevel::parse("read_consistency", &bt.read_consistency)?;
1339            ConsistencyLevel::parse("write_consistency", &bt.write_consistency)?;
1340        }
1341        if let Some(name) = &self.default_bucket_type {
1342            if !self.bucket_types.iter().any(|bt| &bt.name == name) {
1343                return Err(ConfError::BadServer {
1344                    field: "default_bucket_type",
1345                    value: name.clone(),
1346                    reason: "references an undefined bucket-type name".to_string(),
1347                });
1348            }
1349        }
1350        Ok(())
1351    }
1352
1353    /// Validate the cross-field invariants of the Noxu
1354    /// datastore selection.
1355    ///
1356    /// Selecting `data_store: noxu` is permitted only when the
1357    /// binary was built with `--features riak`; without it,
1358    /// `dynomited` cannot construct a `NoxuDatastore` because
1359    /// the `dyniak` crate (which owns the type) is not
1360    /// linked. The check is gated on a `cfg!(feature = ...)`
1361    /// expression that the parent crate threads through via
1362    /// the [`crate::conf::set_noxu_supported`] toggle: the
1363    /// engine ships with the toggle off, the `dynomited` binary
1364    /// turns it on under `--features riak`. The toggle is
1365    /// global because `data_store: noxu` is a build-time
1366    /// configuration constraint, not a per-pool one.
1367    ///
1368    /// `noxu_path:` must be set and non-empty.
1369    fn validate_noxu(&self) -> Result<(), ConfError> {
1370        if !crate::conf::is_noxu_supported() {
1371            return Err(ConfError::BadNoxuConfig(
1372                "noxu data_store requires dynomited built with --features riak",
1373            ));
1374        }
1375        match self.noxu_path.as_deref() {
1376            Some(p) if !p.as_os_str().is_empty() => Ok(()),
1377            _ => Err(ConfError::BadNoxuConfig(
1378                "data_store: noxu requires a non-empty 'noxu_path:' directive",
1379            )),
1380        }
1381    }
1382
1383    fn validate_hinted_handoff(&self) -> Result<(), ConfError> {
1384        if self.enable_hinted_handoff != Some(true) {
1385            return Ok(());
1386        }
1387        if let Some(ttl) = self.hint_ttl_seconds {
1388            if ttl == 0 {
1389                return Err(ConfError::BadServer {
1390                    field: "hint_ttl_seconds",
1391                    value: ttl.to_string(),
1392                    reason: "must be a positive number when enable_hinted_handoff is true"
1393                        .to_string(),
1394                });
1395            }
1396        }
1397        if let Some(cap) = self.hint_store_max_bytes {
1398            if cap == 0 {
1399                return Err(ConfError::BadServer {
1400                    field: "hint_store_max_bytes",
1401                    value: cap.to_string(),
1402                    reason: "must be a positive number when enable_hinted_handoff is true"
1403                        .to_string(),
1404                });
1405            }
1406        }
1407        if let Some(period) = self.hint_drain_interval_ms {
1408            if period == 0 {
1409                return Err(ConfError::BadServer {
1410                    field: "hint_drain_interval_ms",
1411                    value: period.to_string(),
1412                    reason: "must be a positive number when enable_hinted_handoff is true"
1413                        .to_string(),
1414                });
1415            }
1416        }
1417        Ok(())
1418    }
1419
1420    /// Cross-check the peer-plane TLS knobs.
1421    ///
1422    /// `peer_tls_cert` and `peer_tls_key` must both be set or
1423    /// both be unset. `peer_tls_ca` is independent (it controls
1424    /// optional mutual TLS) but only meaningful when the cert /
1425    /// key pair is set. Each per-DC profile in
1426    /// `peer_tls_profiles` is validated by
1427    /// [`ConfTlsProfile::validate`]; the per-DC profile names
1428    /// must be non-empty.
1429    fn validate_peer_tls(&self) -> Result<(), ConfError> {
1430        validate_tls_pair(
1431            "peer_tls_cert",
1432            "peer_tls_key",
1433            self.peer_tls_cert.as_deref(),
1434            self.peer_tls_key.as_deref(),
1435        )?;
1436        if self.peer_tls_ca.is_some() && self.peer_tls_cert.is_none() {
1437            return Err(ConfError::BadServer {
1438                field: "peer_tls_ca",
1439                value: self
1440                    .peer_tls_ca
1441                    .as_ref()
1442                    .map_or_else(String::new, |p| p.display().to_string()),
1443                reason: "requires peer_tls_cert and peer_tls_key to also be set".into(),
1444            });
1445        }
1446        for (dc, profile) in &self.peer_tls_profiles {
1447            if dc.is_empty() {
1448                return Err(ConfError::BadServer {
1449                    field: "peer_tls_profiles",
1450                    value: String::new(),
1451                    reason: "per-DC TLS profile name must not be empty".into(),
1452                });
1453            }
1454            profile.validate(dc)?;
1455        }
1456        Ok(())
1457    }
1458}
1459
1460impl fmt::Display for ConfPool {
1461    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1462        // We render the pool body by re-serializing through serde_yaml
1463        // so the round-trip is well defined; this is used by `test_conf`
1464        // and rustdoc examples.
1465        match serde_yaml::to_string(self) {
1466            Ok(s) => f.write_str(&s),
1467            Err(_) => Err(fmt::Error),
1468        }
1469    }
1470}
1471
1472fn check_positive(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
1473    if let Some(n) = v {
1474        if n <= 0 {
1475            return Err(ConfError::OutOfRange {
1476                field,
1477                value: n,
1478                reason: "must be a positive number",
1479            });
1480        }
1481    }
1482    Ok(())
1483}
1484
1485fn check_non_negative(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
1486    if let Some(n) = v {
1487        if n < 0 {
1488            return Err(ConfError::OutOfRange {
1489                field,
1490                value: n,
1491                reason: "must be a non-negative number",
1492            });
1493        }
1494    }
1495    Ok(())
1496}
1497
1498/// Custom deserializer for `data_store:` that accepts either the
1499/// historical integer form (`0`, `1`, `2`) or the textual form
1500/// (`redis`, `memcache`, `noxu`). Both shapes normalise to the
1501/// integer code that the rest of the engine consumes.
1502fn deserialize_data_store<'de, D>(de: D) -> Result<Option<i64>, D::Error>
1503where
1504    D: serde::Deserializer<'de>,
1505{
1506    use serde::de::{self, Visitor};
1507    use std::fmt;
1508
1509    struct V;
1510    impl<'de> Visitor<'de> for V {
1511        type Value = Option<i64>;
1512
1513        fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1514            f.write_str("a data_store value: integer (0, 1, 2) or string (redis, memcache, noxu)")
1515        }
1516
1517        fn visit_none<E: de::Error>(self) -> Result<Self::Value, E> {
1518            Ok(None)
1519        }
1520
1521        fn visit_unit<E: de::Error>(self) -> Result<Self::Value, E> {
1522            Ok(None)
1523        }
1524
1525        fn visit_some<D2: serde::Deserializer<'de>>(
1526            self,
1527            de: D2,
1528        ) -> Result<Self::Value, D2::Error> {
1529            de.deserialize_any(V)
1530        }
1531
1532        fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
1533            DataStore::from_int(v)
1534                .map(|d| Some(d.as_int()))
1535                .map_err(|e| E::custom(e.to_string()))
1536        }
1537
1538        fn visit_u64<E: de::Error>(self, v: u64) -> Result<Self::Value, E> {
1539            let n = i64::try_from(v).map_err(|_| E::custom("data_store integer overflow"))?;
1540            self.visit_i64(n)
1541        }
1542
1543        fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
1544            DataStore::from_name(v)
1545                .map(|d| Some(d.as_int()))
1546                .map_err(|_| {
1547                    E::custom(format!(
1548                        "data_store: unknown name '{v}'; expected one of: redis, memcache, noxu"
1549                    ))
1550                })
1551        }
1552
1553        fn visit_string<E: de::Error>(self, v: String) -> Result<Self::Value, E> {
1554            self.visit_str(&v)
1555        }
1556    }
1557
1558    de.deserialize_any(V)
1559}
1560
1561#[cfg(test)]
1562mod tests {
1563    use super::*;
1564
1565    fn pool() -> ConfPool {
1566        ConfPool {
1567            listen: Some(ConfListen::parse("listen", "127.0.0.1:8102").unwrap()),
1568            servers: Some(Servers::from_vec(vec![ConfServer::parse(
1569                "127.0.0.1:6379:1",
1570            )
1571            .unwrap()])),
1572            tokens: Some(TokenList::parse("0").unwrap()),
1573            ..ConfPool::default()
1574        }
1575    }
1576
1577    #[test]
1578    fn validate_minimal_post_finalize() {
1579        let mut p = pool();
1580        p.apply_defaults();
1581        p.validate("dyn_o_mite").unwrap();
1582    }
1583
1584    #[test]
1585    fn missing_listen_rejected() {
1586        let mut p = pool();
1587        p.listen = None;
1588        p.apply_defaults();
1589        assert!(matches!(
1590            p.validate("p"),
1591            Err(ConfError::MissingRequired("listen"))
1592        ));
1593    }
1594
1595    #[test]
1596    fn out_of_range_mbuf_rejected() {
1597        let mut p = pool();
1598        p.mbuf_size = Some(127);
1599        p.apply_defaults();
1600        assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
1601    }
1602
1603    #[test]
1604    fn distribution_field_round_trips_through_yaml() {
1605        let yaml = r"
1606p:
1607  listen: 127.0.0.1:8102
1608  dyn_listen: 127.0.0.1:8101
1609  tokens: '0'
1610  servers:
1611  - 127.0.0.1:6379:1
1612  data_store: 0
1613  distribution: random_slicing
1614  distribution_shadow: vnode
1615  hash: murmur3_x64_64
1616";
1617        let parsed: std::collections::BTreeMap<String, ConfPool> =
1618            serde_yaml::from_str(yaml).unwrap();
1619        let pool = parsed.get("p").unwrap();
1620        assert_eq!(pool.distribution, Some(Distribution::RandomSlicing));
1621        assert_eq!(pool.distribution_shadow, Some(Distribution::Vnode));
1622        assert_eq!(pool.hash, Some(HashType::Murmur3X64_64));
1623        assert_eq!(pool.resolved_distribution(), Distribution::RandomSlicing);
1624    }
1625
1626    #[test]
1627    fn distribution_legacy_alias_resolves_to_vnode() {
1628        let mut p = pool();
1629        p.distribution = Some(Distribution::Ketama);
1630        assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1631        p.distribution = Some(Distribution::Modula);
1632        assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1633        p.distribution = Some(Distribution::Random);
1634        assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1635    }
1636
1637    #[test]
1638    fn distribution_default_unset_is_vnode() {
1639        let p = pool();
1640        assert!(p.distribution.is_none());
1641        assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1642    }
1643
1644    #[test]
1645    fn mbuf_size_not_multiple_of_16_rejected() {
1646        let mut p = pool();
1647        p.mbuf_size = Some(513);
1648        p.apply_defaults();
1649        assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
1650    }
1651
1652    #[test]
1653    fn pem_required_when_secure() {
1654        let mut p = pool();
1655        p.secure_server_option = Some("datacenter".to_string());
1656        p.pem_key_file = Some(String::new());
1657        p.apply_defaults();
1658        // apply_defaults restores pem_key_file because it's `Some("")`,
1659        // which is non-None; so we expect MissingRequired("pem_key_file").
1660        assert!(matches!(
1661            p.validate("p"),
1662            Err(ConfError::MissingRequired("pem_key_file"))
1663        ));
1664    }
1665
1666    #[test]
1667    fn data_store_out_of_range_rejected() {
1668        let mut p = pool();
1669        p.data_store = Some(7);
1670        p.apply_defaults();
1671        assert!(matches!(p.validate("p"), Err(ConfError::BadDataStore(7))));
1672    }
1673
1674    /// Lock serialising tests that mutate the process-wide
1675    /// `NOXU_SUPPORTED` flag. cargo test runs tests on multiple
1676    /// threads; without serialisation a parallel test can flip
1677    /// the flag back before the assertion runs.
1678    static NOXU_FLAG_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
1679
1680    #[test]
1681    fn data_store_noxu_requires_riak_feature() {
1682        let _g = NOXU_FLAG_LOCK
1683            .lock()
1684            .unwrap_or_else(std::sync::PoisonError::into_inner);
1685        // Default state: noxu support flag is off; selecting
1686        // noxu must be rejected with the documented message.
1687        let prev = crate::conf::is_noxu_supported();
1688        crate::conf::set_noxu_supported(false);
1689        let mut p = pool();
1690        p.data_store = Some(2);
1691        p.noxu_path = Some("/tmp/test".into());
1692        p.apply_defaults();
1693        let err = p.validate("p");
1694        crate::conf::set_noxu_supported(prev);
1695        match err {
1696            Err(ConfError::BadNoxuConfig(msg)) => {
1697                assert!(msg.contains("--features riak"), "unexpected message: {msg}");
1698            }
1699            other => panic!("expected BadNoxuConfig, got {other:?}"),
1700        }
1701    }
1702
1703    #[test]
1704    fn data_store_noxu_requires_path() {
1705        let _g = NOXU_FLAG_LOCK
1706            .lock()
1707            .unwrap_or_else(std::sync::PoisonError::into_inner);
1708        let prev = crate::conf::is_noxu_supported();
1709        crate::conf::set_noxu_supported(true);
1710        let mut p = pool();
1711        p.data_store = Some(2);
1712        p.noxu_path = None;
1713        p.apply_defaults();
1714        let err = p.validate("p");
1715        crate::conf::set_noxu_supported(prev);
1716        match err {
1717            Err(ConfError::BadNoxuConfig(msg)) => {
1718                assert!(msg.contains("noxu_path"), "unexpected message: {msg}");
1719            }
1720            other => panic!("expected BadNoxuConfig, got {other:?}"),
1721        }
1722    }
1723
1724    #[test]
1725    fn data_store_noxu_yaml_round_trip_string_form() {
1726        // String form `data_store: noxu` and integer form
1727        // `data_store: 2` both normalise to integer 2 on parse.
1728        let yaml = r"
1729listen: 127.0.0.1:8102
1730servers:
1731- 127.0.0.1:6379:1
1732tokens: '0'
1733data_store: noxu
1734noxu_path: /tmp/test
1735";
1736        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1737        assert_eq!(p.data_store, Some(2));
1738        assert_eq!(
1739            p.noxu_path.as_deref(),
1740            Some(std::path::Path::new("/tmp/test"))
1741        );
1742        // Re-emit and re-parse: round-trip is stable.
1743        let dumped = serde_yaml::to_string(&p).unwrap();
1744        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1745        assert_eq!(p2.data_store, p.data_store);
1746        assert_eq!(p2.noxu_path, p.noxu_path);
1747    }
1748
1749    #[test]
1750    fn data_store_yaml_int_form_still_works() {
1751        let yaml = r"
1752listen: 127.0.0.1:8102
1753servers:
1754- 127.0.0.1:6379:1
1755tokens: '0'
1756data_store: 2
1757noxu_path: /tmp/test
1758";
1759        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1760        assert_eq!(p.data_store, Some(2));
1761    }
1762
1763    #[test]
1764    fn data_store_string_form_unknown_rejected() {
1765        let yaml = r"
1766listen: 127.0.0.1:8102
1767servers:
1768- 127.0.0.1:6379:1
1769tokens: '0'
1770data_store: postgres
1771";
1772        let err = serde_yaml::from_str::<ConfPool>(yaml).unwrap_err();
1773        let msg = err.to_string();
1774        assert!(
1775            msg.contains("unknown name") || msg.contains("data_store"),
1776            "unexpected message: {msg}"
1777        );
1778    }
1779
1780    #[test]
1781    fn hash_tag_must_be_two_chars() {
1782        let mut p = pool();
1783        p.hash_tag = Some("abc".to_string());
1784        p.apply_defaults();
1785        assert!(matches!(p.validate("p"), Err(ConfError::BadHashTag(_))));
1786    }
1787
1788    #[test]
1789    fn empty_servers_rejected() {
1790        let mut p = pool();
1791        p.servers = Some(Servers::from_vec(vec![]));
1792        p.apply_defaults();
1793        assert!(matches!(
1794            p.validate("p"),
1795            Err(ConfError::MissingRequired("servers"))
1796        ));
1797    }
1798
1799    #[test]
1800    fn log_format_known_values_accepted() {
1801        for value in ["default", "rfc5424", "rfc3164", "json", "ndjson", "DEFAULT"] {
1802            let mut p = pool();
1803            p.log_format = Some(value.to_string());
1804            p.apply_defaults();
1805            assert!(p.validate("p").is_ok(), "value {value:?} should validate");
1806        }
1807    }
1808
1809    #[test]
1810    fn log_format_unknown_rejected() {
1811        let mut p = pool();
1812        p.log_format = Some("yaml".to_string());
1813        p.apply_defaults();
1814        let err = p.validate("p").unwrap_err();
1815        assert!(
1816            matches!(
1817                err,
1818                ConfError::BadServer {
1819                    field: "log_format",
1820                    ..
1821                }
1822            ),
1823            "unexpected error: {err:?}"
1824        );
1825    }
1826
1827    #[test]
1828    fn observability_block_round_trips() {
1829        let yaml = r"
1830observability:
1831  otlp_logs_endpoint: http://collector:4317
1832  service_name: dynomited
1833listen: 127.0.0.1:8102
1834servers:
1835- 127.0.0.1:6379:1
1836tokens: '0'
1837";
1838        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1839        let obs = p.observability.as_ref().expect("observability set");
1840        assert_eq!(
1841            obs.otlp_logs_endpoint.as_deref(),
1842            Some("http://collector:4317")
1843        );
1844        assert_eq!(obs.service_name.as_deref(), Some("dynomited"));
1845    }
1846
1847    #[test]
1848    fn bucket_types_round_trip() {
1849        let yaml = r"
1850listen: 127.0.0.1:8102
1851servers:
1852- 127.0.0.1:6379:1
1853tokens: '0'
1854bucket_types:
1855- name: hot
1856  read_consistency: DC_QUORUM
1857  write_consistency: DC_EACH_SAFE_QUORUM
1858  n_val: 3
1859- name: cold
1860  read_consistency: DC_ONE
1861  write_consistency: DC_ONE
1862  n_val: 1
1863default_bucket_type: cold
1864";
1865        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1866        assert_eq!(p.bucket_types.len(), 2);
1867        assert_eq!(p.bucket_types[0].name, "hot");
1868        assert_eq!(p.bucket_types[0].n_val, 3);
1869        assert_eq!(
1870            p.bucket_types[0].read_level().unwrap(),
1871            crate::conf::ConsistencyLevel::DcQuorum,
1872        );
1873        assert_eq!(p.default_bucket_type.as_deref(), Some("cold"));
1874        // Re-emit and re-parse: round-trip preserves the data.
1875        let dumped = serde_yaml::to_string(&p).unwrap();
1876        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1877        assert_eq!(p2.bucket_types, p.bucket_types);
1878        assert_eq!(p2.default_bucket_type, p.default_bucket_type);
1879    }
1880
1881    #[test]
1882    fn bucket_types_default_is_empty() {
1883        let mut p = pool();
1884        p.apply_defaults();
1885        assert!(p.bucket_types.is_empty());
1886        assert!(p.default_bucket_type.is_none());
1887        assert!(p.validate("p").is_ok());
1888    }
1889
1890    #[test]
1891    fn duplicate_bucket_type_name_rejected() {
1892        let mut p = pool();
1893        p.bucket_types = vec![
1894            ConfBucketType {
1895                name: "a".into(),
1896                read_consistency: "DC_ONE".into(),
1897                write_consistency: "DC_ONE".into(),
1898                n_val: 0,
1899            },
1900            ConfBucketType {
1901                name: "a".into(),
1902                read_consistency: "DC_ONE".into(),
1903                write_consistency: "DC_ONE".into(),
1904                n_val: 0,
1905            },
1906        ];
1907        p.apply_defaults();
1908        let err = p.validate("p").unwrap_err();
1909        assert!(
1910            matches!(
1911                err,
1912                ConfError::BadServer {
1913                    field: "bucket_types",
1914                    ..
1915                }
1916            ),
1917            "unexpected error: {err:?}",
1918        );
1919    }
1920
1921    #[test]
1922    fn bucket_type_unknown_consistency_rejected() {
1923        let mut p = pool();
1924        p.bucket_types = vec![ConfBucketType {
1925            name: "a".into(),
1926            read_consistency: "DC_PURPLE".into(),
1927            write_consistency: "DC_ONE".into(),
1928            n_val: 0,
1929        }];
1930        p.apply_defaults();
1931        let err = p.validate("p").unwrap_err();
1932        assert!(matches!(err, ConfError::BadConsistency { .. }));
1933    }
1934
1935    #[test]
1936    fn unknown_default_bucket_type_rejected() {
1937        let mut p = pool();
1938        p.default_bucket_type = Some("missing".into());
1939        p.apply_defaults();
1940        let err = p.validate("p").unwrap_err();
1941        assert!(matches!(
1942            err,
1943            ConfError::BadServer {
1944                field: "default_bucket_type",
1945                ..
1946            }
1947        ));
1948    }
1949
1950    #[test]
1951    fn hinted_handoff_default_off_with_canonical_constants() {
1952        let mut p = pool();
1953        p.apply_defaults();
1954        assert_eq!(p.enable_hinted_handoff, Some(false));
1955        assert_eq!(p.hint_ttl_seconds, Some(defaults::HINT_TTL_SECONDS));
1956        assert_eq!(p.hint_store_max_bytes, Some(defaults::HINT_STORE_MAX_BYTES));
1957        assert_eq!(
1958            p.hint_drain_interval_ms,
1959            Some(defaults::HINT_DRAIN_INTERVAL_MS)
1960        );
1961        assert!(p.validate("p").is_ok());
1962    }
1963
1964    #[test]
1965    fn hinted_handoff_yaml_round_trip() {
1966        let yaml = r"
1967listen: 127.0.0.1:8102
1968servers:
1969- 127.0.0.1:6379:1
1970tokens: '0'
1971enable_hinted_handoff: true
1972hint_ttl_seconds: 7200
1973hint_store_max_bytes: 8388608
1974hint_drain_interval_ms: 5000
1975";
1976        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1977        assert_eq!(p.enable_hinted_handoff, Some(true));
1978        assert_eq!(p.hint_ttl_seconds, Some(7200));
1979        assert_eq!(p.hint_store_max_bytes, Some(8_388_608));
1980        assert_eq!(p.hint_drain_interval_ms, Some(5_000));
1981        let dumped = serde_yaml::to_string(&p).unwrap();
1982        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1983        assert_eq!(p2.enable_hinted_handoff, p.enable_hinted_handoff);
1984        assert_eq!(p2.hint_ttl_seconds, p.hint_ttl_seconds);
1985        assert_eq!(p2.hint_store_max_bytes, p.hint_store_max_bytes);
1986        assert_eq!(p2.hint_drain_interval_ms, p.hint_drain_interval_ms);
1987    }
1988
1989    #[test]
1990    fn hinted_handoff_zero_ttl_rejected_when_enabled() {
1991        let mut p = pool();
1992        p.enable_hinted_handoff = Some(true);
1993        p.hint_ttl_seconds = Some(0);
1994        p.apply_defaults();
1995        // apply_defaults() does NOT overwrite Some(0) with the
1996        // default; the validator should reject it.
1997        let err = p.validate("p").unwrap_err();
1998        assert!(matches!(
1999            err,
2000            ConfError::BadServer {
2001                field: "hint_ttl_seconds",
2002                ..
2003            }
2004        ));
2005    }
2006
2007    #[test]
2008    fn hinted_handoff_zero_max_bytes_rejected_when_enabled() {
2009        let mut p = pool();
2010        p.enable_hinted_handoff = Some(true);
2011        p.hint_store_max_bytes = Some(0);
2012        p.apply_defaults();
2013        let err = p.validate("p").unwrap_err();
2014        assert!(matches!(
2015            err,
2016            ConfError::BadServer {
2017                field: "hint_store_max_bytes",
2018                ..
2019            }
2020        ));
2021    }
2022
2023    #[test]
2024    fn hinted_handoff_zero_values_ignored_when_disabled() {
2025        // With handoff off, the validator must NOT reject
2026        // out-of-range values: operators may legitimately leave
2027        // them at zero with handoff off.
2028        let mut p = pool();
2029        p.enable_hinted_handoff = Some(false);
2030        p.hint_ttl_seconds = Some(0);
2031        p.hint_store_max_bytes = Some(0);
2032        p.hint_drain_interval_ms = Some(0);
2033        p.apply_defaults();
2034        assert!(p.validate("p").is_ok());
2035    }
2036
2037    #[test]
2038    fn riak_block_validates_when_unset() {
2039        let mut p = pool();
2040        p.riak = Some(ConfRiak::default());
2041        p.apply_defaults();
2042        assert!(p.validate("p").is_ok());
2043    }
2044
2045    #[test]
2046    fn riak_block_validates_with_addresses() {
2047        let mut p = pool();
2048        p.riak = Some(ConfRiak {
2049            pbc_listen: Some("127.0.0.1:8087".into()),
2050            http_listen: Some("127.0.0.1:8098".into()),
2051            ..ConfRiak::default()
2052        });
2053        p.apply_defaults();
2054        assert!(p.validate("p").is_ok());
2055    }
2056
2057    #[test]
2058    fn riak_block_rejects_bad_pbc_addr() {
2059        let mut p = pool();
2060        p.riak = Some(ConfRiak {
2061            pbc_listen: Some(String::new()),
2062            ..ConfRiak::default()
2063        });
2064        p.apply_defaults();
2065        assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
2066    }
2067
2068    #[test]
2069    fn riak_block_rejects_segment_above_full_sweep() {
2070        let mut p = pool();
2071        p.riak = Some(ConfRiak {
2072            aae_segment_interval_seconds: Some(120),
2073            aae_full_sweep_interval_seconds: Some(60),
2074            ..ConfRiak::default()
2075        });
2076        p.apply_defaults();
2077        assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
2078    }
2079
2080    #[test]
2081    fn riak_block_round_trips_through_yaml() {
2082        let yaml = r"
2083p:
2084  listen: 127.0.0.1:1
2085  dyn_listen: 127.0.0.1:2
2086  tokens: '0'
2087  servers:
2088  - 127.0.0.1:3:1
2089  data_store: 0
2090  riak:
2091    pbc_listen: 127.0.0.1:8087
2092    http_listen: 127.0.0.1:8098
2093    aae_enabled: true
2094    aae_full_sweep_interval_seconds: 3600
2095    aae_segment_interval_seconds: 30
2096";
2097        let cfg: std::collections::BTreeMap<String, ConfPool> = serde_yaml::from_str(yaml).unwrap();
2098        let p = cfg.get("p").unwrap();
2099        let r = p.riak.as_ref().unwrap();
2100        assert_eq!(r.pbc_listen.as_deref(), Some("127.0.0.1:8087"));
2101        assert_eq!(r.http_listen.as_deref(), Some("127.0.0.1:8098"));
2102        assert_eq!(r.aae_enabled, Some(true));
2103        assert_eq!(r.aae_full_sweep_interval_seconds, Some(3600));
2104        assert_eq!(r.aae_segment_interval_seconds, Some(30));
2105    }
2106
2107    #[test]
2108    fn peer_tls_pair_unset_is_ok() {
2109        let mut p = pool();
2110        p.apply_defaults();
2111        assert!(p.validate("p").is_ok(), "plaintext default must validate");
2112    }
2113
2114    #[test]
2115    fn peer_tls_pair_both_set_is_ok() {
2116        let mut p = pool();
2117        p.peer_tls_cert = Some(std::path::PathBuf::from("/etc/dynomite/peer.crt"));
2118        p.peer_tls_key = Some(std::path::PathBuf::from("/etc/dynomite/peer.key"));
2119        p.apply_defaults();
2120        assert!(p.validate("p").is_ok());
2121    }
2122
2123    #[test]
2124    fn peer_tls_cert_without_key_rejected() {
2125        let mut p = pool();
2126        p.peer_tls_cert = Some(std::path::PathBuf::from("/x.crt"));
2127        p.apply_defaults();
2128        let err = p.validate("p").unwrap_err();
2129        assert!(
2130            matches!(
2131                err,
2132                ConfError::BadServer {
2133                    field: "peer_tls_cert",
2134                    ..
2135                }
2136            ),
2137            "got {err:?}"
2138        );
2139    }
2140
2141    #[test]
2142    fn peer_tls_key_without_cert_rejected() {
2143        let mut p = pool();
2144        p.peer_tls_key = Some(std::path::PathBuf::from("/x.key"));
2145        p.apply_defaults();
2146        let err = p.validate("p").unwrap_err();
2147        assert!(
2148            matches!(
2149                err,
2150                ConfError::BadServer {
2151                    field: "peer_tls_key",
2152                    ..
2153                }
2154            ),
2155            "got {err:?}"
2156        );
2157    }
2158
2159    #[test]
2160    fn peer_tls_ca_without_cert_rejected() {
2161        let mut p = pool();
2162        p.peer_tls_ca = Some(std::path::PathBuf::from("/x.ca"));
2163        p.apply_defaults();
2164        let err = p.validate("p").unwrap_err();
2165        assert!(
2166            matches!(
2167                err,
2168                ConfError::BadServer {
2169                    field: "peer_tls_ca",
2170                    ..
2171                }
2172            ),
2173            "got {err:?}"
2174        );
2175    }
2176
2177    #[test]
2178    fn riak_tls_cert_without_key_rejected() {
2179        let mut p = pool();
2180        p.riak = Some(ConfRiak {
2181            pbc_listen: Some("127.0.0.1:8087".into()),
2182            tls_cert: Some(std::path::PathBuf::from("/x.crt")),
2183            ..ConfRiak::default()
2184        });
2185        p.apply_defaults();
2186        let err = p.validate("p").unwrap_err();
2187        assert!(
2188            matches!(
2189                err,
2190                ConfError::BadServer {
2191                    field: "tls_cert",
2192                    ..
2193                }
2194            ),
2195            "got {err:?}"
2196        );
2197    }
2198
2199    #[test]
2200    fn riak_tls_pair_both_set_is_ok() {
2201        let mut p = pool();
2202        p.riak = Some(ConfRiak {
2203            pbc_listen: Some("127.0.0.1:8087".into()),
2204            tls_cert: Some(std::path::PathBuf::from("/x.crt")),
2205            tls_key: Some(std::path::PathBuf::from("/x.key")),
2206            ..ConfRiak::default()
2207        });
2208        p.apply_defaults();
2209        assert!(p.validate("p").is_ok());
2210    }
2211
2212    #[test]
2213    fn riak_wasm_modules_yaml_round_trip() {
2214        let dir = tempfile::tempdir().unwrap();
2215        let m1 = dir.path().join("identity.wasm");
2216        let m2 = dir.path().join("sum.wasm");
2217        std::fs::write(&m1, b"\0asm\x01\0\0\0").unwrap();
2218        std::fs::write(&m2, b"\0asm\x01\0\0\0").unwrap();
2219        let yaml = format!(
2220            r"
2221listen: 127.0.0.1:8102
2222servers:
2223- 127.0.0.1:6379:1
2224tokens: '0'
2225riak:
2226  pbc_listen: 127.0.0.1:8087
2227  wasm_modules:
2228  - id: identity
2229    path: {m1}
2230  - id: sum
2231    path: {m2}
2232",
2233            m1 = m1.display(),
2234            m2 = m2.display(),
2235        );
2236        let p: ConfPool = serde_yaml::from_str(&yaml).unwrap();
2237        let r = p.riak.as_ref().unwrap();
2238        let mods = r.wasm_modules.as_ref().unwrap();
2239        assert_eq!(mods.len(), 2);
2240        assert_eq!(mods[0].id, "identity");
2241        assert_eq!(mods[0].path, m1);
2242        assert_eq!(mods[1].id, "sum");
2243        assert_eq!(mods[1].path, m2);
2244        // Round-trip back to YAML and re-parse.
2245        let dumped = serde_yaml::to_string(&p).unwrap();
2246        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2247        assert_eq!(p2.riak.unwrap().wasm_modules, r.wasm_modules);
2248    }
2249
2250    #[test]
2251    fn riak_wasm_modules_unique_ids_required() {
2252        let dir = tempfile::tempdir().unwrap();
2253        let path = dir.path().join("m.wasm");
2254        std::fs::write(&path, b"\0").unwrap();
2255        let r = ConfRiak {
2256            wasm_modules: Some(vec![
2257                ConfRiakWasmModule {
2258                    id: "m".into(),
2259                    path: path.clone(),
2260                },
2261                ConfRiakWasmModule {
2262                    id: "m".into(),
2263                    path: path.clone(),
2264                },
2265            ]),
2266            ..ConfRiak::default()
2267        };
2268        let err = r.validate().unwrap_err();
2269        assert!(matches!(
2270            err,
2271            ConfError::BadServer {
2272                field: "wasm_modules.id",
2273                ..
2274            }
2275        ));
2276    }
2277
2278    #[test]
2279    fn riak_wasm_modules_path_must_exist() {
2280        let r = ConfRiak {
2281            wasm_modules: Some(vec![ConfRiakWasmModule {
2282                id: "missing".into(),
2283                path: std::path::PathBuf::from("/no/such/path/at/all.wasm"),
2284            }]),
2285            ..ConfRiak::default()
2286        };
2287        let err = r.validate().unwrap_err();
2288        assert!(matches!(
2289            err,
2290            ConfError::BadServer {
2291                field: "wasm_modules.path",
2292                ..
2293            }
2294        ));
2295    }
2296
2297    #[test]
2298    fn riak_wasm_modules_empty_id_rejected() {
2299        let dir = tempfile::tempdir().unwrap();
2300        let path = dir.path().join("m.wasm");
2301        std::fs::write(&path, b"\0").unwrap();
2302        let r = ConfRiak {
2303            wasm_modules: Some(vec![ConfRiakWasmModule {
2304                id: String::new(),
2305                path,
2306            }]),
2307            ..ConfRiak::default()
2308        };
2309        let err = r.validate().unwrap_err();
2310        assert!(matches!(
2311            err,
2312            ConfError::BadServer {
2313                field: "wasm_modules.id",
2314                ..
2315            }
2316        ));
2317    }
2318
2319    #[test]
2320    fn peer_tls_profile_pair_unset_is_ok() {
2321        let p = ConfTlsProfile::default();
2322        assert!(p.validate("dc1").is_ok());
2323    }
2324
2325    #[test]
2326    fn peer_tls_profile_cert_without_key_rejected() {
2327        let p = ConfTlsProfile {
2328            cert: Some(std::path::PathBuf::from("/x.crt")),
2329            ..ConfTlsProfile::default()
2330        };
2331        let err = p.validate("dc1").unwrap_err();
2332        assert!(matches!(
2333            err,
2334            ConfError::BadServer {
2335                field: "peer_tls_profiles.cert",
2336                ..
2337            }
2338        ));
2339    }
2340
2341    #[test]
2342    fn peer_tls_profile_key_without_cert_rejected() {
2343        let p = ConfTlsProfile {
2344            key: Some(std::path::PathBuf::from("/x.key")),
2345            ..ConfTlsProfile::default()
2346        };
2347        let err = p.validate("dc1").unwrap_err();
2348        assert!(matches!(
2349            err,
2350            ConfError::BadServer {
2351                field: "peer_tls_profiles.key",
2352                ..
2353            }
2354        ));
2355    }
2356
2357    #[test]
2358    fn peer_tls_profile_ca_without_cert_rejected() {
2359        let p = ConfTlsProfile {
2360            ca: Some(std::path::PathBuf::from("/x.ca")),
2361            ..ConfTlsProfile::default()
2362        };
2363        let err = p.validate("dc1").unwrap_err();
2364        assert!(matches!(
2365            err,
2366            ConfError::BadServer {
2367                field: "peer_tls_profiles.ca",
2368                ..
2369            }
2370        ));
2371    }
2372
2373    #[test]
2374    fn peer_tls_profiles_empty_dc_name_rejected() {
2375        let mut p = pool();
2376        p.peer_tls_profiles.insert(
2377            String::new(),
2378            ConfTlsProfile {
2379                cert: Some(std::path::PathBuf::from("/x.crt")),
2380                key: Some(std::path::PathBuf::from("/x.key")),
2381                ca: None,
2382            },
2383        );
2384        p.apply_defaults();
2385        let err = p.validate("p").unwrap_err();
2386        assert!(matches!(
2387            err,
2388            ConfError::BadServer {
2389                field: "peer_tls_profiles",
2390                ..
2391            }
2392        ));
2393    }
2394
2395    #[test]
2396    fn peer_tls_profiles_per_dc_pair_validates() {
2397        let mut p = pool();
2398        p.peer_tls_profiles.insert(
2399            "dc1".into(),
2400            ConfTlsProfile {
2401                cert: Some(std::path::PathBuf::from("/dc1.crt")),
2402                key: Some(std::path::PathBuf::from("/dc1.key")),
2403                ca: None,
2404            },
2405        );
2406        p.apply_defaults();
2407        assert!(p.validate("p").is_ok());
2408    }
2409
2410    #[test]
2411    fn peer_tls_profiles_per_dc_cert_without_key_rejected() {
2412        let mut p = pool();
2413        p.peer_tls_profiles.insert(
2414            "dc1".into(),
2415            ConfTlsProfile {
2416                cert: Some(std::path::PathBuf::from("/dc1.crt")),
2417                key: None,
2418                ca: None,
2419            },
2420        );
2421        p.apply_defaults();
2422        let err = p.validate("p").unwrap_err();
2423        assert!(matches!(
2424            err,
2425            ConfError::BadServer {
2426                field: "peer_tls_profiles.cert",
2427                ..
2428            }
2429        ));
2430    }
2431
2432    #[test]
2433    fn peer_tls_profiles_yaml_round_trip() {
2434        let yaml = r"
2435listen: 127.0.0.1:8102
2436servers:
2437- 127.0.0.1:6379:1
2438tokens: '0'
2439peer_tls_profiles:
2440  dc1:
2441    cert: /etc/dynomite/dc1.pem
2442    key: /etc/dynomite/dc1.key
2443    ca: /etc/dynomite/dc1-ca.pem
2444  dc2:
2445    cert: /etc/dynomite/dc2.pem
2446    key: /etc/dynomite/dc2.key
2447";
2448        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
2449        assert_eq!(p.peer_tls_profiles.len(), 2);
2450        assert_eq!(
2451            p.peer_tls_profiles["dc1"].cert.as_deref(),
2452            Some(std::path::Path::new("/etc/dynomite/dc1.pem"))
2453        );
2454        assert!(p.peer_tls_profiles["dc2"].ca.is_none());
2455        let dumped = serde_yaml::to_string(&p).unwrap();
2456        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2457        assert_eq!(p2.peer_tls_profiles, p.peer_tls_profiles);
2458    }
2459}