Skip to main content

dynomite/conf/
pool.rs

1//! Pool body schema, default application, and validation.
2//!
3//! The [`ConfPool`] struct mirrors `struct conf_pool` from the C
4//! reference. Every field whose C counterpart starts as
5//! `CONF_UNSET_NUM` / `CONF_UNSET_BOOL` / `CONF_UNSET_HASH` is wrapped
6//! in [`Option`]; [`ConfPool::apply_defaults`] later fills in the
7//! sentinel-driven defaults.
8
9use std::collections::BTreeMap;
10use std::fmt;
11use std::path::PathBuf;
12
13use serde::{Deserialize, Serialize};
14
15use super::endpoint::ConfListen;
16use super::enums::{
17    ConsistencyLevel, DataStore, Distribution, HashType, SecureServerOption, Transport,
18};
19use super::error::ConfError;
20use super::server::{ConfDynSeed, ConfServer};
21use super::tokens::TokenList;
22
23/// Default configuration constants. Mirrors the `CONF_DEFAULT_*` macros
24/// in the C reference.
25pub mod defaults {
26    /// Default request timeout in milliseconds.
27    pub const TIMEOUT_MS: i64 = 5_000;
28    /// Default `listen()` backlog.
29    pub const LISTEN_BACKLOG: i64 = 512;
30    /// Default `client_connections:` value (0 = unlimited).
31    pub const CLIENT_CONNECTIONS: i64 = 0;
32    /// Default `data_store:` value (0 = redis).
33    pub const DATA_STORE: i64 = 0;
34    /// Default `preconnect:` value.
35    ///
36    /// The C reference defaults `preconnect` to false: clients
37    /// can connect to dynomited before the local datastore is
38    /// reachable. The lazy connect avoids a hard dependency on
39    /// boot ordering. We match that default here.
40    pub const PRECONNECT: bool = false;
41    /// Default `auto_eject_hosts:` value.
42    pub const AUTO_EJECT_HOSTS: bool = true;
43    /// Default `server_retry_timeout:` (ms).
44    pub const SERVER_RETRY_TIMEOUT_MS: i64 = 10 * 1000;
45    /// Default `server_failure_limit:`.
46    pub const SERVER_FAILURE_LIMIT: i64 = 3;
47    /// Default `dyn_read_timeout:` (ms).
48    pub const DYN_READ_TIMEOUT_MS: i64 = 10_000;
49    /// Default `dyn_write_timeout:` (ms).
50    pub const DYN_WRITE_TIMEOUT_MS: i64 = 10_000;
51    /// Default `dyn_connections:`.
52    pub const DYN_CONNECTIONS: i64 = 100;
53    /// Default `gos_interval:` (ms).
54    pub const GOS_INTERVAL_MS: i64 = 30_000;
55    /// Default `enable_hinted_handoff:` value. The feature is
56    /// off by default until operators opt in.
57    pub const ENABLE_HINTED_HANDOFF: bool = false;
58    /// Default `hint_ttl_seconds:` (24h). Hints older than this
59    /// are dropped during expiry sweeps.
60    pub const HINT_TTL_SECONDS: u64 = 86_400;
61    /// Default `hint_store_max_bytes:` (64 MiB) cap on the
62    /// node-local in-memory hint store.
63    pub const HINT_STORE_MAX_BYTES: u64 = 64 * 1024 * 1024;
64    /// Default `hint_drain_interval_ms:` between hint drainer
65    /// sweeps.
66    pub const HINT_DRAIN_INTERVAL_MS: u64 = 30_000;
67    /// Default per-connection message rate.
68    pub const CONN_MSG_RATE: u32 = 50_000;
69    /// Default `stats_interval:` (ms).
70    pub const STATS_INTERVAL_MS: i64 = 30 * 1000;
71    /// Default stats listener address.
72    pub const STATS_PNAME: &str = "0.0.0.0:22222";
73    /// Default datastore-side connection count.
74    pub const DATASTORE_CONNECTIONS: u8 = 1;
75    /// Default local-peer connection count.
76    pub const LOCAL_PEER_CONNECTIONS: u8 = 1;
77    /// Default remote-peer connection count.
78    pub const REMOTE_PEER_CONNECTIONS: u8 = 1;
79    /// Default rack name.
80    pub const RACK: &str = "localrack";
81    /// Default datacenter name.
82    pub const DC: &str = "localdc";
83    /// Default `secure_server_option:` value.
84    pub const SECURE_SERVER_OPTION: &str = "none";
85    /// Default `read_consistency:` / `write_consistency:`.
86    pub const CONSISTENCY: &str = "DC_ONE";
87    /// Default `dyn_seed_provider:`.
88    pub const SEED_PROVIDER: &str = "simple_provider";
89    /// Default `env:` (cloud environment marker).
90    pub const ENV: &str = "aws";
91    /// Default PEM key file path.
92    pub const PEM_KEY_FILE: &str = "conf/dynomite.pem";
93    /// Default reconciliation key file path.
94    pub const RECON_KEY_FILE: &str = "conf/recon_key.pem";
95    /// Default reconciliation IV file path.
96    pub const RECON_IV_FILE: &str = "conf/recon_iv.pem";
97    /// Default cadence (in seconds) of the entropy reconciliation
98    /// run loop. Mirrors the brief's five-minute default; ignored
99    /// when the entropy task is not enabled.
100    pub const RECON_INTERVAL_SECONDS: u64 = 300;
101    /// Smallest valid `mbuf_size:`.
102    pub const MBUF_MIN_SIZE: i64 = 512;
103    /// Largest valid `mbuf_size:`.
104    pub const MBUF_MAX_SIZE: i64 = 512_000;
105    /// Smallest valid `max_msgs:`.
106    pub const ALLOC_MSGS_MIN: i64 = 100_000;
107    /// Largest valid `max_msgs:`.
108    pub const ALLOC_MSGS_MAX: i64 = 1_000_000;
109}
110
111/// Wrapper for the `servers:` field that enforces the invariant
112/// of "exactly one datastore" without losing the YAML list shape.
113///
114/// # Examples
115///
116/// ```
117/// use dynomite::conf::{ConfServer, Servers};
118/// let s = Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()]);
119/// assert_eq!(s.len(), 1);
120/// assert!(!s.is_empty());
121/// assert!(s.datastore().is_some());
122/// ```
123#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
124#[serde(transparent)]
125pub struct Servers(pub(crate) Vec<ConfServer>);
126
127impl Servers {
128    /// Construct from an explicit list. Validation enforces a length
129    /// of one when called via `Config::validate`.
130    ///
131    /// # Examples
132    ///
133    /// ```
134    /// use dynomite::conf::{ConfServer, Servers};
135    /// let s = Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()]);
136    /// assert_eq!(s.len(), 1);
137    /// ```
138    pub fn from_vec(v: Vec<ConfServer>) -> Self {
139        Self(v)
140    }
141}
142
143impl Servers {
144    /// Borrow the entries.
145    ///
146    /// # Examples
147    ///
148    /// ```
149    /// use dynomite::conf::{ConfServer, Servers};
150    /// let s = Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()]);
151    /// assert_eq!(s.entries().len(), 1);
152    /// ```
153    pub fn entries(&self) -> &[ConfServer] {
154        &self.0
155    }
156    /// Number of entries.
157    ///
158    /// # Examples
159    ///
160    /// ```
161    /// use dynomite::conf::Servers;
162    /// assert_eq!(Servers::default().len(), 0);
163    /// ```
164    pub fn len(&self) -> usize {
165        self.0.len()
166    }
167    /// Whether the list is empty.
168    ///
169    /// # Examples
170    ///
171    /// ```
172    /// use dynomite::conf::Servers;
173    /// assert!(Servers::default().is_empty());
174    /// ```
175    pub fn is_empty(&self) -> bool {
176        self.0.is_empty()
177    }
178    /// The single datastore (returns the first entry, if any).
179    ///
180    /// # Examples
181    ///
182    /// ```
183    /// use dynomite::conf::{ConfServer, Servers};
184    /// let s = Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()]);
185    /// assert!(s.datastore().is_some());
186    /// assert!(Servers::default().datastore().is_none());
187    /// ```
188    pub fn datastore(&self) -> Option<&ConfServer> {
189        self.0.first()
190    }
191}
192
193/// Pool configuration body. One per top-level YAML pool name.
194///
195/// # Examples
196///
197/// ```
198/// use dynomite::conf::{ConfPool, ConfListen};
199/// let mut p = ConfPool::default();
200/// assert!(p.listen.is_none());
201/// p.listen = Some(ConfListen::parse("listen", "127.0.0.1:8102").unwrap());
202/// p.apply_defaults();
203/// assert_eq!(p.timeout, Some(5_000));
204/// ```
205#[derive(Debug, Clone, Default, Serialize, Deserialize)]
206#[serde(deny_unknown_fields, default)]
207pub struct ConfPool {
208    /// `listen:` - client-facing listener address.
209    pub listen: Option<ConfListen>,
210    /// `dyn_listen:` - peer-facing listener address.
211    pub dyn_listen: Option<ConfListen>,
212    /// `stats_listen:` - HTTP stats endpoint.
213    pub stats_listen: Option<ConfListen>,
214
215    /// `hash:` - hash function name.
216    pub hash: Option<HashType>,
217    /// `hash_tag:` - two-character delimiter pair.
218    pub hash_tag: Option<String>,
219
220    /// `distribution:` - distribution algorithm. Defaults to
221    /// [`Distribution::Vnode`]. Setting one of the legacy
222    /// `ketama` / `modula` / `random` values is accepted but
223    /// emits a deprecation warning at config-load time and
224    /// collapses to `vnode` at runtime.
225    #[serde(default)]
226    pub distribution: Option<Distribution>,
227    /// `distribution_shadow:` - optional shadow distribution
228    /// computed alongside the live one. When set, the
229    /// dispatcher routes via [`Self::distribution`] but also
230    /// computes the shadow route for every key and bumps a
231    /// counter when the two disagree. Used to validate a
232    /// migration before flipping the live distribution.
233    #[serde(default)]
234    pub distribution_shadow: Option<Distribution>,
235    /// `server_connections:` - deprecated; recorded for warning but ignored.
236    #[serde(default)]
237    pub server_connections: Option<i64>,
238
239    /// `timeout:` - request timeout in milliseconds.
240    pub timeout: Option<i64>,
241    /// `backlog:` - listen backlog.
242    pub backlog: Option<i64>,
243    /// `client_connections:` - max client connections.
244    pub client_connections: Option<i64>,
245    /// `data_store:` - 0 = redis, 1 = memcache, 2 = noxu.
246    /// Operators may also write the textual form (`redis`,
247    /// `memcache`, `noxu`); the deserializer normalises both
248    /// shapes to the integer code that the rest of the engine
249    /// consumes.
250    #[serde(default, deserialize_with = "deserialize_data_store")]
251    pub data_store: Option<i64>,
252    /// `noxu_path:` - filesystem directory the in-process Noxu
253    /// DB datastore opens its environment at. Required when
254    /// `data_store: noxu` is selected; ignored otherwise. The
255    /// directory must be writable; an existing environment is
256    /// reused, otherwise one is created.
257    #[serde(default)]
258    pub noxu_path: Option<PathBuf>,
259    /// `preconnect:` - eagerly establish connections at startup.
260    pub preconnect: Option<bool>,
261    /// `redis_requirepass:` - optional password sent as `AUTH <pw>`
262    /// on every backend connection right after the TCP handshake.
263    /// Mirrors the Redis server option of the same name. Leave
264    /// unset to disable. Memcache backends are not authenticated
265    /// (`AUTH` is Redis-specific; memcache binary SASL is not
266    /// implemented).
267    #[serde(default)]
268    pub redis_requirepass: Option<String>,
269    /// `auto_eject_hosts:` - automatically eject failing peers.
270    pub auto_eject_hosts: Option<bool>,
271    /// `server_retry_timeout:` - retry interval for ejected servers (ms).
272    pub server_retry_timeout: Option<i64>,
273    /// `server_failure_limit:` - consecutive failures before eject.
274    pub server_failure_limit: Option<i64>,
275
276    /// `servers:` - the (single-element) datastore list.
277    pub servers: Option<Servers>,
278
279    /// `dyn_read_timeout:` - inter-node read timeout (ms).
280    pub dyn_read_timeout: Option<i64>,
281    /// `dyn_write_timeout:` - inter-node write timeout (ms).
282    pub dyn_write_timeout: Option<i64>,
283    /// `dyn_seed_provider:` - seeds backend.
284    pub dyn_seed_provider: Option<String>,
285    /// `dyn_seeds:` - peer dynomite nodes.
286    pub dyn_seeds: Option<Vec<ConfDynSeed>>,
287    /// `dyn_port:` - default peer port.
288    pub dyn_port: Option<i64>,
289    /// `dyn_connections:` - per-peer connection count.
290    pub dyn_connections: Option<i64>,
291    /// `rack:` - this node's rack.
292    pub rack: Option<String>,
293    /// `tokens:` - this node's tokens.
294    pub tokens: Option<TokenList>,
295    /// `gos_interval:` - gossip period (ms).
296    pub gos_interval: Option<i64>,
297    /// `secure_server_option:` - inter-node TLS mode.
298    pub secure_server_option: Option<String>,
299    /// `pem_key_file:` - path to the PEM private key.
300    pub pem_key_file: Option<String>,
301    /// `recon_key_file:` - reconciliation key path.
302    pub recon_key_file: Option<String>,
303    /// `recon_iv_file:` - reconciliation IV path.
304    pub recon_iv_file: Option<String>,
305    /// `recon_interval_seconds:` - period (in seconds) of the
306    /// background entropy reconciliation cycle.
307    ///
308    /// Ignored when [`Self::recon_key_file`] is unset or when the
309    /// configured key file cannot be opened at startup. When the
310    /// entropy task is enabled the default cadence is 300 seconds
311    /// (five minutes); operators can override it via this YAML
312    /// directive.
313    #[serde(default)]
314    pub recon_interval_seconds: Option<u64>,
315    /// `datacenter:` - this node's datacenter.
316    pub datacenter: Option<String>,
317    /// `env:` - cloud environment marker.
318    pub env: Option<String>,
319    /// `conn_msg_rate:` - per-connection message rate cap.
320    pub conn_msg_rate: Option<u32>,
321    /// `read_consistency:` - quorum policy for reads.
322    pub read_consistency: Option<String>,
323    /// `write_consistency:` - quorum policy for writes.
324    pub write_consistency: Option<String>,
325    /// `stats_interval:` - stats aggregation period (ms).
326    pub stats_interval: Option<i64>,
327    /// `enable_gossip:` - enable / disable gossip thread.
328    pub enable_gossip: Option<bool>,
329    /// `peer_tls_cert:` - PEM certificate path for the dnode
330    /// listener and outbound dnode connections. When both this
331    /// field and [`Self::peer_tls_key`] are set the peer plane
332    /// runs over TLS; when both are absent the peer plane runs
333    /// in plaintext (the historical behaviour). Setting one
334    /// without the other is rejected at validation time.
335    #[serde(default)]
336    pub peer_tls_cert: Option<PathBuf>,
337    /// `peer_tls_key:` - PEM private-key path matching
338    /// [`Self::peer_tls_cert`].
339    #[serde(default)]
340    pub peer_tls_key: Option<PathBuf>,
341    /// `peer_tls_ca:` - optional PEM CA bundle. When set, the
342    /// dnode listener requires every inbound peer to present a
343    /// certificate signed by a CA from this bundle (mutual TLS).
344    /// When unset, the listener still terminates TLS but does
345    /// not request a client certificate. The outbound side uses
346    /// this bundle as its trust anchor; when unset, the bundled
347    /// `webpki_roots` Mozilla bundle is used.
348    #[serde(default)]
349    pub peer_tls_ca: Option<PathBuf>,
350    /// `peer_tls_profiles:` - per-DC TLS material lookup. Each
351    /// entry is keyed by the target peer's datacenter name; the
352    /// value is a [`ConfTlsProfile`] giving the cert / key / CA
353    /// triple to use when negotiating peer-plane TLS to or from a
354    /// peer in that DC. When the inbound listener accepts a
355    /// connection it picks the cert by SNI hostname
356    /// (`dc-<dc-name>.dynomite.local`); the outbound peer
357    /// supervisor dials with the same SNI hostname so the remote
358    /// listener can route the handshake.
359    ///
360    /// When this map is empty (the default), the legacy
361    /// `peer_tls_cert` / `peer_tls_key` / `peer_tls_ca` triple is
362    /// the only profile in use, applied to every peer regardless
363    /// of DC. When the map is non-empty, each entry takes
364    /// precedence over the legacy fields for matching DCs; the
365    /// legacy fields become the implicit "default" profile used
366    /// for any DC without an explicit entry. When neither the
367    /// map nor the legacy fields are set, the peer plane is
368    /// plaintext.
369    #[serde(default)]
370    pub peer_tls_profiles: BTreeMap<String, ConfTlsProfile>,
371    /// `mbuf_size:` - mbuf chunk size in bytes.
372    pub mbuf_size: Option<i64>,
373    /// `max_msgs:` - allocated message buffer size.
374    pub max_msgs: Option<i64>,
375    /// `datastore_connections:` - count of connections to the datastore.
376    pub datastore_connections: Option<u8>,
377    /// `local_peer_connections:` - count of connections to local-DC peers.
378    pub local_peer_connections: Option<u8>,
379    /// `remote_peer_connections:` - count of connections to remote peers.
380    pub remote_peer_connections: Option<u8>,
381    /// `read_repairs_enabled:` - enable read-repair on quorum mismatch.
382    pub read_repairs_enabled: Option<bool>,
383    /// `enable_hinted_handoff:` - when true, writes whose target
384    /// peer is in [`crate::cluster::peer::PeerState::Down`] (or
385    /// whose outbound channel is closed / full) are stored in a
386    /// node-local hint queue and counted toward the consistency
387    /// threshold; a background drainer ships the hints to the
388    /// peer once it returns to
389    /// [`crate::cluster::peer::PeerState::Normal`]. When false
390    /// (the default) the dispatcher behaviour is unchanged: a
391    /// Down or unreachable target is silently skipped and the
392    /// request fails with `DynomiteNoQuorumAchieved` if the
393    /// remaining targets cannot satisfy the consistency level.
394    #[serde(default)]
395    pub enable_hinted_handoff: Option<bool>,
396    /// `hint_ttl_seconds:` - per-hint expiry. Hints older than
397    /// this many seconds are dropped during periodic sweeps to
398    /// bound the in-memory store. Defaults to 86400 (24 hours).
399    /// Ignored when `enable_hinted_handoff` is false.
400    #[serde(default)]
401    pub hint_ttl_seconds: Option<u64>,
402    /// `hint_store_max_bytes:` - upper bound on the in-memory
403    /// hint store. Once the store reaches this many bytes,
404    /// further enqueues fail with
405    /// [`crate::cluster::hints::HintStoreError::OverCapacity`]
406    /// and the dispatcher falls back to its non-handoff error
407    /// path. Defaults to 64 MiB. Ignored when
408    /// `enable_hinted_handoff` is false.
409    #[serde(default)]
410    pub hint_store_max_bytes: Option<u64>,
411    /// `hint_drain_interval_ms:` - period of the background hint
412    /// drainer sweep. Defaults to 30000 ms (30 seconds). Ignored
413    /// when `enable_hinted_handoff` is false.
414    #[serde(default)]
415    pub hint_drain_interval_ms: Option<u64>,
416    /// `log_format:` - selectable shape for tracing output.
417    ///
418    /// Accepted values are `default`, `rfc5424`, `rfc3164`, `json`,
419    /// and `ndjson` (alias of `json`). When unset, the historical
420    /// default text format is used. Parsing is performed at
421    /// log-installation time by [`crate::core::log::LogFormat::parse`];
422    /// invalid values fail the `dynomited --test-conf` gate.
423    pub log_format: Option<String>,
424
425    /// `observability:` - opt-in observability knobs (distributed
426    /// tracing OTLP exporter today; the OTLP log appender is
427    /// scoped but not yet wired). Absent / null disables every
428    /// observability surface, preserving today's silent default.
429    /// See [`ObservabilityConfig`].
430    #[serde(default)]
431    pub observability: Option<ObservabilityConfig>,
432
433    /// `bucket_types:` - per-bucket routing-property bundles.
434    ///
435    /// Each entry is a [`ConfBucketType`] keyed by name. The
436    /// dispatcher extracts the bucket name from the request key
437    /// (the prefix before the first `/`; see
438    /// [`crate::proto::redis::bucket_name`]) and, when a matching
439    /// entry exists, swaps in that type's `read_consistency`,
440    /// `write_consistency`, and `n_val` for the lifetime of that
441    /// request. Pool-level fields stay the fallback for keys
442    /// without a slash, for keys whose bucket prefix is unknown,
443    /// and for any field the bucket-type stanza leaves at its
444    /// default. Empty list disables the feature; entries must have
445    /// unique names.
446    #[serde(default)]
447    pub bucket_types: Vec<ConfBucketType>,
448
449    /// `default_bucket_type:` - name of the bucket type to apply
450    /// when the request key has no slash (or has an empty prefix).
451    /// Must reference an entry of `bucket_types` when set; unset
452    /// (the default) falls all the way back to pool-level
453    /// consistency.
454    #[serde(default)]
455    pub default_bucket_type: Option<String>,
456
457    /// `riak:` - optional Riak-mode listener / AAE configuration.
458    ///
459    /// Consumed only when `dynomited` is built with the
460    /// `--features riak` Cargo feature: the binary then
461    /// instantiates a Protocol Buffers Client (PBC) listener,
462    /// an HTTP gateway, and (optionally) the active anti-entropy
463    /// scheduler against the supplied addresses. When the field
464    /// is absent (or every inner option is unset), the binary
465    /// behaves identically to a Redis / Memcache deployment.
466    /// The block is parsed unconditionally so YAML files
467    /// authored against the Riak-enabled binary still validate
468    /// under the default build.
469    #[serde(default)]
470    pub riak: Option<ConfRiak>,
471
472    /// `transport:` - selects the network stack the proxy
473    /// listener binds. `tcp` is the historical default and is
474    /// the only option a build without the `quic` Cargo
475    /// feature satisfies. `quic` requires the engine's `quic`
476    /// feature and a server cert / key pair supplied via
477    /// [`Self::quic_cert_file`] and [`Self::quic_key_file`];
478    /// the listener binds a UDP socket and serves the
479    /// configured datastore protocol over a single QUIC
480    /// bidirectional stream per accepted connection.
481    ///
482    /// When unset (the default), the engine selects
483    /// [`Transport::Tcp`].
484    #[serde(default)]
485    pub transport: Option<Transport>,
486
487    /// `quic_cert_file:` - PEM certificate chain path used by
488    /// the QUIC listener when `transport: quic` is selected.
489    /// Required when [`Self::transport`] resolves to
490    /// [`Transport::Quic`]; ignored otherwise.
491    #[serde(default)]
492    pub quic_cert_file: Option<PathBuf>,
493
494    /// `quic_key_file:` - PEM private-key path matching
495    /// [`Self::quic_cert_file`]. Required when
496    /// [`Self::transport`] resolves to [`Transport::Quic`];
497    /// ignored otherwise.
498    #[serde(default)]
499    pub quic_key_file: Option<PathBuf>,
500}
501
502/// Optional Riak-mode listener / AAE knobs.
503///
504/// Every field is optional. The PBC and HTTP listeners are
505/// independent: setting one without the other is supported.
506/// When `aae_enabled` is `true` the active anti-entropy
507/// scheduler is spawned; the cadence knobs default to the
508/// values shipped by `dyniak::aae::config`.
509///
510/// # Examples
511///
512/// ```
513/// use dynomite::conf::ConfRiak;
514/// let r = ConfRiak {
515///     pbc_listen: Some("127.0.0.1:8087".into()),
516///     http_listen: Some("127.0.0.1:8098".into()),
517///     aae_enabled: Some(false),
518///     aae_full_sweep_interval_seconds: None,
519///     aae_segment_interval_seconds: None,
520///     tls_cert: None,
521///     tls_key: None,
522///     tls_ca: None,
523///     wasm_modules: None,
524/// };
525/// assert!(r.validate().is_ok());
526/// ```
527#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
528#[serde(deny_unknown_fields, default)]
529pub struct ConfRiak {
530    /// Address the Riak Protocol Buffers Client listener binds
531    /// to (`host:port`). When unset, the PBC listener is not
532    /// started.
533    pub pbc_listen: Option<String>,
534    /// Address the Riak HTTP gateway listener binds to
535    /// (`host:port`). When unset, the HTTP gateway is not
536    /// started.
537    pub http_listen: Option<String>,
538    /// When `true`, the Riak active anti-entropy scheduler is
539    /// spawned alongside the listeners. Default: `false`.
540    pub aae_enabled: Option<bool>,
541    /// Override for the AAE full-sweep cadence, in seconds.
542    /// When unset, `dyniak::aae::config::DEFAULT_FULL_SWEEP_SECONDS`
543    /// (24h) is used.
544    pub aae_full_sweep_interval_seconds: Option<u64>,
545    /// Override for the AAE per-segment exchange cadence, in
546    /// seconds. When unset, `dyniak::aae::config::DEFAULT_SEGMENT_SECONDS`
547    /// (60s) is used.
548    pub aae_segment_interval_seconds: Option<u64>,
549    /// `tls_cert:` - PEM certificate path for the Riak PBC and
550    /// HTTP listeners. When both `tls_cert` and `tls_key` are
551    /// set, both Riak listeners terminate TLS; when both are
552    /// absent, both listeners run in plaintext (the historical
553    /// behaviour). Setting one without the other is rejected at
554    /// validation time.
555    #[serde(default)]
556    pub tls_cert: Option<PathBuf>,
557    /// `tls_key:` - PEM private-key path matching
558    /// [`Self::tls_cert`].
559    #[serde(default)]
560    pub tls_key: Option<PathBuf>,
561    /// `tls_ca:` - optional PEM CA bundle for mutual TLS on the
562    /// Riak listeners. When set, every inbound client must
563    /// present a certificate signed by a CA from this bundle.
564    /// When unset, the listeners terminate TLS without
565    /// requesting a client certificate.
566    #[serde(default)]
567    pub tls_ca: Option<PathBuf>,
568    /// `wasm_modules:` - optional list of Wasm modules to
569    /// register with the MapReduce executor at startup. Each
570    /// entry pairs a logical `id` with the on-disk `path` of a
571    /// Wasm binary (`.wasm`) or WAT text (`.wat`) file. When
572    /// `dynomited` is built with the `wasm` Cargo feature it
573    /// loads every entry through
574    /// [`dyniak::mapreduce::wasm::load_modules_from_config`]
575    /// and exposes the resulting store on the executor; without
576    /// the feature the field is parsed and validated but the
577    /// loader is never called (the runtime returns the typed
578    /// `WasmNotImplemented` error if a `Phase::WasmModule` is
579    /// submitted).
580    ///
581    /// Validation: every `id` must be unique and every `path`
582    /// must point at an existing file at validation time.
583    #[serde(default)]
584    pub wasm_modules: Option<Vec<ConfRiakWasmModule>>,
585}
586
587/// One Wasm module entry inside a [`ConfRiak::wasm_modules`]
588/// list.
589///
590/// # Examples
591///
592/// ```
593/// use std::path::PathBuf;
594/// use dynomite::conf::ConfRiakWasmModule;
595/// let m = ConfRiakWasmModule {
596///     id: "identity".into(),
597///     path: PathBuf::from("/etc/dynomited/wasm/identity.wasm"),
598/// };
599/// assert_eq!(m.id, "identity");
600/// ```
601#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
602#[serde(deny_unknown_fields)]
603pub struct ConfRiakWasmModule {
604    /// Logical module identifier referenced from a
605    /// `Phase::WasmModule { module_id }` MapReduce phase.
606    pub id: String,
607    /// Filesystem path to the Wasm binary (`.wasm`) or WAT
608    /// text (`.wat`) file. Read once at startup.
609    pub path: PathBuf,
610}
611
612/// One per-DC TLS profile inside [`ConfPool::peer_tls_profiles`].
613///
614/// Each profile names a PEM cert / private-key pair and an
615/// optional CA bundle. The map's key is the datacenter name
616/// (matching the value an operator sets in `dyn_seeds:` for
617/// peers in that DC); when a peer's DC has no entry, the
618/// connection falls back to the legacy `peer_tls_*` fields
619/// (treated as the implicit "default" profile). When neither a
620/// per-DC entry nor the default fields are set, the connection
621/// is plaintext.
622///
623/// # Examples
624///
625/// ```
626/// use std::path::PathBuf;
627/// use dynomite::conf::ConfTlsProfile;
628/// let p = ConfTlsProfile {
629///     cert: Some(PathBuf::from("/etc/dynomite/dc1.pem")),
630///     key: Some(PathBuf::from("/etc/dynomite/dc1.key")),
631///     ca: Some(PathBuf::from("/etc/dynomite/dc1-ca.pem")),
632/// };
633/// assert!(p.validate("dc1").is_ok());
634/// ```
635#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
636#[serde(deny_unknown_fields, default)]
637pub struct ConfTlsProfile {
638    /// PEM certificate path. Must be set together with [`Self::key`].
639    pub cert: Option<PathBuf>,
640    /// PEM private-key path matching [`Self::cert`].
641    pub key: Option<PathBuf>,
642    /// Optional PEM CA bundle. When set, peer-plane connections
643    /// using this profile pin the bundle as their trust anchor
644    /// (and the listener requires inbound peers to present a
645    /// certificate signed by a CA in the bundle for mTLS). When
646    /// unset, the listener does not request a client certificate
647    /// and the outbound side falls back to the bundled
648    /// `webpki_roots` Mozilla anchors.
649    pub ca: Option<PathBuf>,
650}
651
652impl ConfTlsProfile {
653    /// Validate that the profile is internally consistent.
654    ///
655    /// `cert` and `key` must both be set or both be unset; a
656    /// `ca` requires the cert / key pair. The `dc` argument is
657    /// the map key the profile lives under and is included in
658    /// any error message so an operator can identify the
659    /// offending entry.
660    ///
661    /// # Errors
662    /// Returns [`ConfError::BadServer`] when the cert / key
663    /// pair is mismatched, or when `ca` is set without the cert
664    /// / key pair.
665    ///
666    /// # Examples
667    ///
668    /// ```
669    /// use std::path::PathBuf;
670    /// use dynomite::conf::ConfTlsProfile;
671    /// let p = ConfTlsProfile {
672    ///     cert: Some(PathBuf::from("/etc/x.pem")),
673    ///     key: None,
674    ///     ca: None,
675    /// };
676    /// assert!(p.validate("dc1").is_err());
677    /// ```
678    pub fn validate(&self, dc: &str) -> Result<(), ConfError> {
679        match (self.cert.as_deref(), self.key.as_deref()) {
680            (Some(_), Some(_)) | (None, None) => {}
681            (Some(c), None) => {
682                return Err(ConfError::BadServer {
683                    field: "peer_tls_profiles.cert",
684                    value: c.display().to_string(),
685                    reason: format!(
686                        "peer_tls_profiles[{dc}].cert is set but .key is not; both must be set together"
687                    ),
688                });
689            }
690            (None, Some(k)) => {
691                return Err(ConfError::BadServer {
692                    field: "peer_tls_profiles.key",
693                    value: k.display().to_string(),
694                    reason: format!(
695                        "peer_tls_profiles[{dc}].key is set but .cert is not; both must be set together"
696                    ),
697                });
698            }
699        }
700        if self.ca.is_some() && self.cert.is_none() {
701            return Err(ConfError::BadServer {
702                field: "peer_tls_profiles.ca",
703                value: self
704                    .ca
705                    .as_ref()
706                    .map_or_else(String::new, |p| p.display().to_string()),
707                reason: format!(
708                    "peer_tls_profiles[{dc}].ca requires .cert and .key to also be set"
709                ),
710            });
711        }
712        Ok(())
713    }
714}
715
716impl ConfRiak {
717    /// Validate the cross-field invariants of the Riak block.
718    ///
719    /// # Errors
720    /// Returns a [`ConfError::BadServer`] when an address fails
721    /// to parse as a `host:port` socket address, when an AAE
722    /// cadence is zero, or when `aae_segment_interval_seconds`
723    /// exceeds `aae_full_sweep_interval_seconds`.
724    ///
725    /// # Examples
726    ///
727    /// ```
728    /// use dynomite::conf::ConfRiak;
729    /// let r = ConfRiak {
730    ///     pbc_listen: Some("not-a-socket-addr".into()),
731    ///     ..ConfRiak::default()
732    /// };
733    /// assert!(r.validate().is_err());
734    /// ```
735    pub fn validate(&self) -> Result<(), ConfError> {
736        if let Some(addr) = self.pbc_listen.as_deref() {
737            validate_riak_addr("pbc_listen", addr)?;
738        }
739        if let Some(addr) = self.http_listen.as_deref() {
740            validate_riak_addr("http_listen", addr)?;
741        }
742        if let Some(n) = self.aae_full_sweep_interval_seconds {
743            if n == 0 {
744                return Err(ConfError::BadServer {
745                    field: "aae_full_sweep_interval_seconds",
746                    value: n.to_string(),
747                    reason: "must be > 0".into(),
748                });
749            }
750        }
751        if let Some(n) = self.aae_segment_interval_seconds {
752            if n == 0 {
753                return Err(ConfError::BadServer {
754                    field: "aae_segment_interval_seconds",
755                    value: n.to_string(),
756                    reason: "must be > 0".into(),
757                });
758            }
759        }
760        if let (Some(seg), Some(full)) = (
761            self.aae_segment_interval_seconds,
762            self.aae_full_sweep_interval_seconds,
763        ) {
764            if seg > full {
765                return Err(ConfError::BadServer {
766                    field: "aae_segment_interval_seconds",
767                    value: seg.to_string(),
768                    reason: format!("must be <= aae_full_sweep_interval_seconds ({full})"),
769                });
770            }
771        }
772        validate_tls_pair(
773            "tls_cert",
774            "tls_key",
775            self.tls_cert.as_deref(),
776            self.tls_key.as_deref(),
777        )?;
778        if self.tls_ca.is_some() && self.tls_cert.is_none() {
779            return Err(ConfError::BadServer {
780                field: "tls_ca",
781                value: self
782                    .tls_ca
783                    .as_ref()
784                    .map_or_else(String::new, |p| p.display().to_string()),
785                reason: "requires tls_cert and tls_key to also be set".into(),
786            });
787        }
788        if let Some(modules) = self.wasm_modules.as_deref() {
789            let mut seen: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
790            for m in modules {
791                if m.id.is_empty() {
792                    return Err(ConfError::BadServer {
793                        field: "wasm_modules.id",
794                        value: String::new(),
795                        reason: "wasm module id must not be empty".into(),
796                    });
797                }
798                if !seen.insert(m.id.as_str()) {
799                    return Err(ConfError::BadServer {
800                        field: "wasm_modules.id",
801                        value: m.id.clone(),
802                        reason: "wasm module ids must be unique".into(),
803                    });
804                }
805                if !m.path.is_file() {
806                    return Err(ConfError::BadServer {
807                        field: "wasm_modules.path",
808                        value: m.path.display().to_string(),
809                        reason: format!("wasm module file not found for id '{}'", m.id),
810                    });
811                }
812            }
813        }
814        Ok(())
815    }
816}
817
818/// Cross-check a `(cert, key)` TLS pair: both must be `Some` or
819/// both must be `None`. The `cert_field` and `key_field` static
820/// strings name the YAML keys for the error message.
821fn validate_tls_pair(
822    cert_field: &'static str,
823    key_field: &'static str,
824    cert: Option<&std::path::Path>,
825    key: Option<&std::path::Path>,
826) -> Result<(), ConfError> {
827    match (cert, key) {
828        (Some(_), Some(_)) | (None, None) => Ok(()),
829        (Some(c), None) => Err(ConfError::BadServer {
830            field: cert_field,
831            value: c.display().to_string(),
832            reason: format!(
833                "{cert_field} is set but {key_field} is not; both must be set together"
834            ),
835        }),
836        (None, Some(k)) => Err(ConfError::BadServer {
837            field: key_field,
838            value: k.display().to_string(),
839            reason: format!(
840                "{key_field} is set but {cert_field} is not; both must be set together"
841            ),
842        }),
843    }
844}
845
846fn validate_riak_addr(field: &'static str, value: &str) -> Result<(), ConfError> {
847    use std::net::ToSocketAddrs;
848    if value.is_empty() {
849        return Err(ConfError::BadServer {
850            field,
851            value: value.to_string(),
852            reason: "riak listen address must not be empty".into(),
853        });
854    }
855    // Accept anything that resolves; mirrors how the rest of
856    // the engine validates `listen:` strings (parse first,
857    // resolve as a fallback).
858    if value.parse::<std::net::SocketAddr>().is_ok() {
859        return Ok(());
860    }
861    match value.to_socket_addrs() {
862        Ok(mut iter) => {
863            if iter.next().is_some() {
864                Ok(())
865            } else {
866                Err(ConfError::BadServer {
867                    field,
868                    value: value.to_string(),
869                    reason: "resolved to no addresses".into(),
870                })
871            }
872        }
873        Err(e) => Err(ConfError::BadServer {
874            field,
875            value: value.to_string(),
876            reason: format!("could not resolve: {e}"),
877        }),
878    }
879}
880
881/// Routing-property bundle attached to a key bucket.
882///
883/// Bucket types let operators give different key classes
884/// different SLAs without running multiple pools. Cache-style
885/// keys can pin to `DC_ONE` while transactional keys sit on
886/// `DC_EACH_SAFE_QUORUM`; the same dynomited binary serves both.
887///
888/// `n_val` caps the replica fan-out for the lifetime of one
889/// request: when the topology offers more replicas than `n_val`,
890/// the dispatcher takes the first `n_val` (the existing rack /
891/// DC ordering puts preferred replicas first). A value of `0`
892/// means "no cap" and is treated identically to omitting the
893/// field.
894///
895/// # Examples
896///
897/// ```
898/// use dynomite::conf::{ConfBucketType, ConsistencyLevel};
899/// let bt = ConfBucketType {
900///     name: "sessions".into(),
901///     read_consistency: "DC_QUORUM".into(),
902///     write_consistency: "DC_EACH_SAFE_QUORUM".into(),
903///     n_val: 3,
904/// };
905/// assert_eq!(bt.name, "sessions");
906/// assert_eq!(
907///     ConsistencyLevel::parse("read_consistency", &bt.read_consistency).unwrap(),
908///     ConsistencyLevel::DcQuorum,
909/// );
910/// assert_eq!(bt.n_val, 3);
911/// ```
912#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
913#[serde(deny_unknown_fields)]
914pub struct ConfBucketType {
915    /// Bucket name. Compared verbatim against the bytes returned
916    /// by [`crate::proto::redis::bucket_name`]; no normalisation
917    /// is performed. Names must be unique within a pool and must
918    /// not be empty.
919    pub name: String,
920    /// Read-side consistency level for keys in this bucket.
921    /// Stored as a string so the YAML round-trip is
922    /// human-readable; parsed via
923    /// [`ConsistencyLevel::parse`](crate::conf::ConsistencyLevel::parse)
924    /// during validation.
925    pub read_consistency: String,
926    /// Write-side consistency level for keys in this bucket.
927    /// See [`ConfBucketType::read_consistency`].
928    pub write_consistency: String,
929    /// Replication-factor cap. The dispatcher trims its replica
930    /// fan-out to at most this many targets. `0` means "no cap";
931    /// any positive value caps the fan-out to the leading
932    /// `n_val` peers (rack-local first).
933    #[serde(default)]
934    pub n_val: u8,
935}
936
937impl ConfBucketType {
938    /// Parse [`Self::read_consistency`] into the typed enum.
939    ///
940    /// # Examples
941    ///
942    /// ```
943    /// use dynomite::conf::{ConfBucketType, ConsistencyLevel};
944    /// let bt = ConfBucketType {
945    ///     name: "s".into(),
946    ///     read_consistency: "DC_QUORUM".into(),
947    ///     write_consistency: "DC_ONE".into(),
948    ///     n_val: 0,
949    /// };
950    /// assert_eq!(bt.read_level().unwrap(), ConsistencyLevel::DcQuorum);
951    /// ```
952    pub fn read_level(&self) -> Result<ConsistencyLevel, ConfError> {
953        ConsistencyLevel::parse("read_consistency", &self.read_consistency)
954    }
955
956    /// Parse [`Self::write_consistency`] into the typed enum.
957    ///
958    /// # Examples
959    ///
960    /// ```
961    /// use dynomite::conf::{ConfBucketType, ConsistencyLevel};
962    /// let bt = ConfBucketType {
963    ///     name: "s".into(),
964    ///     read_consistency: "DC_ONE".into(),
965    ///     write_consistency: "DC_SAFE_QUORUM".into(),
966    ///     n_val: 0,
967    /// };
968    /// assert_eq!(bt.write_level().unwrap(), ConsistencyLevel::DcSafeQuorum);
969    /// ```
970    pub fn write_level(&self) -> Result<ConsistencyLevel, ConfError> {
971        ConsistencyLevel::parse("write_consistency", &self.write_consistency)
972    }
973}
974
975/// Opt-in observability configuration.
976///
977/// Absent or null disables every observability surface. Covers
978/// distributed-tracing today; an OTLP log-appender bridge using
979/// the same fields is scoped as a follow-up.
980///
981/// # Examples
982///
983/// ```
984/// use dynomite::conf::ObservabilityConfig;
985/// let cfg = ObservabilityConfig {
986///     otlp_traces_endpoint: Some("http://collector:4317".into()),
987///     otlp_logs_endpoint: None,
988///     service_name: Some("dynomited".into()),
989///     traces_sampling: Some(0.1),
990/// };
991/// assert!(cfg.otlp_traces_endpoint.is_some());
992/// ```
993#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
994#[serde(deny_unknown_fields, default)]
995pub struct ObservabilityConfig {
996    /// OTLP gRPC endpoint for distributed traces (e.g.
997    /// `http://localhost:4317`). When `None` the binary skips
998    /// the OTel SDK install entirely; tracing keeps using the
999    /// configured `tracing-subscriber` log layer only.
1000    pub otlp_traces_endpoint: Option<String>,
1001    /// OTLP gRPC endpoint for log records. When set the binary
1002    /// installs an `opentelemetry-appender-tracing` bridge
1003    /// alongside the local log writer. The wiring is scoped as
1004    /// a follow-up; the field is parsed today so YAML files
1005    /// authored against the eventual implementation validate.
1006    pub otlp_logs_endpoint: Option<String>,
1007    /// Service name attached to every emitted span / log record.
1008    /// Defaults to `"dynomited"` when unset.
1009    pub service_name: Option<String>,
1010    /// Trace sampling ratio in `[0.0, 1.0]`. `1.0` records every
1011    /// trace, `0.0` records none. Defaults to `1.0` when unset.
1012    pub traces_sampling: Option<f64>,
1013}
1014
1015impl ConfPool {
1016    /// Resolve the configured [`Distribution`] for the engine.
1017    ///
1018    /// Folds the legacy `ketama` / `modula` / `random` aliases
1019    /// down to [`Distribution::Vnode`] (the only algorithm those
1020    /// names ever resolved to in the Rust port). Emits a
1021    /// `tracing::warn!` for the legacy aliases the first time
1022    /// the field is read so the operator notices the
1023    /// deprecation.
1024    ///
1025    /// # Examples
1026    ///
1027    /// ```
1028    /// use dynomite::conf::{ConfPool, Distribution};
1029    /// let p = ConfPool {
1030    ///     distribution: Some(Distribution::RandomSlicing),
1031    ///     ..ConfPool::default()
1032    /// };
1033    /// assert_eq!(p.resolved_distribution(), Distribution::RandomSlicing);
1034    /// ```
1035    #[must_use]
1036    pub fn resolved_distribution(&self) -> Distribution {
1037        match self.distribution {
1038            None | Some(Distribution::Vnode) => Distribution::Vnode,
1039            Some(Distribution::RandomSlicing) => Distribution::RandomSlicing,
1040            Some(other) => {
1041                tracing::warn!(
1042                    target: "dynomite::conf",
1043                    distribution = other.as_str(),
1044                    "distribution mode '{}' is a legacy alias and resolves to 'vnode'; \
1045                     update the YAML to either 'vnode' or 'random_slicing'",
1046                    other
1047                );
1048                Distribution::Vnode
1049            }
1050        }
1051    }
1052}
1053
1054impl ConfPool {
1055    /// Apply defaults to any field still left `None` after parsing.
1056    ///
1057    /// # Examples
1058    ///
1059    /// ```
1060    /// use dynomite::conf::ConfPool;
1061    /// let mut p = ConfPool::default();
1062    /// p.apply_defaults();
1063    /// assert_eq!(p.timeout, Some(5_000));
1064    /// assert_eq!(p.rack.as_deref(), Some("localrack"));
1065    /// ```
1066    pub fn apply_defaults(&mut self) {
1067        if self.dyn_seed_provider.is_none() {
1068            self.dyn_seed_provider = Some(defaults::SEED_PROVIDER.to_string());
1069        }
1070        if self.hash.is_none() {
1071            self.hash = Some(HashType::Murmur);
1072        }
1073        if self.timeout.is_none() {
1074            self.timeout = Some(defaults::TIMEOUT_MS);
1075        }
1076        if self.backlog.is_none() {
1077            self.backlog = Some(defaults::LISTEN_BACKLOG);
1078        }
1079        // client_connections is unconditionally reset to its default
1080        // by the C validator; we mirror that exactly.
1081        self.client_connections = Some(defaults::CLIENT_CONNECTIONS);
1082        if self.data_store.is_none() {
1083            self.data_store = Some(defaults::DATA_STORE);
1084        }
1085        if self.preconnect.is_none() {
1086            self.preconnect = Some(defaults::PRECONNECT);
1087        }
1088        if self.auto_eject_hosts.is_none() {
1089            self.auto_eject_hosts = Some(defaults::AUTO_EJECT_HOSTS);
1090        }
1091        if self.server_retry_timeout.is_none() {
1092            self.server_retry_timeout = Some(defaults::SERVER_RETRY_TIMEOUT_MS);
1093        }
1094        if self.server_failure_limit.is_none() {
1095            self.server_failure_limit = Some(defaults::SERVER_FAILURE_LIMIT);
1096        }
1097        if self.dyn_read_timeout.is_none() {
1098            self.dyn_read_timeout = Some(defaults::DYN_READ_TIMEOUT_MS);
1099        }
1100        if self.dyn_write_timeout.is_none() {
1101            self.dyn_write_timeout = Some(defaults::DYN_WRITE_TIMEOUT_MS);
1102        }
1103        if self.dyn_connections.is_none() {
1104            self.dyn_connections = Some(defaults::DYN_CONNECTIONS);
1105        }
1106        if self.gos_interval.is_none() {
1107            self.gos_interval = Some(defaults::GOS_INTERVAL_MS);
1108        }
1109        if self.conn_msg_rate.is_none() {
1110            self.conn_msg_rate = Some(defaults::CONN_MSG_RATE);
1111        }
1112        if self.rack.is_none() {
1113            self.rack = Some(defaults::RACK.to_string());
1114        }
1115        if self.datacenter.is_none() {
1116            self.datacenter = Some(defaults::DC.to_string());
1117        }
1118        if self.secure_server_option.is_none() {
1119            self.secure_server_option = Some(defaults::SECURE_SERVER_OPTION.to_string());
1120        }
1121        if self.read_consistency.is_none() {
1122            self.read_consistency = Some(defaults::CONSISTENCY.to_string());
1123        }
1124        if self.write_consistency.is_none() {
1125            self.write_consistency = Some(defaults::CONSISTENCY.to_string());
1126        }
1127        if self.stats_interval.is_none() {
1128            self.stats_interval = Some(defaults::STATS_INTERVAL_MS);
1129        }
1130        if self.stats_listen.is_none() {
1131            // Safe: the constant is a hard-coded valid pname.
1132            self.stats_listen = Some(
1133                ConfListen::parse("stats_listen", defaults::STATS_PNAME)
1134                    .expect("invariant: STATS_PNAME constant is valid"),
1135            );
1136        }
1137        if self.env.is_none() {
1138            self.env = Some(defaults::ENV.to_string());
1139        }
1140        if self.pem_key_file.is_none() {
1141            self.pem_key_file = Some(defaults::PEM_KEY_FILE.to_string());
1142        }
1143        if self.recon_key_file.is_none() {
1144            self.recon_key_file = Some(defaults::RECON_KEY_FILE.to_string());
1145        }
1146        if self.recon_iv_file.is_none() {
1147            self.recon_iv_file = Some(defaults::RECON_IV_FILE.to_string());
1148        }
1149        if self.recon_interval_seconds.is_none() {
1150            self.recon_interval_seconds = Some(defaults::RECON_INTERVAL_SECONDS);
1151        }
1152        if self.datastore_connections.is_none() {
1153            self.datastore_connections = Some(defaults::DATASTORE_CONNECTIONS);
1154        }
1155        if self.local_peer_connections.is_none() {
1156            self.local_peer_connections = Some(defaults::LOCAL_PEER_CONNECTIONS);
1157        }
1158        if self.remote_peer_connections.is_none() {
1159            self.remote_peer_connections = Some(defaults::REMOTE_PEER_CONNECTIONS);
1160        }
1161        if self.read_repairs_enabled.is_none() {
1162            self.read_repairs_enabled = Some(false);
1163        }
1164        if self.enable_gossip.is_none() {
1165            self.enable_gossip = Some(false);
1166        }
1167        self.apply_hinted_handoff_defaults();
1168    }
1169
1170    /// Fill in the hinted-handoff knobs and the transport
1171    /// selector. Factored out of [`Self::apply_defaults`] to
1172    /// keep the parent method under the project's per-function
1173    /// line budget while still covering every recently-added
1174    /// key with a default.
1175    fn apply_hinted_handoff_defaults(&mut self) {
1176        if self.enable_hinted_handoff.is_none() {
1177            self.enable_hinted_handoff = Some(defaults::ENABLE_HINTED_HANDOFF);
1178        }
1179        if self.hint_ttl_seconds.is_none() {
1180            self.hint_ttl_seconds = Some(defaults::HINT_TTL_SECONDS);
1181        }
1182        if self.hint_store_max_bytes.is_none() {
1183            self.hint_store_max_bytes = Some(defaults::HINT_STORE_MAX_BYTES);
1184        }
1185        if self.hint_drain_interval_ms.is_none() {
1186            self.hint_drain_interval_ms = Some(defaults::HINT_DRAIN_INTERVAL_MS);
1187        }
1188        if self.transport.is_none() {
1189            self.transport = Some(Transport::default());
1190        }
1191    }
1192
1193    /// Run the full validation pass against the (presumably finalized)
1194    /// pool body.
1195    ///
1196    /// # Examples
1197    ///
1198    /// ```
1199    /// use dynomite::conf::{ConfListen, ConfPool, ConfServer, Servers, TokenList};
1200    /// let mut p = ConfPool {
1201    ///     listen: Some(ConfListen::parse("listen", "127.0.0.1:8102").unwrap()),
1202    ///     servers: Some(Servers::from_vec(vec![ConfServer::parse("127.0.0.1:6379:1").unwrap()])),
1203    ///     tokens: Some(TokenList::parse("0").unwrap()),
1204    ///     ..ConfPool::default()
1205    /// };
1206    /// p.apply_defaults();
1207    /// assert!(p.validate("dyn_o_mite").is_ok());
1208    /// ```
1209    pub fn validate(&self, pool_name: &str) -> Result<(), ConfError> {
1210        if pool_name.is_empty() {
1211            return Err(ConfError::EmptyPoolName);
1212        }
1213
1214        if self.listen.is_none() {
1215            return Err(ConfError::MissingRequired("listen"));
1216        }
1217
1218        self.validate_numeric_ranges()?;
1219        self.validate_mbuf_size()?;
1220        self.validate_max_msgs()?;
1221
1222        if let Some(n) = self.data_store {
1223            let ds = DataStore::from_int(n)?;
1224            if ds == DataStore::Noxu {
1225                self.validate_noxu()?;
1226            }
1227        }
1228        if let Some(tag) = &self.hash_tag {
1229            if tag.chars().count() != 2 {
1230                return Err(ConfError::BadHashTag(tag.clone()));
1231            }
1232        }
1233
1234        let secure = if let Some(s) = &self.secure_server_option {
1235            SecureServerOption::parse(s)?
1236        } else {
1237            SecureServerOption::None
1238        };
1239        if let Some(s) = &self.read_consistency {
1240            ConsistencyLevel::parse("read_consistency", s)?;
1241        }
1242        if let Some(s) = &self.write_consistency {
1243            ConsistencyLevel::parse("write_consistency", s)?;
1244        }
1245        if secure != SecureServerOption::None {
1246            match &self.pem_key_file {
1247                Some(s) if !s.is_empty() => {}
1248                _ => return Err(ConfError::MissingRequired("pem_key_file")),
1249            }
1250        }
1251
1252        if let Some(s) = &self.log_format {
1253            crate::core::log::LogFormat::parse(s).map_err(|e| ConfError::BadServer {
1254                field: "log_format",
1255                value: s.clone(),
1256                reason: e.to_string(),
1257            })?;
1258        }
1259
1260        self.validate_bucket_types()?;
1261        self.validate_hinted_handoff()?;
1262        self.validate_peer_tls()?;
1263        self.validate_transport()?;
1264        if let Some(r) = &self.riak {
1265            r.validate()?;
1266        }
1267
1268        match &self.servers {
1269            None => return Err(ConfError::MissingRequired("servers")),
1270            Some(s) if s.is_empty() => return Err(ConfError::MissingRequired("servers")),
1271            Some(s) if s.len() > 1 => {
1272                return Err(ConfError::BadServer {
1273                    field: "servers",
1274                    value: s.len().to_string(),
1275                    reason: "expected exactly one datastore entry".to_string(),
1276                });
1277            }
1278            Some(_) => {}
1279        }
1280
1281        Ok(())
1282    }
1283
1284    fn validate_numeric_ranges(&self) -> Result<(), ConfError> {
1285        check_positive("timeout", self.timeout)?;
1286        check_positive("backlog", self.backlog)?;
1287        check_non_negative("client_connections", self.client_connections)?;
1288        check_positive("server_retry_timeout", self.server_retry_timeout)?;
1289        check_positive("server_failure_limit", self.server_failure_limit)?;
1290        check_positive("dyn_read_timeout", self.dyn_read_timeout)?;
1291        check_positive("dyn_write_timeout", self.dyn_write_timeout)?;
1292        check_positive("gos_interval", self.gos_interval)?;
1293        check_positive("stats_interval", self.stats_interval)?;
1294
1295        if let Some(n) = self.dyn_connections {
1296            if n <= 0 {
1297                return Err(ConfError::OutOfRange {
1298                    field: "dyn_connections",
1299                    value: n,
1300                    reason: "must be a positive non-zero number",
1301                });
1302            }
1303        }
1304        Ok(())
1305    }
1306
1307    fn validate_mbuf_size(&self) -> Result<(), ConfError> {
1308        let Some(n) = self.mbuf_size else {
1309            return Ok(());
1310        };
1311        if n <= 0 {
1312            return Err(ConfError::OutOfRange {
1313                field: "mbuf_size",
1314                value: n,
1315                reason: "must be a positive number",
1316            });
1317        }
1318        if !(defaults::MBUF_MIN_SIZE..=defaults::MBUF_MAX_SIZE).contains(&n) {
1319            return Err(ConfError::OutOfRange {
1320                field: "mbuf_size",
1321                value: n,
1322                reason: "must be between 512 and 512000 bytes",
1323            });
1324        }
1325        if n % 16 != 0 {
1326            return Err(ConfError::OutOfRange {
1327                field: "mbuf_size",
1328                value: n,
1329                reason: "must be a multiple of 16",
1330            });
1331        }
1332        Ok(())
1333    }
1334
1335    fn validate_max_msgs(&self) -> Result<(), ConfError> {
1336        let Some(n) = self.max_msgs else {
1337            return Ok(());
1338        };
1339        if n <= 0 {
1340            return Err(ConfError::OutOfRange {
1341                field: "max_msgs",
1342                value: n,
1343                reason: "requires a non-zero number",
1344            });
1345        }
1346        if !(defaults::ALLOC_MSGS_MIN..=defaults::ALLOC_MSGS_MAX).contains(&n) {
1347            return Err(ConfError::OutOfRange {
1348                field: "max_msgs",
1349                value: n,
1350                reason: "must be between 100000 and 1000000 messages",
1351            });
1352        }
1353        Ok(())
1354    }
1355
1356    fn validate_bucket_types(&self) -> Result<(), ConfError> {
1357        use std::collections::BTreeSet;
1358        let mut seen: BTreeSet<&str> = BTreeSet::new();
1359        for bt in &self.bucket_types {
1360            if bt.name.is_empty() {
1361                return Err(ConfError::BadServer {
1362                    field: "bucket_types",
1363                    value: String::new(),
1364                    reason: "bucket-type name must not be empty".to_string(),
1365                });
1366            }
1367            if !seen.insert(bt.name.as_str()) {
1368                return Err(ConfError::BadServer {
1369                    field: "bucket_types",
1370                    value: bt.name.clone(),
1371                    reason: "duplicate bucket-type name".to_string(),
1372                });
1373            }
1374            ConsistencyLevel::parse("read_consistency", &bt.read_consistency)?;
1375            ConsistencyLevel::parse("write_consistency", &bt.write_consistency)?;
1376        }
1377        if let Some(name) = &self.default_bucket_type {
1378            if !self.bucket_types.iter().any(|bt| &bt.name == name) {
1379                return Err(ConfError::BadServer {
1380                    field: "default_bucket_type",
1381                    value: name.clone(),
1382                    reason: "references an undefined bucket-type name".to_string(),
1383                });
1384            }
1385        }
1386        Ok(())
1387    }
1388
1389    /// Validate the cross-field invariants of the Noxu
1390    /// datastore selection.
1391    ///
1392    /// Selecting `data_store: noxu` is permitted only when the
1393    /// binary was built with `--features riak`; without it,
1394    /// `dynomited` cannot construct a `NoxuDatastore` because
1395    /// the `dyniak` crate (which owns the type) is not
1396    /// linked. The check is gated on a `cfg!(feature = ...)`
1397    /// expression that the parent crate threads through via
1398    /// the [`crate::conf::set_noxu_supported`] toggle: the
1399    /// engine ships with the toggle off, the `dynomited` binary
1400    /// turns it on under `--features riak`. The toggle is
1401    /// global because `data_store: noxu` is a build-time
1402    /// configuration constraint, not a per-pool one.
1403    ///
1404    /// `noxu_path:` must be set and non-empty.
1405    fn validate_noxu(&self) -> Result<(), ConfError> {
1406        if !crate::conf::is_noxu_supported() {
1407            return Err(ConfError::BadNoxuConfig(
1408                "noxu data_store requires dynomited built with --features riak",
1409            ));
1410        }
1411        match self.noxu_path.as_deref() {
1412            Some(p) if !p.as_os_str().is_empty() => Ok(()),
1413            _ => Err(ConfError::BadNoxuConfig(
1414                "data_store: noxu requires a non-empty 'noxu_path:' directive",
1415            )),
1416        }
1417    }
1418
1419    fn validate_hinted_handoff(&self) -> Result<(), ConfError> {
1420        if self.enable_hinted_handoff != Some(true) {
1421            return Ok(());
1422        }
1423        if let Some(ttl) = self.hint_ttl_seconds {
1424            if ttl == 0 {
1425                return Err(ConfError::BadServer {
1426                    field: "hint_ttl_seconds",
1427                    value: ttl.to_string(),
1428                    reason: "must be a positive number when enable_hinted_handoff is true"
1429                        .to_string(),
1430                });
1431            }
1432        }
1433        if let Some(cap) = self.hint_store_max_bytes {
1434            if cap == 0 {
1435                return Err(ConfError::BadServer {
1436                    field: "hint_store_max_bytes",
1437                    value: cap.to_string(),
1438                    reason: "must be a positive number when enable_hinted_handoff is true"
1439                        .to_string(),
1440                });
1441            }
1442        }
1443        if let Some(period) = self.hint_drain_interval_ms {
1444            if period == 0 {
1445                return Err(ConfError::BadServer {
1446                    field: "hint_drain_interval_ms",
1447                    value: period.to_string(),
1448                    reason: "must be a positive number when enable_hinted_handoff is true"
1449                        .to_string(),
1450                });
1451            }
1452        }
1453        Ok(())
1454    }
1455
1456    /// Cross-check the peer-plane TLS knobs.
1457    ///
1458    /// `peer_tls_cert` and `peer_tls_key` must both be set or
1459    /// both be unset. `peer_tls_ca` is independent (it controls
1460    /// optional mutual TLS) but only meaningful when the cert /
1461    /// key pair is set. Each per-DC profile in
1462    /// `peer_tls_profiles` is validated by
1463    /// [`ConfTlsProfile::validate`]; the per-DC profile names
1464    /// must be non-empty.
1465    fn validate_peer_tls(&self) -> Result<(), ConfError> {
1466        validate_tls_pair(
1467            "peer_tls_cert",
1468            "peer_tls_key",
1469            self.peer_tls_cert.as_deref(),
1470            self.peer_tls_key.as_deref(),
1471        )?;
1472        if self.peer_tls_ca.is_some() && self.peer_tls_cert.is_none() {
1473            return Err(ConfError::BadServer {
1474                field: "peer_tls_ca",
1475                value: self
1476                    .peer_tls_ca
1477                    .as_ref()
1478                    .map_or_else(String::new, |p| p.display().to_string()),
1479                reason: "requires peer_tls_cert and peer_tls_key to also be set".into(),
1480            });
1481        }
1482        for (dc, profile) in &self.peer_tls_profiles {
1483            if dc.is_empty() {
1484                return Err(ConfError::BadServer {
1485                    field: "peer_tls_profiles",
1486                    value: String::new(),
1487                    reason: "per-DC TLS profile name must not be empty".into(),
1488                });
1489            }
1490            profile.validate(dc)?;
1491        }
1492        Ok(())
1493    }
1494
1495    /// Cross-check the `transport:` selection against the
1496    /// QUIC cert / key knobs.
1497    ///
1498    /// `transport: quic` requires both [`Self::quic_cert_file`]
1499    /// and [`Self::quic_key_file`] to be set; the QUIC listener
1500    /// in `dynomite::net::quic::QuicConfig` cannot bind without
1501    /// a server cert chain and matching private key. The fields
1502    /// are tolerated (but ignored) when `transport: tcp` so
1503    /// operators can switch transports by toggling a single
1504    /// directive without rewriting the whole pool block.
1505    fn validate_transport(&self) -> Result<(), ConfError> {
1506        let resolved = self.transport.unwrap_or_default();
1507        if resolved != Transport::Quic {
1508            return Ok(());
1509        }
1510        match (
1511            self.quic_cert_file.as_deref(),
1512            self.quic_key_file.as_deref(),
1513        ) {
1514            (Some(_), Some(_)) => Ok(()),
1515            (None, _) => Err(ConfError::BadServer {
1516                field: "quic_cert_file",
1517                value: String::new(),
1518                reason: "transport: quic requires quic_cert_file to be set".into(),
1519            }),
1520            (Some(_), None) => Err(ConfError::BadServer {
1521                field: "quic_key_file",
1522                value: String::new(),
1523                reason: "transport: quic requires quic_key_file to be set".into(),
1524            }),
1525        }
1526    }
1527}
1528
1529impl fmt::Display for ConfPool {
1530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1531        // We render the pool body by re-serializing through serde_yaml
1532        // so the round-trip is well defined; this is used by `test_conf`
1533        // and rustdoc examples.
1534        match serde_yaml::to_string(self) {
1535            Ok(s) => f.write_str(&s),
1536            Err(_) => Err(fmt::Error),
1537        }
1538    }
1539}
1540
1541fn check_positive(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
1542    if let Some(n) = v {
1543        if n <= 0 {
1544            return Err(ConfError::OutOfRange {
1545                field,
1546                value: n,
1547                reason: "must be a positive number",
1548            });
1549        }
1550    }
1551    Ok(())
1552}
1553
1554fn check_non_negative(field: &'static str, v: Option<i64>) -> Result<(), ConfError> {
1555    if let Some(n) = v {
1556        if n < 0 {
1557            return Err(ConfError::OutOfRange {
1558                field,
1559                value: n,
1560                reason: "must be a non-negative number",
1561            });
1562        }
1563    }
1564    Ok(())
1565}
1566
1567/// Custom deserializer for `data_store:` that accepts either the
1568/// historical integer form (`0`, `1`, `2`) or the textual form
1569/// (`redis`, `memcache`, `noxu`). Both shapes normalise to the
1570/// integer code that the rest of the engine consumes.
1571fn deserialize_data_store<'de, D>(de: D) -> Result<Option<i64>, D::Error>
1572where
1573    D: serde::Deserializer<'de>,
1574{
1575    use serde::de::{self, Visitor};
1576    use std::fmt;
1577
1578    struct V;
1579    impl<'de> Visitor<'de> for V {
1580        type Value = Option<i64>;
1581
1582        fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1583            f.write_str("a data_store value: integer (0, 1, 2) or string (redis, memcache, noxu)")
1584        }
1585
1586        fn visit_none<E: de::Error>(self) -> Result<Self::Value, E> {
1587            Ok(None)
1588        }
1589
1590        fn visit_unit<E: de::Error>(self) -> Result<Self::Value, E> {
1591            Ok(None)
1592        }
1593
1594        fn visit_some<D2: serde::Deserializer<'de>>(
1595            self,
1596            de: D2,
1597        ) -> Result<Self::Value, D2::Error> {
1598            de.deserialize_any(V)
1599        }
1600
1601        fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
1602            DataStore::from_int(v)
1603                .map(|d| Some(d.as_int()))
1604                .map_err(|e| E::custom(e.to_string()))
1605        }
1606
1607        fn visit_u64<E: de::Error>(self, v: u64) -> Result<Self::Value, E> {
1608            let n = i64::try_from(v).map_err(|_| E::custom("data_store integer overflow"))?;
1609            self.visit_i64(n)
1610        }
1611
1612        fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
1613            DataStore::from_name(v)
1614                .map(|d| Some(d.as_int()))
1615                .map_err(|_| {
1616                    E::custom(format!(
1617                        "data_store: unknown name '{v}'; expected one of: redis, memcache, noxu"
1618                    ))
1619                })
1620        }
1621
1622        fn visit_string<E: de::Error>(self, v: String) -> Result<Self::Value, E> {
1623            self.visit_str(&v)
1624        }
1625    }
1626
1627    de.deserialize_any(V)
1628}
1629
1630#[cfg(test)]
1631mod tests {
1632    use super::*;
1633
1634    fn pool() -> ConfPool {
1635        ConfPool {
1636            listen: Some(ConfListen::parse("listen", "127.0.0.1:8102").unwrap()),
1637            servers: Some(Servers::from_vec(vec![ConfServer::parse(
1638                "127.0.0.1:6379:1",
1639            )
1640            .unwrap()])),
1641            tokens: Some(TokenList::parse("0").unwrap()),
1642            ..ConfPool::default()
1643        }
1644    }
1645
1646    #[test]
1647    fn validate_minimal_post_finalize() {
1648        let mut p = pool();
1649        p.apply_defaults();
1650        p.validate("dyn_o_mite").unwrap();
1651    }
1652
1653    #[test]
1654    fn missing_listen_rejected() {
1655        let mut p = pool();
1656        p.listen = None;
1657        p.apply_defaults();
1658        assert!(matches!(
1659            p.validate("p"),
1660            Err(ConfError::MissingRequired("listen"))
1661        ));
1662    }
1663
1664    #[test]
1665    fn out_of_range_mbuf_rejected() {
1666        let mut p = pool();
1667        p.mbuf_size = Some(127);
1668        p.apply_defaults();
1669        assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
1670    }
1671
1672    #[test]
1673    fn distribution_field_round_trips_through_yaml() {
1674        let yaml = r"
1675p:
1676  listen: 127.0.0.1:8102
1677  dyn_listen: 127.0.0.1:8101
1678  tokens: '0'
1679  servers:
1680  - 127.0.0.1:6379:1
1681  data_store: 0
1682  distribution: random_slicing
1683  distribution_shadow: vnode
1684  hash: murmur3_x64_64
1685";
1686        let parsed: std::collections::BTreeMap<String, ConfPool> =
1687            serde_yaml::from_str(yaml).unwrap();
1688        let pool = parsed.get("p").unwrap();
1689        assert_eq!(pool.distribution, Some(Distribution::RandomSlicing));
1690        assert_eq!(pool.distribution_shadow, Some(Distribution::Vnode));
1691        assert_eq!(pool.hash, Some(HashType::Murmur3X64_64));
1692        assert_eq!(pool.resolved_distribution(), Distribution::RandomSlicing);
1693    }
1694
1695    #[test]
1696    fn distribution_legacy_alias_resolves_to_vnode() {
1697        let mut p = pool();
1698        p.distribution = Some(Distribution::Ketama);
1699        assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1700        p.distribution = Some(Distribution::Modula);
1701        assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1702        p.distribution = Some(Distribution::Random);
1703        assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1704    }
1705
1706    #[test]
1707    fn distribution_default_unset_is_vnode() {
1708        let p = pool();
1709        assert!(p.distribution.is_none());
1710        assert_eq!(p.resolved_distribution(), Distribution::Vnode);
1711    }
1712
1713    #[test]
1714    fn mbuf_size_not_multiple_of_16_rejected() {
1715        let mut p = pool();
1716        p.mbuf_size = Some(513);
1717        p.apply_defaults();
1718        assert!(matches!(p.validate("p"), Err(ConfError::OutOfRange { .. })));
1719    }
1720
1721    #[test]
1722    fn pem_required_when_secure() {
1723        let mut p = pool();
1724        p.secure_server_option = Some("datacenter".to_string());
1725        p.pem_key_file = Some(String::new());
1726        p.apply_defaults();
1727        // apply_defaults restores pem_key_file because it's `Some("")`,
1728        // which is non-None; so we expect MissingRequired("pem_key_file").
1729        assert!(matches!(
1730            p.validate("p"),
1731            Err(ConfError::MissingRequired("pem_key_file"))
1732        ));
1733    }
1734
1735    #[test]
1736    fn data_store_out_of_range_rejected() {
1737        let mut p = pool();
1738        p.data_store = Some(7);
1739        p.apply_defaults();
1740        assert!(matches!(p.validate("p"), Err(ConfError::BadDataStore(7))));
1741    }
1742
1743    /// Lock serialising tests that mutate the process-wide
1744    /// `NOXU_SUPPORTED` flag. cargo test runs tests on multiple
1745    /// threads; without serialisation a parallel test can flip
1746    /// the flag back before the assertion runs.
1747    static NOXU_FLAG_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
1748
1749    #[test]
1750    fn data_store_noxu_requires_riak_feature() {
1751        let _g = NOXU_FLAG_LOCK
1752            .lock()
1753            .unwrap_or_else(std::sync::PoisonError::into_inner);
1754        // Default state: noxu support flag is off; selecting
1755        // noxu must be rejected with the documented message.
1756        let prev = crate::conf::is_noxu_supported();
1757        crate::conf::set_noxu_supported(false);
1758        let mut p = pool();
1759        p.data_store = Some(2);
1760        p.noxu_path = Some("/tmp/test".into());
1761        p.apply_defaults();
1762        let err = p.validate("p");
1763        crate::conf::set_noxu_supported(prev);
1764        match err {
1765            Err(ConfError::BadNoxuConfig(msg)) => {
1766                assert!(msg.contains("--features riak"), "unexpected message: {msg}");
1767            }
1768            other => panic!("expected BadNoxuConfig, got {other:?}"),
1769        }
1770    }
1771
1772    #[test]
1773    fn data_store_noxu_requires_path() {
1774        let _g = NOXU_FLAG_LOCK
1775            .lock()
1776            .unwrap_or_else(std::sync::PoisonError::into_inner);
1777        let prev = crate::conf::is_noxu_supported();
1778        crate::conf::set_noxu_supported(true);
1779        let mut p = pool();
1780        p.data_store = Some(2);
1781        p.noxu_path = None;
1782        p.apply_defaults();
1783        let err = p.validate("p");
1784        crate::conf::set_noxu_supported(prev);
1785        match err {
1786            Err(ConfError::BadNoxuConfig(msg)) => {
1787                assert!(msg.contains("noxu_path"), "unexpected message: {msg}");
1788            }
1789            other => panic!("expected BadNoxuConfig, got {other:?}"),
1790        }
1791    }
1792
1793    #[test]
1794    fn data_store_noxu_yaml_round_trip_string_form() {
1795        // String form `data_store: noxu` and integer form
1796        // `data_store: 2` both normalise to integer 2 on parse.
1797        let yaml = r"
1798listen: 127.0.0.1:8102
1799servers:
1800- 127.0.0.1:6379:1
1801tokens: '0'
1802data_store: noxu
1803noxu_path: /tmp/test
1804";
1805        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1806        assert_eq!(p.data_store, Some(2));
1807        assert_eq!(
1808            p.noxu_path.as_deref(),
1809            Some(std::path::Path::new("/tmp/test"))
1810        );
1811        // Re-emit and re-parse: round-trip is stable.
1812        let dumped = serde_yaml::to_string(&p).unwrap();
1813        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1814        assert_eq!(p2.data_store, p.data_store);
1815        assert_eq!(p2.noxu_path, p.noxu_path);
1816    }
1817
1818    #[test]
1819    fn data_store_yaml_int_form_still_works() {
1820        let yaml = r"
1821listen: 127.0.0.1:8102
1822servers:
1823- 127.0.0.1:6379:1
1824tokens: '0'
1825data_store: 2
1826noxu_path: /tmp/test
1827";
1828        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1829        assert_eq!(p.data_store, Some(2));
1830    }
1831
1832    #[test]
1833    fn data_store_string_form_unknown_rejected() {
1834        let yaml = r"
1835listen: 127.0.0.1:8102
1836servers:
1837- 127.0.0.1:6379:1
1838tokens: '0'
1839data_store: postgres
1840";
1841        let err = serde_yaml::from_str::<ConfPool>(yaml).unwrap_err();
1842        let msg = err.to_string();
1843        assert!(
1844            msg.contains("unknown name") || msg.contains("data_store"),
1845            "unexpected message: {msg}"
1846        );
1847    }
1848
1849    #[test]
1850    fn hash_tag_must_be_two_chars() {
1851        let mut p = pool();
1852        p.hash_tag = Some("abc".to_string());
1853        p.apply_defaults();
1854        assert!(matches!(p.validate("p"), Err(ConfError::BadHashTag(_))));
1855    }
1856
1857    #[test]
1858    fn empty_servers_rejected() {
1859        let mut p = pool();
1860        p.servers = Some(Servers::from_vec(vec![]));
1861        p.apply_defaults();
1862        assert!(matches!(
1863            p.validate("p"),
1864            Err(ConfError::MissingRequired("servers"))
1865        ));
1866    }
1867
1868    #[test]
1869    fn log_format_known_values_accepted() {
1870        for value in ["default", "rfc5424", "rfc3164", "json", "ndjson", "DEFAULT"] {
1871            let mut p = pool();
1872            p.log_format = Some(value.to_string());
1873            p.apply_defaults();
1874            assert!(p.validate("p").is_ok(), "value {value:?} should validate");
1875        }
1876    }
1877
1878    #[test]
1879    fn log_format_unknown_rejected() {
1880        let mut p = pool();
1881        p.log_format = Some("yaml".to_string());
1882        p.apply_defaults();
1883        let err = p.validate("p").unwrap_err();
1884        assert!(
1885            matches!(
1886                err,
1887                ConfError::BadServer {
1888                    field: "log_format",
1889                    ..
1890                }
1891            ),
1892            "unexpected error: {err:?}"
1893        );
1894    }
1895
1896    #[test]
1897    fn observability_block_round_trips() {
1898        let yaml = r"
1899observability:
1900  otlp_logs_endpoint: http://collector:4317
1901  service_name: dynomited
1902listen: 127.0.0.1:8102
1903servers:
1904- 127.0.0.1:6379:1
1905tokens: '0'
1906";
1907        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1908        let obs = p.observability.as_ref().expect("observability set");
1909        assert_eq!(
1910            obs.otlp_logs_endpoint.as_deref(),
1911            Some("http://collector:4317")
1912        );
1913        assert_eq!(obs.service_name.as_deref(), Some("dynomited"));
1914    }
1915
1916    #[test]
1917    fn bucket_types_round_trip() {
1918        let yaml = r"
1919listen: 127.0.0.1:8102
1920servers:
1921- 127.0.0.1:6379:1
1922tokens: '0'
1923bucket_types:
1924- name: hot
1925  read_consistency: DC_QUORUM
1926  write_consistency: DC_EACH_SAFE_QUORUM
1927  n_val: 3
1928- name: cold
1929  read_consistency: DC_ONE
1930  write_consistency: DC_ONE
1931  n_val: 1
1932default_bucket_type: cold
1933";
1934        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
1935        assert_eq!(p.bucket_types.len(), 2);
1936        assert_eq!(p.bucket_types[0].name, "hot");
1937        assert_eq!(p.bucket_types[0].n_val, 3);
1938        assert_eq!(
1939            p.bucket_types[0].read_level().unwrap(),
1940            crate::conf::ConsistencyLevel::DcQuorum,
1941        );
1942        assert_eq!(p.default_bucket_type.as_deref(), Some("cold"));
1943        // Re-emit and re-parse: round-trip preserves the data.
1944        let dumped = serde_yaml::to_string(&p).unwrap();
1945        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
1946        assert_eq!(p2.bucket_types, p.bucket_types);
1947        assert_eq!(p2.default_bucket_type, p.default_bucket_type);
1948    }
1949
1950    #[test]
1951    fn bucket_types_default_is_empty() {
1952        let mut p = pool();
1953        p.apply_defaults();
1954        assert!(p.bucket_types.is_empty());
1955        assert!(p.default_bucket_type.is_none());
1956        assert!(p.validate("p").is_ok());
1957    }
1958
1959    #[test]
1960    fn duplicate_bucket_type_name_rejected() {
1961        let mut p = pool();
1962        p.bucket_types = vec![
1963            ConfBucketType {
1964                name: "a".into(),
1965                read_consistency: "DC_ONE".into(),
1966                write_consistency: "DC_ONE".into(),
1967                n_val: 0,
1968            },
1969            ConfBucketType {
1970                name: "a".into(),
1971                read_consistency: "DC_ONE".into(),
1972                write_consistency: "DC_ONE".into(),
1973                n_val: 0,
1974            },
1975        ];
1976        p.apply_defaults();
1977        let err = p.validate("p").unwrap_err();
1978        assert!(
1979            matches!(
1980                err,
1981                ConfError::BadServer {
1982                    field: "bucket_types",
1983                    ..
1984                }
1985            ),
1986            "unexpected error: {err:?}",
1987        );
1988    }
1989
1990    #[test]
1991    fn bucket_type_unknown_consistency_rejected() {
1992        let mut p = pool();
1993        p.bucket_types = vec![ConfBucketType {
1994            name: "a".into(),
1995            read_consistency: "DC_PURPLE".into(),
1996            write_consistency: "DC_ONE".into(),
1997            n_val: 0,
1998        }];
1999        p.apply_defaults();
2000        let err = p.validate("p").unwrap_err();
2001        assert!(matches!(err, ConfError::BadConsistency { .. }));
2002    }
2003
2004    #[test]
2005    fn unknown_default_bucket_type_rejected() {
2006        let mut p = pool();
2007        p.default_bucket_type = Some("missing".into());
2008        p.apply_defaults();
2009        let err = p.validate("p").unwrap_err();
2010        assert!(matches!(
2011            err,
2012            ConfError::BadServer {
2013                field: "default_bucket_type",
2014                ..
2015            }
2016        ));
2017    }
2018
2019    #[test]
2020    fn hinted_handoff_default_off_with_canonical_constants() {
2021        let mut p = pool();
2022        p.apply_defaults();
2023        assert_eq!(p.enable_hinted_handoff, Some(false));
2024        assert_eq!(p.hint_ttl_seconds, Some(defaults::HINT_TTL_SECONDS));
2025        assert_eq!(p.hint_store_max_bytes, Some(defaults::HINT_STORE_MAX_BYTES));
2026        assert_eq!(
2027            p.hint_drain_interval_ms,
2028            Some(defaults::HINT_DRAIN_INTERVAL_MS)
2029        );
2030        assert!(p.validate("p").is_ok());
2031    }
2032
2033    #[test]
2034    fn hinted_handoff_yaml_round_trip() {
2035        let yaml = r"
2036listen: 127.0.0.1:8102
2037servers:
2038- 127.0.0.1:6379:1
2039tokens: '0'
2040enable_hinted_handoff: true
2041hint_ttl_seconds: 7200
2042hint_store_max_bytes: 8388608
2043hint_drain_interval_ms: 5000
2044";
2045        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
2046        assert_eq!(p.enable_hinted_handoff, Some(true));
2047        assert_eq!(p.hint_ttl_seconds, Some(7200));
2048        assert_eq!(p.hint_store_max_bytes, Some(8_388_608));
2049        assert_eq!(p.hint_drain_interval_ms, Some(5_000));
2050        let dumped = serde_yaml::to_string(&p).unwrap();
2051        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2052        assert_eq!(p2.enable_hinted_handoff, p.enable_hinted_handoff);
2053        assert_eq!(p2.hint_ttl_seconds, p.hint_ttl_seconds);
2054        assert_eq!(p2.hint_store_max_bytes, p.hint_store_max_bytes);
2055        assert_eq!(p2.hint_drain_interval_ms, p.hint_drain_interval_ms);
2056    }
2057
2058    #[test]
2059    fn hinted_handoff_zero_ttl_rejected_when_enabled() {
2060        let mut p = pool();
2061        p.enable_hinted_handoff = Some(true);
2062        p.hint_ttl_seconds = Some(0);
2063        p.apply_defaults();
2064        // apply_defaults() does NOT overwrite Some(0) with the
2065        // default; the validator should reject it.
2066        let err = p.validate("p").unwrap_err();
2067        assert!(matches!(
2068            err,
2069            ConfError::BadServer {
2070                field: "hint_ttl_seconds",
2071                ..
2072            }
2073        ));
2074    }
2075
2076    #[test]
2077    fn hinted_handoff_zero_max_bytes_rejected_when_enabled() {
2078        let mut p = pool();
2079        p.enable_hinted_handoff = Some(true);
2080        p.hint_store_max_bytes = Some(0);
2081        p.apply_defaults();
2082        let err = p.validate("p").unwrap_err();
2083        assert!(matches!(
2084            err,
2085            ConfError::BadServer {
2086                field: "hint_store_max_bytes",
2087                ..
2088            }
2089        ));
2090    }
2091
2092    #[test]
2093    fn hinted_handoff_zero_values_ignored_when_disabled() {
2094        // With handoff off, the validator must NOT reject
2095        // out-of-range values: operators may legitimately leave
2096        // them at zero with handoff off.
2097        let mut p = pool();
2098        p.enable_hinted_handoff = Some(false);
2099        p.hint_ttl_seconds = Some(0);
2100        p.hint_store_max_bytes = Some(0);
2101        p.hint_drain_interval_ms = Some(0);
2102        p.apply_defaults();
2103        assert!(p.validate("p").is_ok());
2104    }
2105
2106    #[test]
2107    fn riak_block_validates_when_unset() {
2108        let mut p = pool();
2109        p.riak = Some(ConfRiak::default());
2110        p.apply_defaults();
2111        assert!(p.validate("p").is_ok());
2112    }
2113
2114    #[test]
2115    fn riak_block_validates_with_addresses() {
2116        let mut p = pool();
2117        p.riak = Some(ConfRiak {
2118            pbc_listen: Some("127.0.0.1:8087".into()),
2119            http_listen: Some("127.0.0.1:8098".into()),
2120            ..ConfRiak::default()
2121        });
2122        p.apply_defaults();
2123        assert!(p.validate("p").is_ok());
2124    }
2125
2126    #[test]
2127    fn riak_block_rejects_bad_pbc_addr() {
2128        let mut p = pool();
2129        p.riak = Some(ConfRiak {
2130            pbc_listen: Some(String::new()),
2131            ..ConfRiak::default()
2132        });
2133        p.apply_defaults();
2134        assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
2135    }
2136
2137    #[test]
2138    fn riak_block_rejects_segment_above_full_sweep() {
2139        let mut p = pool();
2140        p.riak = Some(ConfRiak {
2141            aae_segment_interval_seconds: Some(120),
2142            aae_full_sweep_interval_seconds: Some(60),
2143            ..ConfRiak::default()
2144        });
2145        p.apply_defaults();
2146        assert!(matches!(p.validate("p"), Err(ConfError::BadServer { .. })));
2147    }
2148
2149    #[test]
2150    fn riak_block_round_trips_through_yaml() {
2151        let yaml = r"
2152p:
2153  listen: 127.0.0.1:1
2154  dyn_listen: 127.0.0.1:2
2155  tokens: '0'
2156  servers:
2157  - 127.0.0.1:3:1
2158  data_store: 0
2159  riak:
2160    pbc_listen: 127.0.0.1:8087
2161    http_listen: 127.0.0.1:8098
2162    aae_enabled: true
2163    aae_full_sweep_interval_seconds: 3600
2164    aae_segment_interval_seconds: 30
2165";
2166        let cfg: std::collections::BTreeMap<String, ConfPool> = serde_yaml::from_str(yaml).unwrap();
2167        let p = cfg.get("p").unwrap();
2168        let r = p.riak.as_ref().unwrap();
2169        assert_eq!(r.pbc_listen.as_deref(), Some("127.0.0.1:8087"));
2170        assert_eq!(r.http_listen.as_deref(), Some("127.0.0.1:8098"));
2171        assert_eq!(r.aae_enabled, Some(true));
2172        assert_eq!(r.aae_full_sweep_interval_seconds, Some(3600));
2173        assert_eq!(r.aae_segment_interval_seconds, Some(30));
2174    }
2175
2176    #[test]
2177    fn peer_tls_pair_unset_is_ok() {
2178        let mut p = pool();
2179        p.apply_defaults();
2180        assert!(p.validate("p").is_ok(), "plaintext default must validate");
2181    }
2182
2183    #[test]
2184    fn peer_tls_pair_both_set_is_ok() {
2185        let mut p = pool();
2186        p.peer_tls_cert = Some(std::path::PathBuf::from("/etc/dynomite/peer.crt"));
2187        p.peer_tls_key = Some(std::path::PathBuf::from("/etc/dynomite/peer.key"));
2188        p.apply_defaults();
2189        assert!(p.validate("p").is_ok());
2190    }
2191
2192    #[test]
2193    fn peer_tls_cert_without_key_rejected() {
2194        let mut p = pool();
2195        p.peer_tls_cert = Some(std::path::PathBuf::from("/x.crt"));
2196        p.apply_defaults();
2197        let err = p.validate("p").unwrap_err();
2198        assert!(
2199            matches!(
2200                err,
2201                ConfError::BadServer {
2202                    field: "peer_tls_cert",
2203                    ..
2204                }
2205            ),
2206            "got {err:?}"
2207        );
2208    }
2209
2210    #[test]
2211    fn peer_tls_key_without_cert_rejected() {
2212        let mut p = pool();
2213        p.peer_tls_key = Some(std::path::PathBuf::from("/x.key"));
2214        p.apply_defaults();
2215        let err = p.validate("p").unwrap_err();
2216        assert!(
2217            matches!(
2218                err,
2219                ConfError::BadServer {
2220                    field: "peer_tls_key",
2221                    ..
2222                }
2223            ),
2224            "got {err:?}"
2225        );
2226    }
2227
2228    #[test]
2229    fn peer_tls_ca_without_cert_rejected() {
2230        let mut p = pool();
2231        p.peer_tls_ca = Some(std::path::PathBuf::from("/x.ca"));
2232        p.apply_defaults();
2233        let err = p.validate("p").unwrap_err();
2234        assert!(
2235            matches!(
2236                err,
2237                ConfError::BadServer {
2238                    field: "peer_tls_ca",
2239                    ..
2240                }
2241            ),
2242            "got {err:?}"
2243        );
2244    }
2245
2246    #[test]
2247    fn riak_tls_cert_without_key_rejected() {
2248        let mut p = pool();
2249        p.riak = Some(ConfRiak {
2250            pbc_listen: Some("127.0.0.1:8087".into()),
2251            tls_cert: Some(std::path::PathBuf::from("/x.crt")),
2252            ..ConfRiak::default()
2253        });
2254        p.apply_defaults();
2255        let err = p.validate("p").unwrap_err();
2256        assert!(
2257            matches!(
2258                err,
2259                ConfError::BadServer {
2260                    field: "tls_cert",
2261                    ..
2262                }
2263            ),
2264            "got {err:?}"
2265        );
2266    }
2267
2268    #[test]
2269    fn riak_tls_pair_both_set_is_ok() {
2270        let mut p = pool();
2271        p.riak = Some(ConfRiak {
2272            pbc_listen: Some("127.0.0.1:8087".into()),
2273            tls_cert: Some(std::path::PathBuf::from("/x.crt")),
2274            tls_key: Some(std::path::PathBuf::from("/x.key")),
2275            ..ConfRiak::default()
2276        });
2277        p.apply_defaults();
2278        assert!(p.validate("p").is_ok());
2279    }
2280
2281    #[test]
2282    fn riak_wasm_modules_yaml_round_trip() {
2283        let dir = tempfile::tempdir().unwrap();
2284        let m1 = dir.path().join("identity.wasm");
2285        let m2 = dir.path().join("sum.wasm");
2286        std::fs::write(&m1, b"\0asm\x01\0\0\0").unwrap();
2287        std::fs::write(&m2, b"\0asm\x01\0\0\0").unwrap();
2288        let yaml = format!(
2289            r"
2290listen: 127.0.0.1:8102
2291servers:
2292- 127.0.0.1:6379:1
2293tokens: '0'
2294riak:
2295  pbc_listen: 127.0.0.1:8087
2296  wasm_modules:
2297  - id: identity
2298    path: {m1}
2299  - id: sum
2300    path: {m2}
2301",
2302            m1 = m1.display(),
2303            m2 = m2.display(),
2304        );
2305        let p: ConfPool = serde_yaml::from_str(&yaml).unwrap();
2306        let r = p.riak.as_ref().unwrap();
2307        let mods = r.wasm_modules.as_ref().unwrap();
2308        assert_eq!(mods.len(), 2);
2309        assert_eq!(mods[0].id, "identity");
2310        assert_eq!(mods[0].path, m1);
2311        assert_eq!(mods[1].id, "sum");
2312        assert_eq!(mods[1].path, m2);
2313        // Round-trip back to YAML and re-parse.
2314        let dumped = serde_yaml::to_string(&p).unwrap();
2315        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2316        assert_eq!(p2.riak.unwrap().wasm_modules, r.wasm_modules);
2317    }
2318
2319    #[test]
2320    fn riak_wasm_modules_unique_ids_required() {
2321        let dir = tempfile::tempdir().unwrap();
2322        let path = dir.path().join("m.wasm");
2323        std::fs::write(&path, b"\0").unwrap();
2324        let r = ConfRiak {
2325            wasm_modules: Some(vec![
2326                ConfRiakWasmModule {
2327                    id: "m".into(),
2328                    path: path.clone(),
2329                },
2330                ConfRiakWasmModule {
2331                    id: "m".into(),
2332                    path: path.clone(),
2333                },
2334            ]),
2335            ..ConfRiak::default()
2336        };
2337        let err = r.validate().unwrap_err();
2338        assert!(matches!(
2339            err,
2340            ConfError::BadServer {
2341                field: "wasm_modules.id",
2342                ..
2343            }
2344        ));
2345    }
2346
2347    #[test]
2348    fn riak_wasm_modules_path_must_exist() {
2349        let r = ConfRiak {
2350            wasm_modules: Some(vec![ConfRiakWasmModule {
2351                id: "missing".into(),
2352                path: std::path::PathBuf::from("/no/such/path/at/all.wasm"),
2353            }]),
2354            ..ConfRiak::default()
2355        };
2356        let err = r.validate().unwrap_err();
2357        assert!(matches!(
2358            err,
2359            ConfError::BadServer {
2360                field: "wasm_modules.path",
2361                ..
2362            }
2363        ));
2364    }
2365
2366    #[test]
2367    fn riak_wasm_modules_empty_id_rejected() {
2368        let dir = tempfile::tempdir().unwrap();
2369        let path = dir.path().join("m.wasm");
2370        std::fs::write(&path, b"\0").unwrap();
2371        let r = ConfRiak {
2372            wasm_modules: Some(vec![ConfRiakWasmModule {
2373                id: String::new(),
2374                path,
2375            }]),
2376            ..ConfRiak::default()
2377        };
2378        let err = r.validate().unwrap_err();
2379        assert!(matches!(
2380            err,
2381            ConfError::BadServer {
2382                field: "wasm_modules.id",
2383                ..
2384            }
2385        ));
2386    }
2387
2388    #[test]
2389    fn peer_tls_profile_pair_unset_is_ok() {
2390        let p = ConfTlsProfile::default();
2391        assert!(p.validate("dc1").is_ok());
2392    }
2393
2394    #[test]
2395    fn peer_tls_profile_cert_without_key_rejected() {
2396        let p = ConfTlsProfile {
2397            cert: Some(std::path::PathBuf::from("/x.crt")),
2398            ..ConfTlsProfile::default()
2399        };
2400        let err = p.validate("dc1").unwrap_err();
2401        assert!(matches!(
2402            err,
2403            ConfError::BadServer {
2404                field: "peer_tls_profiles.cert",
2405                ..
2406            }
2407        ));
2408    }
2409
2410    #[test]
2411    fn peer_tls_profile_key_without_cert_rejected() {
2412        let p = ConfTlsProfile {
2413            key: Some(std::path::PathBuf::from("/x.key")),
2414            ..ConfTlsProfile::default()
2415        };
2416        let err = p.validate("dc1").unwrap_err();
2417        assert!(matches!(
2418            err,
2419            ConfError::BadServer {
2420                field: "peer_tls_profiles.key",
2421                ..
2422            }
2423        ));
2424    }
2425
2426    #[test]
2427    fn peer_tls_profile_ca_without_cert_rejected() {
2428        let p = ConfTlsProfile {
2429            ca: Some(std::path::PathBuf::from("/x.ca")),
2430            ..ConfTlsProfile::default()
2431        };
2432        let err = p.validate("dc1").unwrap_err();
2433        assert!(matches!(
2434            err,
2435            ConfError::BadServer {
2436                field: "peer_tls_profiles.ca",
2437                ..
2438            }
2439        ));
2440    }
2441
2442    #[test]
2443    fn peer_tls_profiles_empty_dc_name_rejected() {
2444        let mut p = pool();
2445        p.peer_tls_profiles.insert(
2446            String::new(),
2447            ConfTlsProfile {
2448                cert: Some(std::path::PathBuf::from("/x.crt")),
2449                key: Some(std::path::PathBuf::from("/x.key")),
2450                ca: None,
2451            },
2452        );
2453        p.apply_defaults();
2454        let err = p.validate("p").unwrap_err();
2455        assert!(matches!(
2456            err,
2457            ConfError::BadServer {
2458                field: "peer_tls_profiles",
2459                ..
2460            }
2461        ));
2462    }
2463
2464    #[test]
2465    fn peer_tls_profiles_per_dc_pair_validates() {
2466        let mut p = pool();
2467        p.peer_tls_profiles.insert(
2468            "dc1".into(),
2469            ConfTlsProfile {
2470                cert: Some(std::path::PathBuf::from("/dc1.crt")),
2471                key: Some(std::path::PathBuf::from("/dc1.key")),
2472                ca: None,
2473            },
2474        );
2475        p.apply_defaults();
2476        assert!(p.validate("p").is_ok());
2477    }
2478
2479    #[test]
2480    fn peer_tls_profiles_per_dc_cert_without_key_rejected() {
2481        let mut p = pool();
2482        p.peer_tls_profiles.insert(
2483            "dc1".into(),
2484            ConfTlsProfile {
2485                cert: Some(std::path::PathBuf::from("/dc1.crt")),
2486                key: None,
2487                ca: None,
2488            },
2489        );
2490        p.apply_defaults();
2491        let err = p.validate("p").unwrap_err();
2492        assert!(matches!(
2493            err,
2494            ConfError::BadServer {
2495                field: "peer_tls_profiles.cert",
2496                ..
2497            }
2498        ));
2499    }
2500
2501    #[test]
2502    fn peer_tls_profiles_yaml_round_trip() {
2503        let yaml = r"
2504listen: 127.0.0.1:8102
2505servers:
2506- 127.0.0.1:6379:1
2507tokens: '0'
2508peer_tls_profiles:
2509  dc1:
2510    cert: /etc/dynomite/dc1.pem
2511    key: /etc/dynomite/dc1.key
2512    ca: /etc/dynomite/dc1-ca.pem
2513  dc2:
2514    cert: /etc/dynomite/dc2.pem
2515    key: /etc/dynomite/dc2.key
2516";
2517        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
2518        assert_eq!(p.peer_tls_profiles.len(), 2);
2519        assert_eq!(
2520            p.peer_tls_profiles["dc1"].cert.as_deref(),
2521            Some(std::path::Path::new("/etc/dynomite/dc1.pem"))
2522        );
2523        assert!(p.peer_tls_profiles["dc2"].ca.is_none());
2524        let dumped = serde_yaml::to_string(&p).unwrap();
2525        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2526        assert_eq!(p2.peer_tls_profiles, p.peer_tls_profiles);
2527    }
2528
2529    #[test]
2530    fn transport_default_is_tcp_after_finalize() {
2531        let mut p = pool();
2532        p.apply_defaults();
2533        assert_eq!(p.transport, Some(Transport::Tcp));
2534        assert!(p.validate("p").is_ok());
2535    }
2536
2537    #[test]
2538    fn transport_quic_yaml_round_trip() {
2539        let yaml = r"
2540listen: 127.0.0.1:8102
2541servers:
2542- 127.0.0.1:6379:1
2543tokens: '0'
2544transport: quic
2545quic_cert_file: /tmp/test.crt
2546quic_key_file: /tmp/test.key
2547";
2548        let p: ConfPool = serde_yaml::from_str(yaml).unwrap();
2549        assert_eq!(p.transport, Some(Transport::Quic));
2550        assert_eq!(
2551            p.quic_cert_file.as_deref(),
2552            Some(std::path::Path::new("/tmp/test.crt"))
2553        );
2554        assert_eq!(
2555            p.quic_key_file.as_deref(),
2556            Some(std::path::Path::new("/tmp/test.key"))
2557        );
2558        let dumped = serde_yaml::to_string(&p).unwrap();
2559        let p2: ConfPool = serde_yaml::from_str(&dumped).unwrap();
2560        assert_eq!(p2.transport, p.transport);
2561        assert_eq!(p2.quic_cert_file, p.quic_cert_file);
2562        assert_eq!(p2.quic_key_file, p.quic_key_file);
2563    }
2564
2565    #[test]
2566    fn transport_quic_requires_cert_and_key() {
2567        let mut p = pool();
2568        p.transport = Some(Transport::Quic);
2569        p.apply_defaults();
2570        let err = p.validate("p").unwrap_err();
2571        assert!(matches!(
2572            err,
2573            ConfError::BadServer {
2574                field: "quic_cert_file",
2575                ..
2576            }
2577        ));
2578        p.quic_cert_file = Some(std::path::PathBuf::from("/tmp/c.pem"));
2579        let err = p.validate("p").unwrap_err();
2580        assert!(matches!(
2581            err,
2582            ConfError::BadServer {
2583                field: "quic_key_file",
2584                ..
2585            }
2586        ));
2587        p.quic_key_file = Some(std::path::PathBuf::from("/tmp/k.pem"));
2588        assert!(p.validate("p").is_ok());
2589    }
2590
2591    #[test]
2592    fn transport_tcp_ignores_quic_files() {
2593        let mut p = pool();
2594        p.transport = Some(Transport::Tcp);
2595        // Setting cert / key under TCP is tolerated; the
2596        // listener is plain TCP so the QUIC knobs are simply
2597        // unused.
2598        p.quic_cert_file = Some(std::path::PathBuf::from("/tmp/c.pem"));
2599        p.apply_defaults();
2600        assert!(p.validate("p").is_ok());
2601    }
2602}