noxu_rep/rep_config.rs
1//! Replication configuration.
2//!
3
4use std::path::PathBuf;
5use std::time::Duration;
6
7use crate::commit_durability::CommitDurability;
8use crate::consistency::ConsistencyPolicy;
9use crate::node_type::NodeType;
10use crate::quorum_policy::QuorumPolicy;
11use crate::rep_node::RepNode;
12use crate::stream::reconnect::ReconnectConfig;
13
// --- Election timing -------------------------------------------------------

/// Election timeout applied when the builder is not given one (10 s).
const DEFAULT_ELECTION_TIMEOUT: Duration = Duration::from_secs(10);
/// Per-phase election message timeout applied by default (500 ms).
const DEFAULT_ELECTION_PHASE_TIMEOUT: Duration = Duration::from_millis(500);

// --- Failure detection -----------------------------------------------------

/// Interval between heartbeat messages applied by default (1 s).
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
/// Number of samples kept in the phi accrual window by default.
const DEFAULT_PHI_WINDOW_SIZE: usize = 200;

// --- Networking ------------------------------------------------------------

/// Replication port used when the builder is not given one.
///
/// The value was changed from `5001` (which collides with PostgreSQL's
/// REPMGR default and various Cisco services) to `14_001`, an
/// unprivileged, IANA-unassigned port. Most production deployments
/// override this; the default is only intended to fail closed during
/// development rather than silently bind on something else's port.
const DEFAULT_NODE_PORT: u16 = 14_001;
31
/// Wire-level transport selected for replication traffic.
///
/// The in-memory transport is a first-class production option alongside
/// TCP / TLS / QUIC. This enum lets a caller declare the transport choice
/// in [`RepConfig`] so higher-level orchestration code (e.g., the test
/// harness, the `RepTestBase` integration tests, embedded deployments)
/// can route channel construction through the right factory.
///
/// Note: noxu-rep's [`crate::net`] channel types are constructed directly
/// by the user code that drives the cluster (a `TcpListener` on a port, a
/// [`crate::net::InMemoryTransport::new_group`] mesh, etc.). This value is
/// therefore advisory — it documents intent and lets observability /
/// chaos / harness layers introspect the transport without inspecting
/// individual channel types.
///
/// [`Default`] is derived and yields [`RepTransportKind::Tcp`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum RepTransportKind {
    /// Plaintext TCP via [`crate::net::TcpChannel`].
    ///
    /// This is the default variant, kept to preserve backward
    /// compatibility.
    #[default]
    Tcp,
    /// TLS-encrypted TCP via `crate::net::TlsTcpChannel`
    /// (requires `tls-rustls` or `tls-native`).
    Tls,
    /// QUIC over UDP via `crate::net::QuicChannel`
    /// (requires the `quic` feature).
    Quic,
    /// In-process [`crate::net::InMemoryTransport`]. Useful for
    /// embedded deployments and integration tests that want real
    /// `ReplicatedEnvironment` behaviour without opening sockets.
    InMemory,
}
70
/// Configuration for a replication node.
///
/// Construct one with [`RepConfig::builder`] (full builder chain) or
/// [`RepConfig::new`] (identity plus port, everything else defaulted).
/// All fields are public and can be read directly once built.
#[derive(Debug, Clone)]
pub struct RepConfig {
    /// Name of the replication group.
    pub group_name: String,
    /// Name of this node within the group (must be unique).
    pub node_name: String,
    /// Hostname or IP address for this node.
    pub node_host: String,
    /// Port for replication communication. Default: `14_001`.
    pub node_port: u16,
    /// Type of this node. Default: `NodeType::Electable`.
    pub node_type: NodeType,
    /// Timeout for elections. Default: 10 s.
    pub election_timeout: Duration,
    /// Interval between heartbeat messages. Default: 1 s.
    pub heartbeat_interval: Duration,
    /// Default consistency policy for read operations.
    ///
    /// The builder starts from `ConsistencyPolicy::default()`.
    pub consistency_policy: ConsistencyPolicy,
    /// Default commit durability for replicated transactions.
    ///
    /// The `ack_timeout` field on `commit_durability` governs the
    /// commit-side wait for replica acks; there is no separate
    /// per-RepConfig replica-ack timeout. The builder starts from
    /// `CommitDurability::default()`.
    pub commit_durability: CommitDurability,
    /// Path to the local environment home directory (`.ndb` files).
    ///
    /// When set, `ReplicatedEnvironment` registers a `NetworkRestoreServer`
    /// on the service dispatcher so that other nodes can restore from this
    /// node via the `"RESTORE"` service. Default: `None`.
    pub env_home: Option<PathBuf>,
    /// Quorum policy for elections. Default: `SimpleMajority`.
    pub quorum_policy: QuorumPolicy,
    /// Phi accrual suspicion threshold.
    ///
    /// `None` (default) uses a binary heartbeat timeout.
    /// `Some(8.0)` enables phi accrual detection with the paper's recommended
    /// threshold (mistake rate ≈ 10⁻⁸).
    pub phi_threshold: Option<f64>,
    /// Sliding-window size for phi accrual inter-arrival samples.
    ///
    /// Default `200` is adequate for LAN; use `1000` for WAN.
    pub phi_window_size: usize,
    /// Fully-described peers added to the replication group at startup.
    ///
    /// Useful for pre-populating quoracle capacity/latency metadata.
    /// Default: empty.
    pub initial_peers: Vec<RepNode>,
    /// Timeout per peer message exchange during Phase 1 and Phase 2 of an
    /// election. Default: 500 ms.
    pub election_phase_timeout: Duration,
    /// Reconnection backoff configuration for replica partition recovery.
    ///
    /// The builder starts from `ReconnectConfig::default()`.
    pub reconnect_config: ReconnectConfig,
    /// Wire-level transport this node will use.
    ///
    /// This field lets callers declare whether they intend to drive
    /// replication over TCP, TLS, QUIC, or the in-process
    /// [`crate::net::InMemoryTransport`]. See [`RepTransportKind`] for the
    /// variants. Defaults to [`RepTransportKind::Tcp`] for backward
    /// compatibility.
    pub transport_kind: RepTransportKind,

    /// Allowlist of peer subject names for mTLS enforcement (Phase 2, v3.1.0).
    ///
    /// When non-empty and [`RepTransportKind::Tls`] is configured, the
    /// server will:
    ///
    /// 1. **Require a client certificate** on every incoming TLS connection.
    /// 2. **Validate the chain** against the CA roots in the `TlsConfig`.
    /// 3. **Check subject names** — the peer's Subject Common Name (CN) and
    ///    every DNS Subject Alternative Name (SAN) entry are compared
    ///    case-insensitively against this list. If none match, the
    ///    handshake is aborted before any application data is exchanged.
    ///
    /// Matching is exact (no wildcards). Names are compared
    /// case-insensitively. Whitespace-only and empty entries are ignored.
    ///
    /// The client side automatically presents its own certificate when the
    /// `TlsConfig` identity is `PemFiles` or `PemBytes`.
    ///
    /// ## Empty list
    ///
    /// An empty list means no peers are admitted (`PeerAllowlistVerifier`
    /// returns an error at construction time, which surfaces as a
    /// `RepError::ConfigError` from `TlsConfig::to_rustls_server_config_with_allowlist`).
    /// This is intentional fail-closed behaviour: an empty allowlist is
    /// almost certainly a misconfiguration.
    ///
    /// NOTE(review): the builder's default for this field is an empty list,
    /// while the enforcement steps above are described only for a non-empty
    /// list. Confirm whether an empty list under TLS skips allowlist
    /// enforcement or is rejected as described in this section.
    ///
    /// ## Transport requirement
    ///
    /// Enforcement requires `transport_kind = RepTransportKind::Tls`. With
    /// plain TCP there is no TLS handshake and therefore no cert to inspect.
    /// Setting this field with a non-TLS transport emits a `log::warn!`.
    pub peer_allowlist: Vec<String>,

    /// TLS configuration for the service dispatcher (Phase 3).
    ///
    /// When set and `transport_kind` is [`RepTransportKind::Tls`],
    /// [`crate::replicated_environment::ReplicatedEnvironment`] will
    /// start a `TlsTcpServiceDispatcher` (feature `tls-rustls`)
    /// instead of the plain-TCP dispatcher. Combined with a non-empty
    /// `peer_allowlist`, this enforces mTLS on every incoming replication
    /// connection at the dispatcher level.
    ///
    /// `None` (the default) preserves the Phase-2 behaviour: the
    /// dispatcher uses plain TCP and the operator must wire
    /// `TlsTcpChannelListener::bind_with_tls_and_allowlist` separately.
    pub tls_config: Option<crate::tls::TlsConfig>,
}
182
183impl RepConfig {
184 /// Creates a builder for `RepConfig`.
185 pub fn builder(
186 group_name: &str,
187 node_name: &str,
188 node_host: &str,
189 ) -> RepConfigBuilder {
190 RepConfigBuilder {
191 group_name: group_name.to_string(),
192 node_name: node_name.to_string(),
193 node_host: node_host.to_string(),
194 node_port: DEFAULT_NODE_PORT,
195 node_type: NodeType::Electable,
196 election_timeout: DEFAULT_ELECTION_TIMEOUT,
197 heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
198 consistency_policy: ConsistencyPolicy::default(),
199 commit_durability: CommitDurability::default(),
200 env_home: None,
201 quorum_policy: QuorumPolicy::SimpleMajority,
202 phi_threshold: None,
203 phi_window_size: DEFAULT_PHI_WINDOW_SIZE,
204 initial_peers: Vec::new(),
205 election_phase_timeout: DEFAULT_ELECTION_PHASE_TIMEOUT,
206 reconnect_config: ReconnectConfig::default(),
207 transport_kind: RepTransportKind::default(),
208 peer_allowlist: Vec::new(),
209 tls_config: None,
210 }
211 }
212
213 /// Convenience constructor matching the original v1.4 shape.
214 ///
215 /// Equivalent to `builder(group, node, host).node_port(port).build()`.
216 /// Provided so doc snippets and short tests don't need to write the
217 /// full builder chain.
218 /// "`RepConfig::new` example").
219 pub fn new(
220 group_name: impl Into<String>,
221 node_name: impl Into<String>,
222 node_host: impl Into<String>,
223 node_port: u16,
224 ) -> RepConfig {
225 let g = group_name.into();
226 let n = node_name.into();
227 let h = node_host.into();
228 RepConfig::builder(&g, &n, &h).node_port(node_port).build()
229 }
230
231 /// Returns the socket address string for this node.
232 pub fn socket_address(&self) -> String {
233 format!("{}:{}", self.node_host, self.node_port)
234 }
235}
236
/// Builder for [`RepConfig`].
///
/// Obtained from [`RepConfig::builder`], which fills in the group name,
/// node name and host and sets every other field to its default. Each
/// setter method consumes the builder and returns it, so calls can be
/// chained; `build()` then moves the collected values into a
/// [`RepConfig`].
///
/// The fields mirror those of [`RepConfig`] one-to-one; see that type for
/// what each setting means.
#[derive(Debug, Clone)]
pub struct RepConfigBuilder {
    group_name: String,
    node_name: String,
    node_host: String,
    node_port: u16,
    node_type: NodeType,
    election_timeout: Duration,
    heartbeat_interval: Duration,
    consistency_policy: ConsistencyPolicy,
    commit_durability: CommitDurability,
    env_home: Option<PathBuf>,
    quorum_policy: QuorumPolicy,
    phi_threshold: Option<f64>,
    phi_window_size: usize,
    initial_peers: Vec<RepNode>,
    election_phase_timeout: Duration,
    reconnect_config: ReconnectConfig,
    transport_kind: RepTransportKind,
    peer_allowlist: Vec<String>,
    tls_config: Option<crate::tls::TlsConfig>,
}
260
261impl RepConfigBuilder {
262 /// Sets the replication port.
263 pub fn node_port(mut self, port: u16) -> Self {
264 self.node_port = port;
265 self
266 }
267
268 /// Sets the node type.
269 pub fn node_type(mut self, node_type: NodeType) -> Self {
270 self.node_type = node_type;
271 self
272 }
273
274 /// Sets the election timeout.
275 pub fn election_timeout(mut self, timeout: Duration) -> Self {
276 self.election_timeout = timeout;
277 self
278 }
279
280 /// Sets the heartbeat interval.
281 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
282 self.heartbeat_interval = interval;
283 self
284 }
285
286 /// Sets the consistency policy.
287 pub fn consistency_policy(mut self, policy: ConsistencyPolicy) -> Self {
288 self.consistency_policy = policy;
289 self
290 }
291
292 /// Sets the commit durability.
293 pub fn commit_durability(mut self, durability: CommitDurability) -> Self {
294 self.commit_durability = durability;
295 self
296 }
297
298 /// Sets the environment home directory (serves `.ndb` files for network restore).
299 pub fn env_home(mut self, path: impl Into<PathBuf>) -> Self {
300 self.env_home = Some(path.into());
301 self
302 }
303
304 /// Sets the quorum policy for elections (default: `SimpleMajority`).
305 pub fn quorum_policy(mut self, policy: QuorumPolicy) -> Self {
306 self.quorum_policy = policy;
307 self
308 }
309
310 /// Enable phi accrual failure detection with the given suspicion threshold.
311 ///
312 /// `8.0` is the paper's recommended production value (mistake rate ≈ 10⁻⁸).
313 /// Call with `None` to revert to binary heartbeat timeout detection.
314 pub fn phi_threshold(mut self, threshold: Option<f64>) -> Self {
315 self.phi_threshold = threshold;
316 self
317 }
318
319 /// Sets the phi accrual inter-arrival sample window size (default 200).
320 pub fn phi_window_size(mut self, size: usize) -> Self {
321 self.phi_window_size = size;
322 self
323 }
324
325 /// Add a fully-described initial peer to the group at startup.
326 pub fn add_initial_peer(mut self, node: RepNode) -> Self {
327 self.initial_peers.push(node);
328 self
329 }
330
331 /// Set the per-peer message timeout for Phase 1 and Phase 2 election
332 /// exchanges (default: 500 ms).
333 pub fn election_phase_timeout(mut self, timeout: Duration) -> Self {
334 self.election_phase_timeout = timeout;
335 self
336 }
337
338 /// Sets the reconnection backoff configuration for replica partition recovery.
339 pub fn reconnect_config(mut self, config: ReconnectConfig) -> Self {
340 self.reconnect_config = config;
341 self
342 }
343
344 /// Sets the wire-level transport this node will use.
345 ///
346 /// Defaults to [`RepTransportKind::Tcp`].
347 /// [`RepTransportKind::InMemory`] for in-process clusters.
348 pub fn transport_kind(mut self, kind: RepTransportKind) -> Self {
349 self.transport_kind = kind;
350 self
351 }
352
353 /// Set the mTLS peer allowlist (Phase 2, v3.1.0).
354 ///
355 /// When non-empty and `transport_kind` is [`RepTransportKind::Tls`],
356 /// incoming TLS connections must present a certificate whose Subject CN
357 /// or DNS SAN matches at least one entry here (case-insensitive, exact
358 /// match). Connections that fail the check are rejected at the TLS
359 /// handshake layer.
360 ///
361 /// See [`RepConfig::peer_allowlist`] for full details.
362 pub fn peer_allowlist(mut self, names: Vec<String>) -> Self {
363 self.peer_allowlist = names;
364 self
365 }
366
367 /// Set the TLS configuration for the service dispatcher (Phase 3).
368 ///
369 /// When set and `transport_kind` is [`RepTransportKind::Tls`],
370 /// [`crate::replicated_environment::ReplicatedEnvironment`] will use a
371 /// TLS-capable service dispatcher that enforces mTLS when
372 /// `peer_allowlist` is also non-empty.
373 ///
374 /// See [`RepConfig::tls_config`] for full details.
375 pub fn tls_config(mut self, tls: crate::tls::TlsConfig) -> Self {
376 self.tls_config = Some(tls);
377 self
378 }
379
380 /// Builds the `RepConfig`.
381 pub fn build(self) -> RepConfig {
382 RepConfig {
383 group_name: self.group_name,
384 node_name: self.node_name,
385 node_host: self.node_host,
386 node_port: self.node_port,
387 node_type: self.node_type,
388 election_timeout: self.election_timeout,
389 heartbeat_interval: self.heartbeat_interval,
390 consistency_policy: self.consistency_policy,
391 commit_durability: self.commit_durability,
392 env_home: self.env_home,
393 quorum_policy: self.quorum_policy,
394 phi_threshold: self.phi_threshold,
395 phi_window_size: self.phi_window_size,
396 initial_peers: self.initial_peers,
397 election_phase_timeout: self.election_phase_timeout,
398 reconnect_config: self.reconnect_config,
399 transport_kind: self.transport_kind,
400 peer_allowlist: self.peer_allowlist,
401 tls_config: self.tls_config,
402 }
403 }
404}
405
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit_durability::ReplicaAckPolicy;

    /// A freshly built config carries the identity strings and the core
    /// defaults.
    #[test]
    fn test_builder_defaults() {
        let config = RepConfig::builder("group1", "node1", "localhost").build();
        assert_eq!(config.group_name, "group1");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "localhost");
        assert_eq!(config.node_port, DEFAULT_NODE_PORT);
        assert_eq!(config.node_type, NodeType::Electable);
        assert_eq!(config.election_timeout, DEFAULT_ELECTION_TIMEOUT);
        assert_eq!(config.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
        assert_eq!(config.consistency_policy, ConsistencyPolicy::NoConsistency);
    }

    /// Defaults for the settings that `test_builder_defaults` does not
    /// cover.
    #[test]
    fn test_builder_defaults_for_optional_settings() {
        let config = RepConfig::builder("g", "n", "h").build();
        assert!(config.env_home.is_none());
        assert!(matches!(config.quorum_policy, QuorumPolicy::SimpleMajority));
        assert!(config.phi_threshold.is_none());
        assert_eq!(config.phi_window_size, DEFAULT_PHI_WINDOW_SIZE);
        assert!(config.initial_peers.is_empty());
        assert_eq!(
            config.election_phase_timeout,
            DEFAULT_ELECTION_PHASE_TIMEOUT
        );
        assert_eq!(config.transport_kind, RepTransportKind::Tcp);
        assert!(config.peer_allowlist.is_empty());
        assert!(config.tls_config.is_none());
    }

    #[test]
    fn test_default_port_is_unprivileged() {
        // Wave 1C audit cleanup (rep low "default port collision"): the
        // default port must be in the IANA unassigned range and is not
        // shared with another well-known service we might collide with
        // (5001 was the v1.5.0 default; it overlaps with REPMGR among
        // others).
        let config = RepConfig::builder("g", "n", "h").build();
        assert_eq!(config.node_port, 14_001);
        // Ports below 1024 are privileged; the default must not be one.
        assert!(config.node_port >= 1024);
    }

    #[test]
    fn test_transport_kind_default_is_tcp() {
        assert_eq!(RepTransportKind::default(), RepTransportKind::Tcp);
    }

    #[test]
    fn test_new_constructor_matches_builder() {
        let a = RepConfig::new("g", "n", "h", 6000);
        let b = RepConfig::builder("g", "n", "h").node_port(6000).build();
        // The two paths must produce the same on-the-wire identity.
        assert_eq!(a.group_name, b.group_name);
        assert_eq!(a.node_name, b.node_name);
        assert_eq!(a.node_host, b.node_host);
        assert_eq!(a.node_port, b.node_port);
        assert_eq!(a.node_type, b.node_type);
        // ...and the same defaults for everything `new` does not take.
        assert_eq!(a.election_timeout, b.election_timeout);
        assert_eq!(a.heartbeat_interval, b.heartbeat_interval);
        assert_eq!(a.election_phase_timeout, b.election_phase_timeout);
        assert_eq!(a.phi_window_size, b.phi_window_size);
        assert_eq!(a.transport_kind, b.transport_kind);
        assert_eq!(a.peer_allowlist, b.peer_allowlist);
    }

    /// `new` accepts owned and borrowed strings interchangeably.
    #[test]
    fn test_new_accepts_owned_strings() {
        let config =
            RepConfig::new(String::from("g"), "n", String::from("h"), 6001);
        assert_eq!(config.group_name, "g");
        assert_eq!(config.node_name, "n");
        assert_eq!(config.node_host, "h");
        assert_eq!(config.node_port, 6001);
    }

    #[test]
    fn test_builder_custom_port() {
        let config = RepConfig::builder("g", "n", "h").node_port(6000).build();
        assert_eq!(config.node_port, 6000);
    }

    #[test]
    fn test_builder_node_type() {
        let config = RepConfig::builder("g", "n", "h")
            .node_type(NodeType::Secondary)
            .build();
        assert_eq!(config.node_type, NodeType::Secondary);
    }

    #[test]
    fn test_builder_timeouts() {
        let config = RepConfig::builder("g", "n", "h")
            .election_timeout(Duration::from_secs(20))
            .heartbeat_interval(Duration::from_millis(500))
            .build();
        assert_eq!(config.election_timeout, Duration::from_secs(20));
        assert_eq!(config.heartbeat_interval, Duration::from_millis(500));
    }

    #[test]
    fn test_builder_election_phase_timeout() {
        let config = RepConfig::builder("g", "n", "h")
            .election_phase_timeout(Duration::from_millis(250))
            .build();
        assert_eq!(config.election_phase_timeout, Duration::from_millis(250));
    }

    #[test]
    fn test_builder_consistency_policy() {
        let policy = ConsistencyPolicy::TimeConsistency {
            max_lag: Duration::from_millis(500),
            timeout: Duration::from_secs(10),
        };
        let config = RepConfig::builder("g", "n", "h")
            .consistency_policy(policy.clone())
            .build();
        assert_eq!(config.consistency_policy, policy);
    }

    #[test]
    fn test_builder_commit_durability() {
        let durability = CommitDurability::new(
            ReplicaAckPolicy::All,
            Duration::from_secs(15),
        );
        let config = RepConfig::builder("g", "n", "h")
            .commit_durability(durability)
            .build();
        assert_eq!(config.commit_durability.ack_policy, ReplicaAckPolicy::All);
        assert_eq!(
            config.commit_durability.ack_timeout,
            Duration::from_secs(15)
        );
    }

    #[test]
    fn test_builder_env_home() {
        let config =
            RepConfig::builder("g", "n", "h").env_home("/tmp/env").build();
        assert_eq!(config.env_home, Some(PathBuf::from("/tmp/env")));
    }

    #[test]
    fn test_builder_phi_settings() {
        let config = RepConfig::builder("g", "n", "h")
            .phi_threshold(Some(8.0))
            .phi_window_size(1000)
            .build();
        assert_eq!(config.phi_threshold, Some(8.0));
        assert_eq!(config.phi_window_size, 1000);

        // Passing `None` switches phi accrual detection back off.
        let reverted = RepConfig::builder("g", "n", "h")
            .phi_threshold(Some(8.0))
            .phi_threshold(None)
            .build();
        assert!(reverted.phi_threshold.is_none());
    }

    #[test]
    fn test_builder_transport_and_allowlist() {
        let names = vec!["node-a".to_string(), "node-b".to_string()];
        let config = RepConfig::builder("g", "n", "h")
            .transport_kind(RepTransportKind::Tls)
            .peer_allowlist(names.clone())
            .build();
        assert_eq!(config.transport_kind, RepTransportKind::Tls);
        assert_eq!(config.peer_allowlist, names);
    }

    #[test]
    fn test_socket_address() {
        let config =
            RepConfig::builder("g", "n", "192.168.1.1").node_port(7000).build();
        assert_eq!(config.socket_address(), "192.168.1.1:7000");
    }

    #[test]
    fn test_builder_chaining() {
        let config = RepConfig::builder("mygroup", "node1", "10.0.0.1")
            .node_port(5555)
            .node_type(NodeType::Arbiter)
            .election_timeout(Duration::from_secs(30))
            .build();
        assert_eq!(config.group_name, "mygroup");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "10.0.0.1");
        assert_eq!(config.node_port, 5555);
        assert_eq!(config.node_type, NodeType::Arbiter);
        assert_eq!(config.election_timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_config_clone() {
        let config = RepConfig::builder("g", "n", "h").build();
        let cloned = config.clone();
        assert_eq!(config.group_name, cloned.group_name);
        assert_eq!(config.node_name, cloned.node_name);
    }

    #[test]
    fn test_config_debug() {
        let config = RepConfig::builder("g", "n", "h").build();
        let s = format!("{:?}", config);
        assert!(s.contains("RepConfig"));
    }
}
535}