Skip to main content

noxu_rep/
rep_config.rs

1//! Replication configuration.
2//!
3
4use std::path::PathBuf;
5use std::time::Duration;
6
7use crate::commit_durability::CommitDurability;
8use crate::consistency::ConsistencyPolicy;
9use crate::node_type::NodeType;
10use crate::quorum_policy::QuorumPolicy;
11use crate::rep_node::RepNode;
12use crate::stream::reconnect::ReconnectConfig;
13
/// Default election timeout (10 s).
const DEFAULT_ELECTION_TIMEOUT: Duration = Duration::from_secs(10);
/// Default heartbeat interval (1 s).
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
/// Default replication port.
///
/// Changed from `5001` (the v1.5.0 default, noted as colliding with
/// REPMGR and various Cisco services) to `14_001`, an unprivileged port
/// (>= 1024) so development nodes can bind without elevated privileges
/// and do not share a port with those services.  Most production
/// deployments override this.
///
/// NOTE(review): an older comment described `14_001` as IANA-unassigned;
/// the IANA registry appears to list 14001 for SIGTRAN SUA (RFC 3868,
/// primarily over SCTP).  Confirm before documenting it as collision-free.
const DEFAULT_NODE_PORT: u16 = 14_001;
/// Default per-phase election message timeout (500 ms).
const DEFAULT_ELECTION_PHASE_TIMEOUT: Duration = Duration::from_millis(500);
/// Default phi accrual sample window size (number of inter-arrival samples).
const DEFAULT_PHI_WINDOW_SIZE: usize = 200;
31
/// Wire-level transport selected for replication traffic.
///
/// The in-memory transport is a first-class production option alongside
/// TCP / TLS / QUIC.  This enum lets a caller declare the transport choice
/// in [`RepConfig`] so higher-level orchestration code (e.g. the test
/// harness, the `RepTestBase` integration tests, embedded deployments) can
/// route channel construction through the right factory.
///
/// Note: noxu-rep's [`crate::net`] channel types are constructed directly
/// by the user code that drives the cluster (a `TcpListener` on a port, a
/// [`crate::net::InMemoryTransport::new_group`] mesh, etc.).  This value is
/// therefore advisory: it documents intent and lets observability / chaos /
/// harness layers introspect the transport without inspecting individual
/// channel types.
///
/// The [`Default`] is [`RepTransportKind::Tcp`], which preserves backward
/// compatibility with configurations written before this enum existed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum RepTransportKind {
    /// Plaintext TCP via [`crate::net::TcpChannel`].  This is the default.
    #[default]
    Tcp,
    /// TLS-encrypted TCP via `crate::net::TlsTcpChannel`
    /// (requires `tls-rustls` or `tls-native`).
    Tls,
    /// QUIC over UDP via `crate::net::QuicChannel`
    /// (requires the `quic` feature).
    Quic,
    /// In-process [`crate::net::InMemoryTransport`].  Useful for
    /// embedded deployments and integration tests that want real
    /// `ReplicatedEnvironment` behaviour without opening sockets.
    InMemory,
}
70
71/// Configuration for a replication node.
72///
73/// Use the builder
74/// pattern to construct.
75#[derive(Debug, Clone)]
76pub struct RepConfig {
77    /// Name of the replication group.
78    pub group_name: String,
79    /// Name of this node within the group (must be unique).
80    pub node_name: String,
81    /// Hostname or IP address for this node.
82    pub node_host: String,
83    /// Port for replication communication.
84    pub node_port: u16,
85    /// Type of this node.
86    pub node_type: NodeType,
87    /// Timeout for elections.
88    pub election_timeout: Duration,
89    /// Interval between heartbeat messages.
90    pub heartbeat_interval: Duration,
91    /// Default consistency policy for read operations.
92    pub consistency_policy: ConsistencyPolicy,
93    /// Default commit durability for replicated transactions.
94    ///
95    /// The `ack_timeout` field on `commit_durability` governs the
96    /// commit-side wait for replica acks; there is no separate
97    /// per-RepConfig replica-ack timeout.
98    pub commit_durability: CommitDurability,
99    /// Path to the local environment home directory (`.ndb` files).
100    ///
101    /// When set, `ReplicatedEnvironment` registers a `NetworkRestoreServer`
102    /// on the service dispatcher so that other nodes can restore from this
103    /// node via the `"RESTORE"` service.
104    pub env_home: Option<PathBuf>,
105    /// Quorum policy for elections. Default: `SimpleMajority`.
106    pub quorum_policy: QuorumPolicy,
107    /// Phi accrual suspicion threshold.
108    ///
109    /// `None` (default) uses a binary heartbeat timeout.
110    /// `Some(8.0)` enables phi accrual detection with the paper's recommended
111    /// threshold (mistake rate ≈ 10⁻⁸).
112    pub phi_threshold: Option<f64>,
113    /// Sliding-window size for phi accrual inter-arrival samples.
114    ///
115    /// Default `200` is adequate for LAN; use `1000` for WAN.
116    pub phi_window_size: usize,
117    /// Fully-described peers added to the replication group at startup.
118    ///
119    /// Useful for pre-populating quoracle capacity/latency metadata.
120    pub initial_peers: Vec<RepNode>,
121    /// Timeout per peer message exchange during Phase 1 and Phase 2 of an
122    /// election.  Default: 500 ms.
123    pub election_phase_timeout: Duration,
124    /// Reconnection backoff configuration for replica partition recovery.
125    pub reconnect_config: ReconnectConfig,
126    /// Wire-level transport this node will use.
127    ///
128    /// This field lets callers declare whether they
129    /// intend to drive replication over TCP, TLS, QUIC, or the
130    /// in-process [`crate::net::InMemoryTransport`].  See
131    /// [`RepTransportKind`] for the variants.  Defaults to
132    /// [`RepTransportKind::Tcp`] for backward compatibility.
133    pub transport_kind: RepTransportKind,
134
135    /// Allowlist of peer subject names for mTLS enforcement (Phase 2, v3.1.0).
136    ///
137    /// When non-empty and [`RepTransportKind::Tls`] is configured, the
138    /// server will:
139    ///
140    /// 1. **Require a client certificate** on every incoming TLS connection.
141    /// 2. **Validate the chain** against the CA roots in the `TlsConfig`.
142    /// 3. **Check subject names** — the peer's Subject Common Name (CN) and
143    ///    every DNS Subject Alternative Name (SAN) entry are compared
144    ///    case-insensitively against this list.  If none match, the
145    ///    handshake is aborted before any application data is exchanged.
146    ///
147    /// Matching is exact (no wildcards).  Names are compared
148    /// case-insensitively.  Whitespace-only and empty entries are ignored.
149    ///
150    /// The client side automatically presents its own certificate when the
151    /// `TlsConfig` identity is `PemFiles` or `PemBytes`.
152    ///
153    /// ## Empty list
154    ///
155    /// An empty list means no peers are admitted (`PeerAllowlistVerifier`
156    /// returns an error at construction time, which surfaces as a
157    /// `RepError::ConfigError` from `TlsConfig::to_rustls_server_config_with_allowlist`).
158    /// This is intentional fail-closed behaviour: an empty allowlist is
159    /// almost certainly a misconfiguration.
160    ///
161    /// ## Transport requirement
162    ///
163    /// Enforcement requires `transport_kind = RepTransportKind::Tls`.  With
164    /// plain TCP there is no TLS handshake and therefore no cert to inspect.
165    /// Setting this field with a non-TLS transport emits a `log::warn!`.
166    pub peer_allowlist: Vec<String>,
167
168    /// TLS configuration for the service dispatcher (Phase 3).
169    ///
170    /// When set and `transport_kind` is [`RepTransportKind::Tls`],
171    /// [`crate::replicated_environment::ReplicatedEnvironment`] will
172    /// start a `TlsTcpServiceDispatcher` (feature `tls-rustls`)
173    /// instead of the plain-TCP dispatcher.  Combined with a non-empty
174    /// `peer_allowlist`, this enforces mTLS on every incoming replication
175    /// connection at the dispatcher level.
176    ///
177    /// `None` (the default) preserves the Phase-2 behaviour: the
178    /// dispatcher uses plain TCP and the operator must wire
179    /// `TlsTcpChannelListener::bind_with_tls_and_allowlist` separately.
180    pub tls_config: Option<crate::tls::TlsConfig>,
181}
182
183impl RepConfig {
184    /// Creates a builder for `RepConfig`.
185    pub fn builder(
186        group_name: &str,
187        node_name: &str,
188        node_host: &str,
189    ) -> RepConfigBuilder {
190        RepConfigBuilder {
191            group_name: group_name.to_string(),
192            node_name: node_name.to_string(),
193            node_host: node_host.to_string(),
194            node_port: DEFAULT_NODE_PORT,
195            node_type: NodeType::Electable,
196            election_timeout: DEFAULT_ELECTION_TIMEOUT,
197            heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
198            consistency_policy: ConsistencyPolicy::default(),
199            commit_durability: CommitDurability::default(),
200            env_home: None,
201            quorum_policy: QuorumPolicy::SimpleMajority,
202            phi_threshold: None,
203            phi_window_size: DEFAULT_PHI_WINDOW_SIZE,
204            initial_peers: Vec::new(),
205            election_phase_timeout: DEFAULT_ELECTION_PHASE_TIMEOUT,
206            reconnect_config: ReconnectConfig::default(),
207            transport_kind: RepTransportKind::default(),
208            peer_allowlist: Vec::new(),
209            tls_config: None,
210        }
211    }
212
213    /// Convenience constructor matching the original v1.4 shape.
214    ///
215    /// Equivalent to `builder(group, node, host).node_port(port).build()`.
216    /// Provided so doc snippets and short tests don't need to write the
217    /// full builder chain.
218    /// "`RepConfig::new` example").
219    pub fn new(
220        group_name: impl Into<String>,
221        node_name: impl Into<String>,
222        node_host: impl Into<String>,
223        node_port: u16,
224    ) -> RepConfig {
225        let g = group_name.into();
226        let n = node_name.into();
227        let h = node_host.into();
228        RepConfig::builder(&g, &n, &h).node_port(node_port).build()
229    }
230
231    /// Returns the socket address string for this node.
232    pub fn socket_address(&self) -> String {
233        format!("{}:{}", self.node_host, self.node_port)
234    }
235}
236
237/// Builder for [`RepConfig`].
238#[derive(Debug, Clone)]
239pub struct RepConfigBuilder {
240    group_name: String,
241    node_name: String,
242    node_host: String,
243    node_port: u16,
244    node_type: NodeType,
245    election_timeout: Duration,
246    heartbeat_interval: Duration,
247    consistency_policy: ConsistencyPolicy,
248    commit_durability: CommitDurability,
249    env_home: Option<PathBuf>,
250    quorum_policy: QuorumPolicy,
251    phi_threshold: Option<f64>,
252    phi_window_size: usize,
253    initial_peers: Vec<RepNode>,
254    election_phase_timeout: Duration,
255    reconnect_config: ReconnectConfig,
256    transport_kind: RepTransportKind,
257    peer_allowlist: Vec<String>,
258    tls_config: Option<crate::tls::TlsConfig>,
259}
260
261impl RepConfigBuilder {
262    /// Sets the replication port.
263    pub fn node_port(mut self, port: u16) -> Self {
264        self.node_port = port;
265        self
266    }
267
268    /// Sets the node type.
269    pub fn node_type(mut self, node_type: NodeType) -> Self {
270        self.node_type = node_type;
271        self
272    }
273
274    /// Sets the election timeout.
275    pub fn election_timeout(mut self, timeout: Duration) -> Self {
276        self.election_timeout = timeout;
277        self
278    }
279
280    /// Sets the heartbeat interval.
281    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
282        self.heartbeat_interval = interval;
283        self
284    }
285
286    /// Sets the consistency policy.
287    pub fn consistency_policy(mut self, policy: ConsistencyPolicy) -> Self {
288        self.consistency_policy = policy;
289        self
290    }
291
292    /// Sets the commit durability.
293    pub fn commit_durability(mut self, durability: CommitDurability) -> Self {
294        self.commit_durability = durability;
295        self
296    }
297
298    /// Sets the environment home directory (serves `.ndb` files for network restore).
299    pub fn env_home(mut self, path: impl Into<PathBuf>) -> Self {
300        self.env_home = Some(path.into());
301        self
302    }
303
304    /// Sets the quorum policy for elections (default: `SimpleMajority`).
305    pub fn quorum_policy(mut self, policy: QuorumPolicy) -> Self {
306        self.quorum_policy = policy;
307        self
308    }
309
310    /// Enable phi accrual failure detection with the given suspicion threshold.
311    ///
312    /// `8.0` is the paper's recommended production value (mistake rate ≈ 10⁻⁸).
313    /// Call with `None` to revert to binary heartbeat timeout detection.
314    pub fn phi_threshold(mut self, threshold: Option<f64>) -> Self {
315        self.phi_threshold = threshold;
316        self
317    }
318
319    /// Sets the phi accrual inter-arrival sample window size (default 200).
320    pub fn phi_window_size(mut self, size: usize) -> Self {
321        self.phi_window_size = size;
322        self
323    }
324
325    /// Add a fully-described initial peer to the group at startup.
326    pub fn add_initial_peer(mut self, node: RepNode) -> Self {
327        self.initial_peers.push(node);
328        self
329    }
330
331    /// Set the per-peer message timeout for Phase 1 and Phase 2 election
332    /// exchanges (default: 500 ms).
333    pub fn election_phase_timeout(mut self, timeout: Duration) -> Self {
334        self.election_phase_timeout = timeout;
335        self
336    }
337
338    /// Sets the reconnection backoff configuration for replica partition recovery.
339    pub fn reconnect_config(mut self, config: ReconnectConfig) -> Self {
340        self.reconnect_config = config;
341        self
342    }
343
344    /// Sets the wire-level transport this node will use.
345    ///
346    /// Defaults to [`RepTransportKind::Tcp`].
347    /// [`RepTransportKind::InMemory`] for in-process clusters.
348    pub fn transport_kind(mut self, kind: RepTransportKind) -> Self {
349        self.transport_kind = kind;
350        self
351    }
352
353    /// Set the mTLS peer allowlist (Phase 2, v3.1.0).
354    ///
355    /// When non-empty and `transport_kind` is [`RepTransportKind::Tls`],
356    /// incoming TLS connections must present a certificate whose Subject CN
357    /// or DNS SAN matches at least one entry here (case-insensitive, exact
358    /// match).  Connections that fail the check are rejected at the TLS
359    /// handshake layer.
360    ///
361    /// See [`RepConfig::peer_allowlist`] for full details.
362    pub fn peer_allowlist(mut self, names: Vec<String>) -> Self {
363        self.peer_allowlist = names;
364        self
365    }
366
367    /// Set the TLS configuration for the service dispatcher (Phase 3).
368    ///
369    /// When set and `transport_kind` is [`RepTransportKind::Tls`],
370    /// [`crate::replicated_environment::ReplicatedEnvironment`] will use a
371    /// TLS-capable service dispatcher that enforces mTLS when
372    /// `peer_allowlist` is also non-empty.
373    ///
374    /// See [`RepConfig::tls_config`] for full details.
375    pub fn tls_config(mut self, tls: crate::tls::TlsConfig) -> Self {
376        self.tls_config = Some(tls);
377        self
378    }
379
380    /// Builds the `RepConfig`.
381    pub fn build(self) -> RepConfig {
382        RepConfig {
383            group_name: self.group_name,
384            node_name: self.node_name,
385            node_host: self.node_host,
386            node_port: self.node_port,
387            node_type: self.node_type,
388            election_timeout: self.election_timeout,
389            heartbeat_interval: self.heartbeat_interval,
390            consistency_policy: self.consistency_policy,
391            commit_durability: self.commit_durability,
392            env_home: self.env_home,
393            quorum_policy: self.quorum_policy,
394            phi_threshold: self.phi_threshold,
395            phi_window_size: self.phi_window_size,
396            initial_peers: self.initial_peers,
397            election_phase_timeout: self.election_phase_timeout,
398            reconnect_config: self.reconnect_config,
399            transport_kind: self.transport_kind,
400            peer_allowlist: self.peer_allowlist,
401            tls_config: self.tls_config,
402        }
403    }
404}
405
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit_durability::ReplicaAckPolicy;

    #[test]
    fn test_builder_defaults() {
        let config = RepConfig::builder("group1", "node1", "localhost").build();
        assert_eq!(config.group_name, "group1");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "localhost");
        assert_eq!(config.node_port, DEFAULT_NODE_PORT);
        assert_eq!(config.node_type, NodeType::Electable);
        assert_eq!(config.election_timeout, DEFAULT_ELECTION_TIMEOUT);
        assert_eq!(config.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
        assert_eq!(config.consistency_policy, ConsistencyPolicy::NoConsistency);
    }

    #[test]
    fn test_builder_defaults_for_optional_fields() {
        // Covers the fields added after the original defaults test so a
        // silent change to any builder default is caught.
        let config = RepConfig::builder("g", "n", "h").build();
        assert!(config.env_home.is_none());
        assert!(matches!(config.quorum_policy, QuorumPolicy::SimpleMajority));
        assert!(config.phi_threshold.is_none());
        assert_eq!(config.phi_window_size, DEFAULT_PHI_WINDOW_SIZE);
        assert!(config.initial_peers.is_empty());
        assert_eq!(config.election_phase_timeout, DEFAULT_ELECTION_PHASE_TIMEOUT);
        assert_eq!(config.transport_kind, RepTransportKind::Tcp);
        assert!(config.peer_allowlist.is_empty());
        assert!(config.tls_config.is_none());
    }

    #[test]
    fn test_default_port_is_unprivileged() {
        // Wave 1C audit cleanup (rep low "default port collision"): the
        // default moved off 5001 (the v1.5.0 default, which overlaps with
        // REPMGR among others) to an unprivileged port.
        let config = RepConfig::builder("g", "n", "h").build();
        assert_eq!(config.node_port, 14_001);
        assert!(
            config.node_port >= 1024,
            "default port must not require elevated privileges to bind"
        );
    }

    #[test]
    fn test_new_constructor_matches_builder() {
        let a = RepConfig::new("g", "n", "h", 6000);
        let b = RepConfig::builder("g", "n", "h").node_port(6000).build();
        // The two paths must produce the same on-the-wire identity.
        assert_eq!(a.group_name, b.group_name);
        assert_eq!(a.node_name, b.node_name);
        assert_eq!(a.node_host, b.node_host);
        assert_eq!(a.node_port, b.node_port);
        assert_eq!(a.node_type, b.node_type);
        assert_eq!(a.socket_address(), b.socket_address());
    }

    #[test]
    fn test_builder_custom_port() {
        let config = RepConfig::builder("g", "n", "h").node_port(6000).build();
        assert_eq!(config.node_port, 6000);
    }

    #[test]
    fn test_builder_node_type() {
        let config = RepConfig::builder("g", "n", "h")
            .node_type(NodeType::Secondary)
            .build();
        assert_eq!(config.node_type, NodeType::Secondary);
    }

    #[test]
    fn test_builder_timeouts() {
        let config = RepConfig::builder("g", "n", "h")
            .election_timeout(Duration::from_secs(20))
            .heartbeat_interval(Duration::from_millis(500))
            .build();
        assert_eq!(config.election_timeout, Duration::from_secs(20));
        assert_eq!(config.heartbeat_interval, Duration::from_millis(500));
    }

    #[test]
    fn test_builder_consistency_policy() {
        let policy = ConsistencyPolicy::TimeConsistency {
            max_lag: Duration::from_millis(500),
            timeout: Duration::from_secs(10),
        };
        let config = RepConfig::builder("g", "n", "h")
            .consistency_policy(policy.clone())
            .build();
        assert_eq!(config.consistency_policy, policy);
    }

    #[test]
    fn test_builder_commit_durability() {
        let durability = CommitDurability::new(
            ReplicaAckPolicy::All,
            Duration::from_secs(15),
        );
        let config = RepConfig::builder("g", "n", "h")
            .commit_durability(durability)
            .build();
        assert_eq!(config.commit_durability.ack_policy, ReplicaAckPolicy::All);
        assert_eq!(
            config.commit_durability.ack_timeout,
            Duration::from_secs(15)
        );
    }

    #[test]
    fn test_builder_env_home_and_phi() {
        let config = RepConfig::builder("g", "n", "h")
            .env_home("/tmp/noxu")
            .phi_threshold(Some(8.0))
            .phi_window_size(1000)
            .election_phase_timeout(Duration::from_millis(250))
            .build();
        assert_eq!(config.env_home, Some(PathBuf::from("/tmp/noxu")));
        assert_eq!(config.phi_threshold, Some(8.0));
        assert_eq!(config.phi_window_size, 1000);
        assert_eq!(config.election_phase_timeout, Duration::from_millis(250));
    }

    #[test]
    fn test_builder_transport_and_allowlist() {
        let config = RepConfig::builder("g", "n", "h")
            .transport_kind(RepTransportKind::Tls)
            .peer_allowlist(vec!["node-a".to_string(), "node-b".to_string()])
            .build();
        assert_eq!(config.transport_kind, RepTransportKind::Tls);
        assert_eq!(config.peer_allowlist, vec!["node-a", "node-b"]);
    }

    #[test]
    fn test_socket_address() {
        let config =
            RepConfig::builder("g", "n", "192.168.1.1").node_port(7000).build();
        assert_eq!(config.socket_address(), "192.168.1.1:7000");
    }

    #[test]
    fn test_builder_chaining() {
        let config = RepConfig::builder("mygroup", "node1", "10.0.0.1")
            .node_port(5555)
            .node_type(NodeType::Arbiter)
            .election_timeout(Duration::from_secs(30))
            .build();
        assert_eq!(config.group_name, "mygroup");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "10.0.0.1");
        assert_eq!(config.node_port, 5555);
        assert_eq!(config.node_type, NodeType::Arbiter);
        assert_eq!(config.election_timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_config_clone() {
        let config = RepConfig::builder("g", "n", "h").build();
        let cloned = config.clone();
        assert_eq!(config.group_name, cloned.group_name);
        assert_eq!(config.node_name, cloned.node_name);
    }

    #[test]
    fn test_config_debug() {
        let config = RepConfig::builder("g", "n", "h").build();
        let s = format!("{:?}", config);
        assert!(s.contains("RepConfig"));
    }
}