Skip to main content

noxu_rep/
rep_config.rs

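//! Replication configuration.
//!
//! Defines [`RepConfig`] (constructed via [`RepConfigBuilder`] or the
//! [`RepConfig::new`] shorthand) and [`RepTransportKind`], the wire-level
//! transport selector for replication traffic.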
3
4use std::path::PathBuf;
5use std::time::Duration;
6
7use crate::commit_durability::CommitDurability;
8use crate::consistency::ConsistencyPolicy;
9use crate::node_type::NodeType;
10use crate::quorum_policy::QuorumPolicy;
11use crate::rep_node::RepNode;
12use crate::stream::reconnect::ReconnectConfig;
13
/// Default election timeout (10 s).
const DEFAULT_ELECTION_TIMEOUT: Duration = Duration::from_secs(10);
/// Default heartbeat interval (1 s).
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
/// Default replication port.
///
/// Changed from `5001` (the v1.5.0 default, which overlaps with the
/// defaults of other well-known services) to `14_001`, an unprivileged
/// port (above 1023, so binding it needs no elevated privileges).  Most
/// production deployments override this value.
///
/// NOTE(review): the IANA service-name registry appears to list 14001 for
/// SIGTRAN SUA (RFC 3868), so it is not strictly "unassigned" — confirm it
/// cannot clash with services in target deployments.
const DEFAULT_NODE_PORT: u16 = 14_001;
/// Default per-phase election message timeout (500 ms).
const DEFAULT_ELECTION_PHASE_TIMEOUT: Duration = Duration::from_millis(500);
/// Default phi accrual sample window size (200 samples).
///
/// Adequate for a LAN; WAN deployments may want a larger window such as
/// `1000`.
const DEFAULT_PHI_WINDOW_SIZE: usize = 200;
31
/// Wire-level transport selected for replication traffic.
///
/// The in-memory transport is a first-class production option alongside
/// TCP / TLS / QUIC.  Declaring the transport choice in [`RepConfig`] lets
/// higher-level orchestration code (e.g. the test harness, the
/// `RepTestBase` integration tests, embedded deployments) route channel
/// construction through the right factory.
///
/// Note: noxu-rep's [`crate::net`] channel types are constructed directly
/// by the user code that drives the cluster (a `TcpListener` on a port, a
/// [`crate::net::InMemoryTransport::new_group`] mesh, etc.).  The
/// [`RepConfig::transport_kind`] field holding this value is therefore
/// largely advisory: it documents intent and lets observability / chaos /
/// harness layers introspect the transport without inspecting individual
/// channel types.
///
/// The exception is TLS.  When [`RepConfig::tls_config`] is also set and
/// the kind is [`RepTransportKind::Tls`], `ReplicatedEnvironment` is
/// documented to start a TLS-capable service dispatcher instead of the
/// plain-TCP one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RepTransportKind {
    /// Plaintext TCP via [`crate::net::TcpChannel`].
    Tcp,
    /// TLS-encrypted TCP via `crate::net::TlsTcpChannel`
    /// (requires `tls-rustls` or `tls-native`).
    Tls,
    /// QUIC over UDP via `crate::net::QuicChannel`
    /// (requires the `quic` feature).
    Quic,
    /// In-process [`crate::net::InMemoryTransport`].  Useful for
    /// embedded deployments and integration tests that want real
    /// `ReplicatedEnvironment` behaviour without opening sockets.
    InMemory,
}

impl Default for RepTransportKind {
    /// Defaults to [`RepTransportKind::Tcp`] to preserve
    /// backward compatibility.
    fn default() -> Self {
        Self::Tcp
    }
}
70
71/// Configuration for a replication node.
72///
73/// Use the builder
74/// pattern to construct.
75#[derive(Debug, Clone)]
76pub struct RepConfig {
77    /// Name of the replication group.
78    pub group_name: String,
79    /// Name of this node within the group (must be unique).
80    pub node_name: String,
81    /// Hostname or IP address for this node.
82    pub node_host: String,
83    /// Port for replication communication.
84    pub node_port: u16,
85    /// Type of this node.
86    pub node_type: NodeType,
87    /// Timeout for elections.
88    pub election_timeout: Duration,
89    /// Interval between heartbeat messages.
90    pub heartbeat_interval: Duration,
91    /// Default consistency policy for read operations.
92    pub consistency_policy: ConsistencyPolicy,
93    /// Default commit durability for replicated transactions.
94    ///
95    /// The `ack_timeout` field on `commit_durability` governs the
96    /// commit-side wait for replica acks; there is no separate
97    /// per-RepConfig replica-ack timeout.
98    pub commit_durability: CommitDurability,
99    /// Path to the local environment home directory (`.ndb` files).
100    ///
101    /// When set, `ReplicatedEnvironment` registers a `NetworkRestoreServer`
102    /// on the service dispatcher so that other nodes can restore from this
103    /// node via the `"RESTORE"` service.
104    pub env_home: Option<PathBuf>,
105    /// Quorum policy for elections. Default: `SimpleMajority`.
106    pub quorum_policy: QuorumPolicy,
107    /// Phi accrual suspicion threshold.
108    ///
109    /// `None` (default) uses a binary heartbeat timeout.
110    /// `Some(8.0)` enables phi accrual detection with the paper's recommended
111    /// threshold (mistake rate ≈ 10⁻⁸).
112    pub phi_threshold: Option<f64>,
113    /// Sliding-window size for phi accrual inter-arrival samples.
114    ///
115    /// Default `200` is adequate for LAN; use `1000` for WAN.
116    pub phi_window_size: usize,
117    /// Fully-described peers added to the replication group at startup.
118    ///
119    /// Useful for pre-populating quoracle capacity/latency metadata.
120    pub initial_peers: Vec<RepNode>,
121    /// Timeout per peer message exchange during Phase 1 and Phase 2 of an
122    /// election.  Default: 500 ms.
123    pub election_phase_timeout: Duration,
124    /// Reconnection backoff configuration for replica partition recovery.
125    pub reconnect_config: ReconnectConfig,
126    /// Wire-level transport this node will use.
127    ///
128    /// This field lets callers declare whether they
129    /// intend to drive replication over TCP, TLS, QUIC, or the
130    /// in-process [`crate::net::InMemoryTransport`].  See
131    /// [`RepTransportKind`] for the variants.  Defaults to
132    /// [`RepTransportKind::Tcp`] for backward compatibility.
133    pub transport_kind: RepTransportKind,
134
135    /// Allowlist of peer subject names for mTLS enforcement (Phase 2, v3.1.0).
136    ///
137    /// When non-empty and [`RepTransportKind::Tls`] is configured, the
138    /// server will:
139    ///
140    /// 1. **Require a client certificate** on every incoming TLS connection.
141    /// 2. **Validate the chain** against the CA roots in the `TlsConfig`.
142    /// 3. **Check subject names** — the peer's Subject Common Name (CN) and
143    ///    every DNS Subject Alternative Name (SAN) entry are compared
144    ///    case-insensitively against this list.  If none match, the
145    ///    handshake is aborted before any application data is exchanged.
146    ///
147    /// Matching is exact (no wildcards).  Names are compared
148    /// case-insensitively.  Whitespace-only and empty entries are ignored.
149    ///
150    /// The client side automatically presents its own certificate when the
151    /// `TlsConfig` identity is `PemFiles` or `PemBytes`.
152    ///
153    /// ## Empty list
154    ///
155    /// An empty list means no peers are admitted (`PeerAllowlistVerifier`
156    /// returns an error at construction time, which surfaces as a
157    /// `RepError::ConfigError` from `TlsConfig::to_rustls_server_config_with_allowlist`).
158    /// This is intentional fail-closed behaviour: an empty allowlist is
159    /// almost certainly a misconfiguration.
160    ///
161    /// ## Transport requirement
162    ///
163    /// Enforcement requires `transport_kind = RepTransportKind::Tls`.  With
164    /// plain TCP there is no TLS handshake and therefore no cert to inspect.
165    /// Setting this field with a non-TLS transport emits a `log::warn!`.
166    pub peer_allowlist: Vec<String>,
167
168    /// TLS configuration for the service dispatcher (Phase 3).
169    ///
170    /// When set and `transport_kind` is [`RepTransportKind::Tls`],
171    /// [`crate::replicated_environment::ReplicatedEnvironment`] will
172    /// start a `TlsTcpServiceDispatcher` (feature `tls-rustls`)
173    /// instead of the plain-TCP dispatcher.  Combined with a non-empty
174    /// `peer_allowlist`, this enforces mTLS on every incoming replication
175    /// connection at the dispatcher level.
176    ///
177    /// `None` (the default) preserves the Phase-2 behaviour: the
178    /// dispatcher uses plain TCP and the operator must wire
179    /// `TlsTcpChannelListener::bind_with_tls_and_allowlist` separately.
180    pub tls_config: Option<crate::tls::TlsConfig>,
181
182    /// Enable chained / replica-to-replica log feeding (default `false`).
183    ///
184    /// When `true`, a node that becomes a **replica** ALSO runs a feeder
185    /// source on its `PEER_FEEDER` service, serving the VLSN-tagged log
186    /// stream from its OWN WAL to a downstream replica.  This lets a
187    /// mid-tier replica relay the stream (master → R1 → R2) instead of every
188    /// replica connecting directly to the master.
189    ///
190    /// Faithful to JE's cascading-feeder model: `FeederSource` is
191    /// documented as "a real Master OR a Replica in a Replica chain that is
192    /// replaying log records it received from some other source"
193    /// (`FeederSource.java`).  The feeder source on a replica reads its
194    /// VLSNIndex + log files exactly as `MasterFeederSource` does on the
195    /// master, so the downstream's syncup (REP-1) and live-apply (REP-7)
196    /// work unchanged against a replica-feeder source.
197    ///
198    /// **Default `false`** preserves master-direct behaviour: a replica
199    /// does not feed downstream peers unless cascade is explicitly enabled.
200    ///
201    /// **Durability bound**: a mid-tier replica does NOT count its
202    /// downstream's acks toward the master's commit-durability quorum.
203    /// JE evaluates the durability quorum at the master
204    /// (`FeederManager.getNumCurrentAckFeeders`); a chained replica only
205    /// tracks the downstream's progress for its own VLSN/lag bookkeeping.
206    /// A downstream replica is therefore never more durable than the
207    /// entries its mid-tier has itself persisted.
208    pub cascade_feeding: bool,
209}
210
211impl RepConfig {
212    /// Creates a builder for `RepConfig`.
213    pub fn builder(
214        group_name: &str,
215        node_name: &str,
216        node_host: &str,
217    ) -> RepConfigBuilder {
218        RepConfigBuilder {
219            group_name: group_name.to_string(),
220            node_name: node_name.to_string(),
221            node_host: node_host.to_string(),
222            node_port: DEFAULT_NODE_PORT,
223            node_type: NodeType::Electable,
224            election_timeout: DEFAULT_ELECTION_TIMEOUT,
225            heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
226            consistency_policy: ConsistencyPolicy::default(),
227            commit_durability: CommitDurability::default(),
228            env_home: None,
229            quorum_policy: QuorumPolicy::SimpleMajority,
230            phi_threshold: None,
231            phi_window_size: DEFAULT_PHI_WINDOW_SIZE,
232            initial_peers: Vec::new(),
233            election_phase_timeout: DEFAULT_ELECTION_PHASE_TIMEOUT,
234            reconnect_config: ReconnectConfig::default(),
235            transport_kind: RepTransportKind::default(),
236            peer_allowlist: Vec::new(),
237            tls_config: None,
238            cascade_feeding: false,
239        }
240    }
241
242    /// Convenience constructor matching the original v1.4 shape.
243    ///
244    /// Equivalent to `builder(group, node, host).node_port(port).build()`.
245    /// Provided so doc snippets and short tests don't need to write the
246    /// full builder chain.
247    /// "`RepConfig::new` example").
248    pub fn new(
249        group_name: impl Into<String>,
250        node_name: impl Into<String>,
251        node_host: impl Into<String>,
252        node_port: u16,
253    ) -> RepConfig {
254        let g = group_name.into();
255        let n = node_name.into();
256        let h = node_host.into();
257        RepConfig::builder(&g, &n, &h).node_port(node_port).build()
258    }
259
260    /// Returns the socket address string for this node.
261    pub fn socket_address(&self) -> String {
262        format!("{}:{}", self.node_host, self.node_port)
263    }
264}
265
266/// Builder for [`RepConfig`].
267#[derive(Debug, Clone)]
268pub struct RepConfigBuilder {
269    group_name: String,
270    node_name: String,
271    node_host: String,
272    node_port: u16,
273    node_type: NodeType,
274    election_timeout: Duration,
275    heartbeat_interval: Duration,
276    consistency_policy: ConsistencyPolicy,
277    commit_durability: CommitDurability,
278    env_home: Option<PathBuf>,
279    quorum_policy: QuorumPolicy,
280    phi_threshold: Option<f64>,
281    phi_window_size: usize,
282    initial_peers: Vec<RepNode>,
283    election_phase_timeout: Duration,
284    reconnect_config: ReconnectConfig,
285    transport_kind: RepTransportKind,
286    peer_allowlist: Vec<String>,
287    tls_config: Option<crate::tls::TlsConfig>,
288    cascade_feeding: bool,
289}
290
291impl RepConfigBuilder {
292    /// Sets the replication port.
293    pub fn node_port(mut self, port: u16) -> Self {
294        self.node_port = port;
295        self
296    }
297
298    /// Sets the node type.
299    pub fn node_type(mut self, node_type: NodeType) -> Self {
300        self.node_type = node_type;
301        self
302    }
303
304    /// Sets the election timeout.
305    pub fn election_timeout(mut self, timeout: Duration) -> Self {
306        self.election_timeout = timeout;
307        self
308    }
309
310    /// Sets the heartbeat interval.
311    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
312        self.heartbeat_interval = interval;
313        self
314    }
315
316    /// Sets the consistency policy.
317    pub fn consistency_policy(mut self, policy: ConsistencyPolicy) -> Self {
318        self.consistency_policy = policy;
319        self
320    }
321
322    /// Sets the commit durability.
323    pub fn commit_durability(mut self, durability: CommitDurability) -> Self {
324        self.commit_durability = durability;
325        self
326    }
327
328    /// Sets the environment home directory (serves `.ndb` files for network restore).
329    pub fn env_home(mut self, path: impl Into<PathBuf>) -> Self {
330        self.env_home = Some(path.into());
331        self
332    }
333
334    /// Enable chained / replica-to-replica log feeding (default `false`).
335    ///
336    /// When `true`, a node that becomes a replica also runs a feeder source
337    /// on its `PEER_FEEDER` service, serving its OWN WAL to a downstream
338    /// replica (master → R1 → R2).  See [`RepConfig::cascade_feeding`] for
339    /// the JE `FeederSource` citation and the durability bound.
340    pub fn cascade_feeding(mut self, enabled: bool) -> Self {
341        self.cascade_feeding = enabled;
342        self
343    }
344
345    /// Sets the quorum policy for elections (default: `SimpleMajority`).
346    pub fn quorum_policy(mut self, policy: QuorumPolicy) -> Self {
347        self.quorum_policy = policy;
348        self
349    }
350
351    /// Enable phi accrual failure detection with the given suspicion threshold.
352    ///
353    /// `8.0` is the paper's recommended production value (mistake rate ≈ 10⁻⁸).
354    /// Call with `None` to revert to binary heartbeat timeout detection.
355    pub fn phi_threshold(mut self, threshold: Option<f64>) -> Self {
356        self.phi_threshold = threshold;
357        self
358    }
359
360    /// Sets the phi accrual inter-arrival sample window size (default 200).
361    pub fn phi_window_size(mut self, size: usize) -> Self {
362        self.phi_window_size = size;
363        self
364    }
365
366    /// Add a fully-described initial peer to the group at startup.
367    pub fn add_initial_peer(mut self, node: RepNode) -> Self {
368        self.initial_peers.push(node);
369        self
370    }
371
372    /// Set the per-peer message timeout for Phase 1 and Phase 2 election
373    /// exchanges (default: 500 ms).
374    pub fn election_phase_timeout(mut self, timeout: Duration) -> Self {
375        self.election_phase_timeout = timeout;
376        self
377    }
378
379    /// Sets the reconnection backoff configuration for replica partition recovery.
380    pub fn reconnect_config(mut self, config: ReconnectConfig) -> Self {
381        self.reconnect_config = config;
382        self
383    }
384
385    /// Sets the wire-level transport this node will use.
386    ///
387    /// Defaults to [`RepTransportKind::Tcp`].
388    /// [`RepTransportKind::InMemory`] for in-process clusters.
389    pub fn transport_kind(mut self, kind: RepTransportKind) -> Self {
390        self.transport_kind = kind;
391        self
392    }
393
394    /// Set the mTLS peer allowlist (Phase 2, v3.1.0).
395    ///
396    /// When non-empty and `transport_kind` is [`RepTransportKind::Tls`],
397    /// incoming TLS connections must present a certificate whose Subject CN
398    /// or DNS SAN matches at least one entry here (case-insensitive, exact
399    /// match).  Connections that fail the check are rejected at the TLS
400    /// handshake layer.
401    ///
402    /// See [`RepConfig::peer_allowlist`] for full details.
403    pub fn peer_allowlist(mut self, names: Vec<String>) -> Self {
404        self.peer_allowlist = names;
405        self
406    }
407
408    /// Set the TLS configuration for the service dispatcher (Phase 3).
409    ///
410    /// When set and `transport_kind` is [`RepTransportKind::Tls`],
411    /// [`crate::replicated_environment::ReplicatedEnvironment`] will use a
412    /// TLS-capable service dispatcher that enforces mTLS when
413    /// `peer_allowlist` is also non-empty.
414    ///
415    /// See [`RepConfig::tls_config`] for full details.
416    pub fn tls_config(mut self, tls: crate::tls::TlsConfig) -> Self {
417        self.tls_config = Some(tls);
418        self
419    }
420
421    /// Builds the `RepConfig`.
422    pub fn build(self) -> RepConfig {
423        RepConfig {
424            group_name: self.group_name,
425            node_name: self.node_name,
426            node_host: self.node_host,
427            node_port: self.node_port,
428            node_type: self.node_type,
429            election_timeout: self.election_timeout,
430            heartbeat_interval: self.heartbeat_interval,
431            consistency_policy: self.consistency_policy,
432            commit_durability: self.commit_durability,
433            env_home: self.env_home,
434            quorum_policy: self.quorum_policy,
435            phi_threshold: self.phi_threshold,
436            phi_window_size: self.phi_window_size,
437            initial_peers: self.initial_peers,
438            election_phase_timeout: self.election_phase_timeout,
439            reconnect_config: self.reconnect_config,
440            transport_kind: self.transport_kind,
441            peer_allowlist: self.peer_allowlist,
442            tls_config: self.tls_config,
443            cascade_feeding: self.cascade_feeding,
444        }
445    }
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit_durability::ReplicaAckPolicy;

    #[test]
    fn test_builder_defaults() {
        let config = RepConfig::builder("group1", "node1", "localhost").build();
        assert_eq!(config.group_name, "group1");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "localhost");
        assert_eq!(config.node_port, DEFAULT_NODE_PORT);
        assert_eq!(config.node_type, NodeType::Electable);
        assert_eq!(config.election_timeout, DEFAULT_ELECTION_TIMEOUT);
        assert_eq!(config.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
        assert_eq!(config.consistency_policy, ConsistencyPolicy::NoConsistency);
        // Defaults of the remaining optional fields.
        assert!(config.env_home.is_none());
        assert!(config.phi_threshold.is_none());
        assert_eq!(config.phi_window_size, DEFAULT_PHI_WINDOW_SIZE);
        assert!(config.initial_peers.is_empty());
        assert_eq!(config.election_phase_timeout, DEFAULT_ELECTION_PHASE_TIMEOUT);
        assert_eq!(config.transport_kind, RepTransportKind::Tcp);
        assert!(config.peer_allowlist.is_empty());
        assert!(config.tls_config.is_none());
        assert!(!config.cascade_feeding);
    }

    #[test]
    fn test_default_port_is_unprivileged() {
        // The default moved off 5001 (the v1.5.0 default, which overlaps
        // with other well-known services' defaults) to an unprivileged port
        // (> 1023) so binding it needs no elevated privileges.
        let config = RepConfig::builder("g", "n", "h").build();
        assert_eq!(config.node_port, 14_001);
        assert!(config.node_port > 1023);
    }

    #[test]
    fn test_new_constructor_matches_builder() {
        let a = RepConfig::new("g", "n", "h", 6000);
        let b = RepConfig::builder("g", "n", "h").node_port(6000).build();
        // The two paths must produce the same on-the-wire identity.
        assert_eq!(a.group_name, b.group_name);
        assert_eq!(a.node_name, b.node_name);
        assert_eq!(a.node_host, b.node_host);
        assert_eq!(a.node_port, b.node_port);
        assert_eq!(a.node_type, b.node_type);
        assert_eq!(a.socket_address(), b.socket_address());
        assert_eq!(a.transport_kind, b.transport_kind);
        assert_eq!(a.cascade_feeding, b.cascade_feeding);
    }

    #[test]
    fn test_builder_custom_port() {
        let config = RepConfig::builder("g", "n", "h").node_port(6000).build();
        assert_eq!(config.node_port, 6000);
    }

    #[test]
    fn test_builder_node_type() {
        let config = RepConfig::builder("g", "n", "h")
            .node_type(NodeType::Secondary)
            .build();
        assert_eq!(config.node_type, NodeType::Secondary);
    }

    #[test]
    fn test_builder_timeouts() {
        let config = RepConfig::builder("g", "n", "h")
            .election_timeout(Duration::from_secs(20))
            .heartbeat_interval(Duration::from_millis(500))
            .build();
        assert_eq!(config.election_timeout, Duration::from_secs(20));
        assert_eq!(config.heartbeat_interval, Duration::from_millis(500));
    }

    #[test]
    fn test_builder_consistency_policy() {
        let policy = ConsistencyPolicy::TimeConsistency {
            max_lag: Duration::from_millis(500),
            timeout: Duration::from_secs(10),
        };
        let config = RepConfig::builder("g", "n", "h")
            .consistency_policy(policy.clone())
            .build();
        assert_eq!(config.consistency_policy, policy);
    }

    #[test]
    fn test_builder_commit_durability() {
        let durability = CommitDurability::new(
            ReplicaAckPolicy::All,
            Duration::from_secs(15),
        );
        let config = RepConfig::builder("g", "n", "h")
            .commit_durability(durability)
            .build();
        assert_eq!(config.commit_durability.ack_policy, ReplicaAckPolicy::All);
        assert_eq!(
            config.commit_durability.ack_timeout,
            Duration::from_secs(15)
        );
    }

    #[test]
    fn test_builder_optional_fields() {
        let allowlist = vec!["node-a".to_string(), "node-b".to_string()];
        let config = RepConfig::builder("g", "n", "h")
            .env_home("/tmp/noxu-home")
            .phi_threshold(Some(8.0))
            .phi_window_size(1000)
            .election_phase_timeout(Duration::from_millis(250))
            .transport_kind(RepTransportKind::InMemory)
            .peer_allowlist(allowlist.clone())
            .cascade_feeding(true)
            .build();
        assert_eq!(config.env_home, Some(PathBuf::from("/tmp/noxu-home")));
        assert_eq!(config.phi_threshold, Some(8.0));
        assert_eq!(config.phi_window_size, 1000);
        assert_eq!(config.election_phase_timeout, Duration::from_millis(250));
        assert_eq!(config.transport_kind, RepTransportKind::InMemory);
        assert_eq!(config.peer_allowlist, allowlist);
        assert!(config.cascade_feeding);
    }

    #[test]
    fn test_phi_threshold_can_be_reset() {
        let config = RepConfig::builder("g", "n", "h")
            .phi_threshold(Some(8.0))
            .phi_threshold(None)
            .build();
        assert!(config.phi_threshold.is_none());
    }

    #[test]
    fn test_socket_address() {
        let config =
            RepConfig::builder("g", "n", "192.168.1.1").node_port(7000).build();
        assert_eq!(config.socket_address(), "192.168.1.1:7000");
    }

    #[test]
    fn test_builder_chaining() {
        let config = RepConfig::builder("mygroup", "node1", "10.0.0.1")
            .node_port(5555)
            .node_type(NodeType::Arbiter)
            .election_timeout(Duration::from_secs(30))
            .build();
        assert_eq!(config.group_name, "mygroup");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "10.0.0.1");
        assert_eq!(config.node_port, 5555);
        assert_eq!(config.node_type, NodeType::Arbiter);
        assert_eq!(config.election_timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_config_clone() {
        let config = RepConfig::builder("g", "n", "h").build();
        let cloned = config.clone();
        assert_eq!(config.group_name, cloned.group_name);
        assert_eq!(config.node_name, cloned.node_name);
    }

    #[test]
    fn test_config_debug() {
        let config = RepConfig::builder("g", "n", "h").build();
        let s = format!("{:?}", config);
        assert!(s.contains("RepConfig"));
    }
}