noxu_rep/rep_config.rs
1//! Replication configuration.
2//!
3
4use std::path::PathBuf;
5use std::time::Duration;
6
7use crate::commit_durability::CommitDurability;
8use crate::consistency::ConsistencyPolicy;
9use crate::node_type::NodeType;
10use crate::quorum_policy::QuorumPolicy;
11use crate::rep_node::RepNode;
12use crate::stream::reconnect::ReconnectConfig;
13
/// Election timeout applied when the builder is not given one (10 s).
const DEFAULT_ELECTION_TIMEOUT: Duration = Duration::from_secs(10);
/// Heartbeat interval applied when the builder is not given one (1 s).
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
/// Replication port applied when the builder is not given one.
///
/// The default was moved away from `5001` (which collides with
/// PostgreSQL's REPMGR default and various Cisco services) to the
/// unprivileged port `14_001`. Most production deployments override the
/// port; the default only exists so that a development setup does not
/// silently bind a port that another well-known service commonly uses.
///
/// NOTE(review): this port was originally described as IANA-unassigned.
/// That should be re-checked against the IANA port registry (14001 appears
/// to be registered for SUA, RFC 3868) before the claim is repeated in
/// user-facing documentation.
const DEFAULT_NODE_PORT: u16 = 14_001;
/// Per-phase election message timeout applied by default (500 ms).
const DEFAULT_ELECTION_PHASE_TIMEOUT: Duration = Duration::from_millis(500);
/// Number of inter-arrival samples kept by the phi accrual window by default.
const DEFAULT_PHI_WINDOW_SIZE: usize = 200;
31
/// Wire-level transport selected for replication traffic.
///
/// The in-memory transport is a first-class production option alongside
/// TCP / TLS / QUIC. Declaring the choice in [`RepConfig`] lets
/// higher-level orchestration code (e.g., the test harness, the
/// `RepTestBase` integration tests, embedded deployments) route channel
/// construction through the right factory.
///
/// Note: noxu-rep's [`crate::net`] channel types are constructed directly
/// by the user code that drives the cluster (a `TcpListener` on a port, a
/// [`crate::net::InMemoryTransport::new_group`] mesh, etc.). The value is
/// therefore advisory: it documents intent and lets observability / chaos /
/// harness layers introspect the transport without inspecting individual
/// channel types.
///
/// [`Default`] yields [`RepTransportKind::Tcp`], preserving backward
/// compatibility for configurations that never mention a transport.
// `#[default]` on a unit variant (stable since Rust 1.62) replaces the
// previous hand-written `impl Default`, which only returned `Self::Tcp`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum RepTransportKind {
    /// Plaintext TCP via [`crate::net::TcpChannel`].
    ///
    /// This is the default variant.
    #[default]
    Tcp,
    /// TLS-encrypted TCP via `crate::net::TlsTcpChannel`
    /// (requires `tls-rustls` or `tls-native`).
    Tls,
    /// QUIC over UDP via `crate::net::QuicChannel`
    /// (requires the `quic` feature).
    Quic,
    /// In-process [`crate::net::InMemoryTransport`]. Useful for
    /// embedded deployments and integration tests that want real
    /// `ReplicatedEnvironment` behaviour without opening sockets.
    InMemory,
}
70
71/// Configuration for a replication node.
72///
73/// Use the builder
74/// pattern to construct.
75#[derive(Debug, Clone)]
76pub struct RepConfig {
77 /// Name of the replication group.
78 pub group_name: String,
79 /// Name of this node within the group (must be unique).
80 pub node_name: String,
81 /// Hostname or IP address for this node.
82 pub node_host: String,
83 /// Port for replication communication.
84 pub node_port: u16,
85 /// Type of this node.
86 pub node_type: NodeType,
87 /// Timeout for elections.
88 pub election_timeout: Duration,
89 /// Interval between heartbeat messages.
90 pub heartbeat_interval: Duration,
91 /// Default consistency policy for read operations.
92 pub consistency_policy: ConsistencyPolicy,
93 /// Default commit durability for replicated transactions.
94 ///
95 /// The `ack_timeout` field on `commit_durability` governs the
96 /// commit-side wait for replica acks; there is no separate
97 /// per-RepConfig replica-ack timeout.
98 pub commit_durability: CommitDurability,
99 /// Path to the local environment home directory (`.ndb` files).
100 ///
101 /// When set, `ReplicatedEnvironment` registers a `NetworkRestoreServer`
102 /// on the service dispatcher so that other nodes can restore from this
103 /// node via the `"RESTORE"` service.
104 pub env_home: Option<PathBuf>,
105 /// Quorum policy for elections. Default: `SimpleMajority`.
106 pub quorum_policy: QuorumPolicy,
107 /// Phi accrual suspicion threshold.
108 ///
109 /// `None` (default) uses a binary heartbeat timeout.
110 /// `Some(8.0)` enables phi accrual detection with the paper's recommended
111 /// threshold (mistake rate ≈ 10⁻⁸).
112 pub phi_threshold: Option<f64>,
113 /// Sliding-window size for phi accrual inter-arrival samples.
114 ///
115 /// Default `200` is adequate for LAN; use `1000` for WAN.
116 pub phi_window_size: usize,
117 /// Fully-described peers added to the replication group at startup.
118 ///
119 /// Useful for pre-populating quoracle capacity/latency metadata.
120 pub initial_peers: Vec<RepNode>,
121 /// Timeout per peer message exchange during Phase 1 and Phase 2 of an
122 /// election. Default: 500 ms.
123 pub election_phase_timeout: Duration,
124 /// Reconnection backoff configuration for replica partition recovery.
125 pub reconnect_config: ReconnectConfig,
126 /// Wire-level transport this node will use.
127 ///
128 /// This field lets callers declare whether they
129 /// intend to drive replication over TCP, TLS, QUIC, or the
130 /// in-process [`crate::net::InMemoryTransport`]. See
131 /// [`RepTransportKind`] for the variants. Defaults to
132 /// [`RepTransportKind::Tcp`] for backward compatibility.
133 pub transport_kind: RepTransportKind,
134
135 /// Allowlist of peer subject names for mTLS enforcement (Phase 2, v3.1.0).
136 ///
137 /// When non-empty and [`RepTransportKind::Tls`] is configured, the
138 /// server will:
139 ///
140 /// 1. **Require a client certificate** on every incoming TLS connection.
141 /// 2. **Validate the chain** against the CA roots in the `TlsConfig`.
142 /// 3. **Check subject names** — the peer's Subject Common Name (CN) and
143 /// every DNS Subject Alternative Name (SAN) entry are compared
144 /// case-insensitively against this list. If none match, the
145 /// handshake is aborted before any application data is exchanged.
146 ///
147 /// Matching is exact (no wildcards). Names are compared
148 /// case-insensitively. Whitespace-only and empty entries are ignored.
149 ///
150 /// The client side automatically presents its own certificate when the
151 /// `TlsConfig` identity is `PemFiles` or `PemBytes`.
152 ///
153 /// ## Empty list
154 ///
155 /// An empty list means no peers are admitted (`PeerAllowlistVerifier`
156 /// returns an error at construction time, which surfaces as a
157 /// `RepError::ConfigError` from `TlsConfig::to_rustls_server_config_with_allowlist`).
158 /// This is intentional fail-closed behaviour: an empty allowlist is
159 /// almost certainly a misconfiguration.
160 ///
161 /// ## Transport requirement
162 ///
163 /// Enforcement requires `transport_kind = RepTransportKind::Tls`. With
164 /// plain TCP there is no TLS handshake and therefore no cert to inspect.
165 /// Setting this field with a non-TLS transport emits a `log::warn!`.
166 pub peer_allowlist: Vec<String>,
167
168 /// TLS configuration for the service dispatcher (Phase 3).
169 ///
170 /// When set and `transport_kind` is [`RepTransportKind::Tls`],
171 /// [`crate::replicated_environment::ReplicatedEnvironment`] will
172 /// start a `TlsTcpServiceDispatcher` (feature `tls-rustls`)
173 /// instead of the plain-TCP dispatcher. Combined with a non-empty
174 /// `peer_allowlist`, this enforces mTLS on every incoming replication
175 /// connection at the dispatcher level.
176 ///
177 /// `None` (the default) preserves the Phase-2 behaviour: the
178 /// dispatcher uses plain TCP and the operator must wire
179 /// `TlsTcpChannelListener::bind_with_tls_and_allowlist` separately.
180 pub tls_config: Option<crate::tls::TlsConfig>,
181
182 /// Enable chained / replica-to-replica log feeding (default `false`).
183 ///
184 /// When `true`, a node that becomes a **replica** ALSO runs a feeder
185 /// source on its `PEER_FEEDER` service, serving the VLSN-tagged log
186 /// stream from its OWN WAL to a downstream replica. This lets a
187 /// mid-tier replica relay the stream (master → R1 → R2) instead of every
188 /// replica connecting directly to the master.
189 ///
190 /// Faithful to JE's cascading-feeder model: `FeederSource` is
191 /// documented as "a real Master OR a Replica in a Replica chain that is
192 /// replaying log records it received from some other source"
193 /// (`FeederSource.java`). The feeder source on a replica reads its
194 /// VLSNIndex + log files exactly as `MasterFeederSource` does on the
195 /// master, so the downstream's syncup (REP-1) and live-apply (REP-7)
196 /// work unchanged against a replica-feeder source.
197 ///
198 /// **Default `false`** preserves master-direct behaviour: a replica
199 /// does not feed downstream peers unless cascade is explicitly enabled.
200 ///
201 /// **Durability bound**: a mid-tier replica does NOT count its
202 /// downstream's acks toward the master's commit-durability quorum.
203 /// JE evaluates the durability quorum at the master
204 /// (`FeederManager.getNumCurrentAckFeeders`); a chained replica only
205 /// tracks the downstream's progress for its own VLSN/lag bookkeeping.
206 /// A downstream replica is therefore never more durable than the
207 /// entries its mid-tier has itself persisted.
208 pub cascade_feeding: bool,
209}
210
211impl RepConfig {
212 /// Creates a builder for `RepConfig`.
213 pub fn builder(
214 group_name: &str,
215 node_name: &str,
216 node_host: &str,
217 ) -> RepConfigBuilder {
218 RepConfigBuilder {
219 group_name: group_name.to_string(),
220 node_name: node_name.to_string(),
221 node_host: node_host.to_string(),
222 node_port: DEFAULT_NODE_PORT,
223 node_type: NodeType::Electable,
224 election_timeout: DEFAULT_ELECTION_TIMEOUT,
225 heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
226 consistency_policy: ConsistencyPolicy::default(),
227 commit_durability: CommitDurability::default(),
228 env_home: None,
229 quorum_policy: QuorumPolicy::SimpleMajority,
230 phi_threshold: None,
231 phi_window_size: DEFAULT_PHI_WINDOW_SIZE,
232 initial_peers: Vec::new(),
233 election_phase_timeout: DEFAULT_ELECTION_PHASE_TIMEOUT,
234 reconnect_config: ReconnectConfig::default(),
235 transport_kind: RepTransportKind::default(),
236 peer_allowlist: Vec::new(),
237 tls_config: None,
238 cascade_feeding: false,
239 }
240 }
241
242 /// Convenience constructor matching the original v1.4 shape.
243 ///
244 /// Equivalent to `builder(group, node, host).node_port(port).build()`.
245 /// Provided so doc snippets and short tests don't need to write the
246 /// full builder chain.
247 /// "`RepConfig::new` example").
248 pub fn new(
249 group_name: impl Into<String>,
250 node_name: impl Into<String>,
251 node_host: impl Into<String>,
252 node_port: u16,
253 ) -> RepConfig {
254 let g = group_name.into();
255 let n = node_name.into();
256 let h = node_host.into();
257 RepConfig::builder(&g, &n, &h).node_port(node_port).build()
258 }
259
260 /// Returns the socket address string for this node.
261 pub fn socket_address(&self) -> String {
262 format!("{}:{}", self.node_host, self.node_port)
263 }
264}
265
266/// Builder for [`RepConfig`].
267#[derive(Debug, Clone)]
268pub struct RepConfigBuilder {
269 group_name: String,
270 node_name: String,
271 node_host: String,
272 node_port: u16,
273 node_type: NodeType,
274 election_timeout: Duration,
275 heartbeat_interval: Duration,
276 consistency_policy: ConsistencyPolicy,
277 commit_durability: CommitDurability,
278 env_home: Option<PathBuf>,
279 quorum_policy: QuorumPolicy,
280 phi_threshold: Option<f64>,
281 phi_window_size: usize,
282 initial_peers: Vec<RepNode>,
283 election_phase_timeout: Duration,
284 reconnect_config: ReconnectConfig,
285 transport_kind: RepTransportKind,
286 peer_allowlist: Vec<String>,
287 tls_config: Option<crate::tls::TlsConfig>,
288 cascade_feeding: bool,
289}
290
291impl RepConfigBuilder {
292 /// Sets the replication port.
293 pub fn node_port(mut self, port: u16) -> Self {
294 self.node_port = port;
295 self
296 }
297
298 /// Sets the node type.
299 pub fn node_type(mut self, node_type: NodeType) -> Self {
300 self.node_type = node_type;
301 self
302 }
303
304 /// Sets the election timeout.
305 pub fn election_timeout(mut self, timeout: Duration) -> Self {
306 self.election_timeout = timeout;
307 self
308 }
309
310 /// Sets the heartbeat interval.
311 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
312 self.heartbeat_interval = interval;
313 self
314 }
315
316 /// Sets the consistency policy.
317 pub fn consistency_policy(mut self, policy: ConsistencyPolicy) -> Self {
318 self.consistency_policy = policy;
319 self
320 }
321
322 /// Sets the commit durability.
323 pub fn commit_durability(mut self, durability: CommitDurability) -> Self {
324 self.commit_durability = durability;
325 self
326 }
327
328 /// Sets the environment home directory (serves `.ndb` files for network restore).
329 pub fn env_home(mut self, path: impl Into<PathBuf>) -> Self {
330 self.env_home = Some(path.into());
331 self
332 }
333
334 /// Enable chained / replica-to-replica log feeding (default `false`).
335 ///
336 /// When `true`, a node that becomes a replica also runs a feeder source
337 /// on its `PEER_FEEDER` service, serving its OWN WAL to a downstream
338 /// replica (master → R1 → R2). See [`RepConfig::cascade_feeding`] for
339 /// the JE `FeederSource` citation and the durability bound.
340 pub fn cascade_feeding(mut self, enabled: bool) -> Self {
341 self.cascade_feeding = enabled;
342 self
343 }
344
345 /// Sets the quorum policy for elections (default: `SimpleMajority`).
346 pub fn quorum_policy(mut self, policy: QuorumPolicy) -> Self {
347 self.quorum_policy = policy;
348 self
349 }
350
351 /// Enable phi accrual failure detection with the given suspicion threshold.
352 ///
353 /// `8.0` is the paper's recommended production value (mistake rate ≈ 10⁻⁸).
354 /// Call with `None` to revert to binary heartbeat timeout detection.
355 pub fn phi_threshold(mut self, threshold: Option<f64>) -> Self {
356 self.phi_threshold = threshold;
357 self
358 }
359
360 /// Sets the phi accrual inter-arrival sample window size (default 200).
361 pub fn phi_window_size(mut self, size: usize) -> Self {
362 self.phi_window_size = size;
363 self
364 }
365
366 /// Add a fully-described initial peer to the group at startup.
367 pub fn add_initial_peer(mut self, node: RepNode) -> Self {
368 self.initial_peers.push(node);
369 self
370 }
371
372 /// Set the per-peer message timeout for Phase 1 and Phase 2 election
373 /// exchanges (default: 500 ms).
374 pub fn election_phase_timeout(mut self, timeout: Duration) -> Self {
375 self.election_phase_timeout = timeout;
376 self
377 }
378
379 /// Sets the reconnection backoff configuration for replica partition recovery.
380 pub fn reconnect_config(mut self, config: ReconnectConfig) -> Self {
381 self.reconnect_config = config;
382 self
383 }
384
385 /// Sets the wire-level transport this node will use.
386 ///
387 /// Defaults to [`RepTransportKind::Tcp`].
388 /// [`RepTransportKind::InMemory`] for in-process clusters.
389 pub fn transport_kind(mut self, kind: RepTransportKind) -> Self {
390 self.transport_kind = kind;
391 self
392 }
393
394 /// Set the mTLS peer allowlist (Phase 2, v3.1.0).
395 ///
396 /// When non-empty and `transport_kind` is [`RepTransportKind::Tls`],
397 /// incoming TLS connections must present a certificate whose Subject CN
398 /// or DNS SAN matches at least one entry here (case-insensitive, exact
399 /// match). Connections that fail the check are rejected at the TLS
400 /// handshake layer.
401 ///
402 /// See [`RepConfig::peer_allowlist`] for full details.
403 pub fn peer_allowlist(mut self, names: Vec<String>) -> Self {
404 self.peer_allowlist = names;
405 self
406 }
407
408 /// Set the TLS configuration for the service dispatcher (Phase 3).
409 ///
410 /// When set and `transport_kind` is [`RepTransportKind::Tls`],
411 /// [`crate::replicated_environment::ReplicatedEnvironment`] will use a
412 /// TLS-capable service dispatcher that enforces mTLS when
413 /// `peer_allowlist` is also non-empty.
414 ///
415 /// See [`RepConfig::tls_config`] for full details.
416 pub fn tls_config(mut self, tls: crate::tls::TlsConfig) -> Self {
417 self.tls_config = Some(tls);
418 self
419 }
420
421 /// Builds the `RepConfig`.
422 pub fn build(self) -> RepConfig {
423 RepConfig {
424 group_name: self.group_name,
425 node_name: self.node_name,
426 node_host: self.node_host,
427 node_port: self.node_port,
428 node_type: self.node_type,
429 election_timeout: self.election_timeout,
430 heartbeat_interval: self.heartbeat_interval,
431 consistency_policy: self.consistency_policy,
432 commit_durability: self.commit_durability,
433 env_home: self.env_home,
434 quorum_policy: self.quorum_policy,
435 phi_threshold: self.phi_threshold,
436 phi_window_size: self.phi_window_size,
437 initial_peers: self.initial_peers,
438 election_phase_timeout: self.election_phase_timeout,
439 reconnect_config: self.reconnect_config,
440 transport_kind: self.transport_kind,
441 peer_allowlist: self.peer_allowlist,
442 tls_config: self.tls_config,
443 cascade_feeding: self.cascade_feeding,
444 }
445 }
446}
447
#[cfg(test)]
mod tests {
    //! Unit tests for the builder defaults, the individual setters, and the
    //! `RepConfig` convenience methods.

    use super::*;
    use crate::commit_durability::ReplicaAckPolicy;

    #[test]
    fn test_builder_defaults() {
        let config = RepConfig::builder("group1", "node1", "localhost").build();
        assert_eq!(config.group_name, "group1");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "localhost");
        assert_eq!(config.node_port, DEFAULT_NODE_PORT);
        assert_eq!(config.node_type, NodeType::Electable);
        assert_eq!(config.election_timeout, DEFAULT_ELECTION_TIMEOUT);
        assert_eq!(config.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
        assert_eq!(config.consistency_policy, ConsistencyPolicy::NoConsistency);
    }

    /// Defaults of the settings that `test_builder_defaults` does not cover.
    #[test]
    fn test_builder_remaining_defaults() {
        let config = RepConfig::builder("g", "n", "h").build();
        assert!(config.env_home.is_none());
        assert!(matches!(config.quorum_policy, QuorumPolicy::SimpleMajority));
        assert!(config.phi_threshold.is_none());
        assert_eq!(config.phi_window_size, DEFAULT_PHI_WINDOW_SIZE);
        assert!(config.initial_peers.is_empty());
        assert_eq!(
            config.election_phase_timeout,
            DEFAULT_ELECTION_PHASE_TIMEOUT
        );
        assert_eq!(config.transport_kind, RepTransportKind::Tcp);
        assert!(config.peer_allowlist.is_empty());
        assert!(config.tls_config.is_none());
        assert!(!config.cascade_feeding);
    }

    #[test]
    fn test_default_port_is_unprivileged() {
        // Wave 1C audit cleanup (rep low "default port collision"): the
        // default port must not be a privileged port and must not be the
        // v1.5.0 default of 5001, which overlaps with REPMGR among others.
        let config = RepConfig::builder("g", "n", "h").build();
        assert!(config.node_port >= 1024, "default port must be unprivileged");
        assert_ne!(config.node_port, 5001);
        // Pin the exact value so that an accidental change is noticed.
        assert_eq!(config.node_port, 14_001);
    }

    #[test]
    fn test_new_constructor_matches_builder() {
        let a = RepConfig::new("g", "n", "h", 6000);
        let b = RepConfig::builder("g", "n", "h").node_port(6000).build();
        // The two paths must produce the same on-the-wire identity.
        assert_eq!(a.group_name, b.group_name);
        assert_eq!(a.node_name, b.node_name);
        assert_eq!(a.node_host, b.node_host);
        assert_eq!(a.node_port, b.node_port);
        assert_eq!(a.node_type, b.node_type);
    }

    /// `new` takes `impl Into<String>`, so owned strings must work as well.
    #[test]
    fn test_new_constructor_accepts_owned_strings() {
        let config =
            RepConfig::new(String::from("g"), String::from("n"), "h", 6000);
        assert_eq!(config.group_name, "g");
        assert_eq!(config.node_name, "n");
        assert_eq!(config.node_host, "h");
        assert_eq!(config.node_port, 6000);
    }

    #[test]
    fn test_builder_custom_port() {
        let config = RepConfig::builder("g", "n", "h").node_port(6000).build();
        assert_eq!(config.node_port, 6000);
    }

    #[test]
    fn test_builder_node_type() {
        let config = RepConfig::builder("g", "n", "h")
            .node_type(NodeType::Secondary)
            .build();
        assert_eq!(config.node_type, NodeType::Secondary);
    }

    #[test]
    fn test_builder_timeouts() {
        let config = RepConfig::builder("g", "n", "h")
            .election_timeout(Duration::from_secs(20))
            .heartbeat_interval(Duration::from_millis(500))
            .election_phase_timeout(Duration::from_millis(250))
            .build();
        assert_eq!(config.election_timeout, Duration::from_secs(20));
        assert_eq!(config.heartbeat_interval, Duration::from_millis(500));
        assert_eq!(config.election_phase_timeout, Duration::from_millis(250));
    }

    #[test]
    fn test_builder_consistency_policy() {
        let policy = ConsistencyPolicy::TimeConsistency {
            max_lag: Duration::from_millis(500),
            timeout: Duration::from_secs(10),
        };
        let config = RepConfig::builder("g", "n", "h")
            .consistency_policy(policy.clone())
            .build();
        assert_eq!(config.consistency_policy, policy);
    }

    #[test]
    fn test_builder_commit_durability() {
        let durability = CommitDurability::new(
            ReplicaAckPolicy::All,
            Duration::from_secs(15),
        );
        let config = RepConfig::builder("g", "n", "h")
            .commit_durability(durability)
            .build();
        assert_eq!(config.commit_durability.ack_policy, ReplicaAckPolicy::All);
        assert_eq!(
            config.commit_durability.ack_timeout,
            Duration::from_secs(15)
        );
    }

    #[test]
    fn test_builder_phi_settings() {
        let config = RepConfig::builder("g", "n", "h")
            .phi_threshold(Some(8.0))
            .phi_window_size(1000)
            .build();
        assert_eq!(config.phi_threshold, Some(8.0));
        assert_eq!(config.phi_window_size, 1000);

        // Passing `None` switches phi accrual detection back off.
        let reverted = RepConfig::builder("g", "n", "h")
            .phi_threshold(Some(8.0))
            .phi_threshold(None)
            .build();
        assert!(reverted.phi_threshold.is_none());
    }

    #[test]
    fn test_builder_env_home() {
        let config = RepConfig::builder("g", "n", "h")
            .env_home("/var/lib/noxu")
            .build();
        assert_eq!(config.env_home, Some(PathBuf::from("/var/lib/noxu")));
    }

    #[test]
    fn test_builder_transport_and_allowlist() {
        let config = RepConfig::builder("g", "n", "h")
            .transport_kind(RepTransportKind::InMemory)
            .peer_allowlist(vec!["node-a".to_string(), "node-b".to_string()])
            .cascade_feeding(true)
            .build();
        assert_eq!(config.transport_kind, RepTransportKind::InMemory);
        assert_eq!(config.peer_allowlist, vec!["node-a", "node-b"]);
        assert!(config.cascade_feeding);
    }

    #[test]
    fn test_socket_address() {
        let config =
            RepConfig::builder("g", "n", "192.168.1.1").node_port(7000).build();
        assert_eq!(config.socket_address(), "192.168.1.1:7000");
    }

    #[test]
    fn test_builder_chaining() {
        let config = RepConfig::builder("mygroup", "node1", "10.0.0.1")
            .node_port(5555)
            .node_type(NodeType::Arbiter)
            .election_timeout(Duration::from_secs(30))
            .build();
        assert_eq!(config.group_name, "mygroup");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "10.0.0.1");
        assert_eq!(config.node_port, 5555);
        assert_eq!(config.node_type, NodeType::Arbiter);
        assert_eq!(config.election_timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_config_clone() {
        let config = RepConfig::builder("g", "n", "h").build();
        let cloned = config.clone();
        assert_eq!(config.group_name, cloned.group_name);
        assert_eq!(config.node_name, cloned.node_name);
    }

    #[test]
    fn test_config_debug() {
        let config = RepConfig::builder("g", "n", "h").build();
        let s = format!("{:?}", config);
        assert!(s.contains("RepConfig"));
    }
}
577}