Skip to main content

noxu_rep/
rep_config.rs

1//! Replication configuration.
2//!
3
4use std::path::PathBuf;
5use std::time::Duration;
6
7use crate::commit_durability::CommitDurability;
8use crate::consistency::ConsistencyPolicy;
9use crate::node_type::NodeType;
10use crate::quorum_policy::QuorumPolicy;
11use crate::rep_node::RepNode;
12use crate::stream::reconnect::ReconnectConfig;
13
/// Election timeout used when the builder is given no override.
const DEFAULT_ELECTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Heartbeat interval used when the builder is given no override.
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);

/// Replication port used when the builder is given no override.
///
/// The default used to be `5001`, which collides with PostgreSQL's
/// REPMGR default and with various Cisco services.  It is now
/// `14_001`, an unprivileged, IANA-unassigned port.  Most production
/// deployments set their own port; this value only exists so that a
/// development setup fails closed instead of silently binding a port
/// that belongs to some other service.
const DEFAULT_NODE_PORT: u16 = 14_001;

/// Per-phase election message timeout used when the builder is given
/// no override.
const DEFAULT_ELECTION_PHASE_TIMEOUT: Duration = Duration::from_millis(500);

/// Phi accrual sample window size used when the builder is given no
/// override.
const DEFAULT_PHI_WINDOW_SIZE: usize = 200;
31
/// Identifies which wire-level transport carries replication traffic.
///
/// The in-memory transport is a first-class production option next to
/// TCP, TLS and QUIC.  Recording the choice in [`RepConfig`] lets
/// higher-level orchestration code (e.g., the test harness, the
/// `RepTestBase` integration tests, embedded deployments) route
/// channel construction through the matching factory.
///
/// The value is advisory.  noxu-rep's [`crate::net`] channel types are
/// built directly by the user code that drives the cluster (a
/// `TcpListener` on a port, a mesh from
/// [`crate::net::InMemoryTransport::new_group`], and so on), so this
/// enum only documents intent and lets observability, chaos and
/// harness layers find out the transport without inspecting
/// individual channel types.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum RepTransportKind {
    /// Plaintext TCP, carried by [`crate::net::TcpChannel`].
    Tcp,
    /// TLS-encrypted TCP, carried by `crate::net::TlsTcpChannel`
    /// (requires `tls-rustls` or `tls-native`).
    Tls,
    /// QUIC over UDP, carried by `crate::net::QuicChannel`
    /// (requires the `quic` feature).
    Quic,
    /// The in-process [`crate::net::InMemoryTransport`].  Suited to
    /// embedded deployments and to integration tests that want real
    /// `ReplicatedEnvironment` behaviour without opening sockets.
    InMemory,
}
62
63impl Default for RepTransportKind {
64    /// Defaults to [`RepTransportKind::Tcp`] to preserve
65    /// backward compatibility.
66    fn default() -> Self {
67        Self::Tcp
68    }
69}
70
71/// Configuration for a replication node.
72///
73/// Use the builder
74/// pattern to construct.
75#[derive(Debug, Clone)]
76pub struct RepConfig {
77    /// Name of the replication group.
78    pub group_name: String,
79    /// Name of this node within the group (must be unique).
80    pub node_name: String,
81    /// Hostname or IP address for this node.
82    pub node_host: String,
83    /// Port for replication communication.
84    pub node_port: u16,
85    /// Type of this node.
86    pub node_type: NodeType,
87    /// Timeout for elections.
88    pub election_timeout: Duration,
89    /// Interval between heartbeat messages.
90    pub heartbeat_interval: Duration,
91    /// Default consistency policy for read operations.
92    pub consistency_policy: ConsistencyPolicy,
93    /// Default commit durability for replicated transactions.
94    ///
95    /// The `ack_timeout` field on `commit_durability` governs the
96    /// commit-side wait for replica acks; there is no separate
97    /// per-RepConfig replica-ack timeout.
98    pub commit_durability: CommitDurability,
99    /// Path to the local environment home directory (`.ndb` files).
100    ///
101    /// When set, `ReplicatedEnvironment` registers a `NetworkRestoreServer`
102    /// on the service dispatcher so that other nodes can restore from this
103    /// node via the `"RESTORE"` service.
104    pub env_home: Option<PathBuf>,
105    /// Quorum policy for elections. Default: `SimpleMajority`.
106    pub quorum_policy: QuorumPolicy,
107    /// Phi accrual suspicion threshold.
108    ///
109    /// `None` (default) uses a binary heartbeat timeout.
110    /// `Some(8.0)` enables phi accrual detection with the paper's recommended
111    /// threshold (mistake rate ≈ 10⁻⁸).
112    pub phi_threshold: Option<f64>,
113    /// Sliding-window size for phi accrual inter-arrival samples.
114    ///
115    /// Default `200` is adequate for LAN; use `1000` for WAN.
116    pub phi_window_size: usize,
117    /// Fully-described peers added to the replication group at startup.
118    ///
119    /// Useful for pre-populating quoracle capacity/latency metadata.
120    pub initial_peers: Vec<RepNode>,
121    /// Timeout per peer message exchange during Phase 1 and Phase 2 of an
122    /// election.  Default: 500 ms.
123    pub election_phase_timeout: Duration,
124    /// Reconnection backoff configuration for replica partition recovery.
125    pub reconnect_config: ReconnectConfig,
126    /// Wire-level transport this node will use.
127    ///
128    /// This field lets callers declare whether they
129    /// intend to drive replication over TCP, TLS, QUIC, or the
130    /// in-process [`crate::net::InMemoryTransport`].  See
131    /// [`RepTransportKind`] for the variants.  Defaults to
132    /// [`RepTransportKind::Tcp`] for backward compatibility.
133    pub transport_kind: RepTransportKind,
134}
135
136impl RepConfig {
137    /// Creates a builder for `RepConfig`.
138    pub fn builder(
139        group_name: &str,
140        node_name: &str,
141        node_host: &str,
142    ) -> RepConfigBuilder {
143        RepConfigBuilder {
144            group_name: group_name.to_string(),
145            node_name: node_name.to_string(),
146            node_host: node_host.to_string(),
147            node_port: DEFAULT_NODE_PORT,
148            node_type: NodeType::Electable,
149            election_timeout: DEFAULT_ELECTION_TIMEOUT,
150            heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
151            consistency_policy: ConsistencyPolicy::default(),
152            commit_durability: CommitDurability::default(),
153            env_home: None,
154            quorum_policy: QuorumPolicy::SimpleMajority,
155            phi_threshold: None,
156            phi_window_size: DEFAULT_PHI_WINDOW_SIZE,
157            initial_peers: Vec::new(),
158            election_phase_timeout: DEFAULT_ELECTION_PHASE_TIMEOUT,
159            reconnect_config: ReconnectConfig::default(),
160            transport_kind: RepTransportKind::default(),
161        }
162    }
163
164    /// Convenience constructor matching the original v1.4 shape.
165    ///
166    /// Equivalent to `builder(group, node, host).node_port(port).build()`.
167    /// Provided so doc snippets and short tests don't need to write the
168    /// full builder chain.
169    /// "`RepConfig::new` example").
170    pub fn new(
171        group_name: impl Into<String>,
172        node_name: impl Into<String>,
173        node_host: impl Into<String>,
174        node_port: u16,
175    ) -> RepConfig {
176        let g = group_name.into();
177        let n = node_name.into();
178        let h = node_host.into();
179        RepConfig::builder(&g, &n, &h).node_port(node_port).build()
180    }
181
182    /// Returns the socket address string for this node.
183    pub fn socket_address(&self) -> String {
184        format!("{}:{}", self.node_host, self.node_port)
185    }
186}
187
188/// Builder for [`RepConfig`].
189#[derive(Debug, Clone)]
190pub struct RepConfigBuilder {
191    group_name: String,
192    node_name: String,
193    node_host: String,
194    node_port: u16,
195    node_type: NodeType,
196    election_timeout: Duration,
197    heartbeat_interval: Duration,
198    consistency_policy: ConsistencyPolicy,
199    commit_durability: CommitDurability,
200    env_home: Option<PathBuf>,
201    quorum_policy: QuorumPolicy,
202    phi_threshold: Option<f64>,
203    phi_window_size: usize,
204    initial_peers: Vec<RepNode>,
205    election_phase_timeout: Duration,
206    reconnect_config: ReconnectConfig,
207    transport_kind: RepTransportKind,
208}
209
210impl RepConfigBuilder {
211    /// Sets the replication port.
212    pub fn node_port(mut self, port: u16) -> Self {
213        self.node_port = port;
214        self
215    }
216
217    /// Sets the node type.
218    pub fn node_type(mut self, node_type: NodeType) -> Self {
219        self.node_type = node_type;
220        self
221    }
222
223    /// Sets the election timeout.
224    pub fn election_timeout(mut self, timeout: Duration) -> Self {
225        self.election_timeout = timeout;
226        self
227    }
228
229    /// Sets the heartbeat interval.
230    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
231        self.heartbeat_interval = interval;
232        self
233    }
234
235    /// Sets the consistency policy.
236    pub fn consistency_policy(mut self, policy: ConsistencyPolicy) -> Self {
237        self.consistency_policy = policy;
238        self
239    }
240
241    /// Sets the commit durability.
242    pub fn commit_durability(mut self, durability: CommitDurability) -> Self {
243        self.commit_durability = durability;
244        self
245    }
246
247    /// Sets the environment home directory (serves `.ndb` files for network restore).
248    pub fn env_home(mut self, path: impl Into<PathBuf>) -> Self {
249        self.env_home = Some(path.into());
250        self
251    }
252
253    /// Sets the quorum policy for elections (default: `SimpleMajority`).
254    pub fn quorum_policy(mut self, policy: QuorumPolicy) -> Self {
255        self.quorum_policy = policy;
256        self
257    }
258
259    /// Enable phi accrual failure detection with the given suspicion threshold.
260    ///
261    /// `8.0` is the paper's recommended production value (mistake rate ≈ 10⁻⁸).
262    /// Call with `None` to revert to binary heartbeat timeout detection.
263    pub fn phi_threshold(mut self, threshold: Option<f64>) -> Self {
264        self.phi_threshold = threshold;
265        self
266    }
267
268    /// Sets the phi accrual inter-arrival sample window size (default 200).
269    pub fn phi_window_size(mut self, size: usize) -> Self {
270        self.phi_window_size = size;
271        self
272    }
273
274    /// Add a fully-described initial peer to the group at startup.
275    pub fn add_initial_peer(mut self, node: RepNode) -> Self {
276        self.initial_peers.push(node);
277        self
278    }
279
280    /// Set the per-peer message timeout for Phase 1 and Phase 2 election
281    /// exchanges (default: 500 ms).
282    pub fn election_phase_timeout(mut self, timeout: Duration) -> Self {
283        self.election_phase_timeout = timeout;
284        self
285    }
286
287    /// Sets the reconnection backoff configuration for replica partition recovery.
288    pub fn reconnect_config(mut self, config: ReconnectConfig) -> Self {
289        self.reconnect_config = config;
290        self
291    }
292
293    /// Sets the wire-level transport this node will use.
294    ///
295    /// Defaults to [`RepTransportKind::Tcp`].
296    /// [`RepTransportKind::InMemory`] for in-process clusters.
297    pub fn transport_kind(mut self, kind: RepTransportKind) -> Self {
298        self.transport_kind = kind;
299        self
300    }
301
302    /// Builds the `RepConfig`.
303    pub fn build(self) -> RepConfig {
304        RepConfig {
305            group_name: self.group_name,
306            node_name: self.node_name,
307            node_host: self.node_host,
308            node_port: self.node_port,
309            node_type: self.node_type,
310            election_timeout: self.election_timeout,
311            heartbeat_interval: self.heartbeat_interval,
312            consistency_policy: self.consistency_policy,
313            commit_durability: self.commit_durability,
314            env_home: self.env_home,
315            quorum_policy: self.quorum_policy,
316            phi_threshold: self.phi_threshold,
317            phi_window_size: self.phi_window_size,
318            initial_peers: self.initial_peers,
319            election_phase_timeout: self.election_phase_timeout,
320            reconnect_config: self.reconnect_config,
321            transport_kind: self.transport_kind,
322        }
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit_durability::ReplicaAckPolicy;

    /// A builder without overrides yields the core defaults.
    #[test]
    fn test_builder_defaults() {
        let config = RepConfig::builder("group1", "node1", "localhost").build();
        assert_eq!(config.group_name, "group1");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "localhost");
        assert_eq!(config.node_port, DEFAULT_NODE_PORT);
        assert_eq!(config.node_type, NodeType::Electable);
        assert_eq!(config.election_timeout, DEFAULT_ELECTION_TIMEOUT);
        assert_eq!(config.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
        assert_eq!(config.consistency_policy, ConsistencyPolicy::NoConsistency);
    }

    /// Defaults of the settings that `test_builder_defaults` does not
    /// look at.
    #[test]
    fn test_builder_defaults_remaining_fields() {
        let config = RepConfig::builder("g", "n", "h").build();
        assert!(config.env_home.is_none());
        assert!(matches!(config.quorum_policy, QuorumPolicy::SimpleMajority));
        assert!(config.phi_threshold.is_none());
        assert_eq!(config.phi_window_size, DEFAULT_PHI_WINDOW_SIZE);
        assert!(config.initial_peers.is_empty());
        assert_eq!(
            config.election_phase_timeout,
            DEFAULT_ELECTION_PHASE_TIMEOUT
        );
        assert_eq!(config.transport_kind, RepTransportKind::Tcp);
    }

    #[test]
    fn test_default_port_is_unprivileged() {
        // Audit cleanup ("default port collision"): the default port
        // must be in the IANA unassigned range and must not be shared
        // with another well-known service we might collide with
        // (5001 was the v1.5.0 default; it overlaps with REPMGR among
        // others).
        let config = RepConfig::builder("g", "n", "h").build();
        assert_eq!(config.node_port, 14_001);
        // Ports below 1024 are the privileged ones.
        assert!(config.node_port >= 1024);
    }

    #[test]
    fn test_new_constructor_matches_builder() {
        let a = RepConfig::new("g", "n", "h", 6000);
        let b = RepConfig::builder("g", "n", "h").node_port(6000).build();
        // The two paths must produce the same on-the-wire identity.
        assert_eq!(a.group_name, b.group_name);
        assert_eq!(a.node_name, b.node_name);
        assert_eq!(a.node_host, b.node_host);
        assert_eq!(a.node_port, b.node_port);
        assert_eq!(a.node_type, b.node_type);
        assert_eq!(a.socket_address(), b.socket_address());
    }

    #[test]
    fn test_builder_custom_port() {
        let config = RepConfig::builder("g", "n", "h").node_port(6000).build();
        assert_eq!(config.node_port, 6000);
    }

    #[test]
    fn test_builder_node_type() {
        let config = RepConfig::builder("g", "n", "h")
            .node_type(NodeType::Secondary)
            .build();
        assert_eq!(config.node_type, NodeType::Secondary);
    }

    #[test]
    fn test_builder_timeouts() {
        let config = RepConfig::builder("g", "n", "h")
            .election_timeout(Duration::from_secs(20))
            .heartbeat_interval(Duration::from_millis(500))
            .build();
        assert_eq!(config.election_timeout, Duration::from_secs(20));
        assert_eq!(config.heartbeat_interval, Duration::from_millis(500));
    }

    #[test]
    fn test_builder_election_phase_timeout() {
        let config = RepConfig::builder("g", "n", "h")
            .election_phase_timeout(Duration::from_millis(750))
            .build();
        assert_eq!(config.election_phase_timeout, Duration::from_millis(750));
    }

    #[test]
    fn test_builder_consistency_policy() {
        let policy = ConsistencyPolicy::TimeConsistency {
            max_lag: Duration::from_millis(500),
            timeout: Duration::from_secs(10),
        };
        let config = RepConfig::builder("g", "n", "h")
            .consistency_policy(policy.clone())
            .build();
        assert_eq!(config.consistency_policy, policy);
    }

    #[test]
    fn test_builder_commit_durability() {
        let durability = CommitDurability::new(
            ReplicaAckPolicy::All,
            Duration::from_secs(15),
        );
        let config = RepConfig::builder("g", "n", "h")
            .commit_durability(durability)
            .build();
        assert_eq!(config.commit_durability.ack_policy, ReplicaAckPolicy::All);
        assert_eq!(
            config.commit_durability.ack_timeout,
            Duration::from_secs(15)
        );
    }

    #[test]
    fn test_builder_env_home() {
        let config =
            RepConfig::builder("g", "n", "h").env_home("/data/env").build();
        assert_eq!(config.env_home, Some(PathBuf::from("/data/env")));
    }

    #[test]
    fn test_builder_phi_settings() {
        let config = RepConfig::builder("g", "n", "h")
            .phi_threshold(Some(8.0))
            .phi_window_size(1000)
            .build();
        assert_eq!(config.phi_threshold, Some(8.0));
        assert_eq!(config.phi_window_size, 1000);

        // Passing `None` afterwards switches phi accrual back off.
        let reverted = RepConfig::builder("g", "n", "h")
            .phi_threshold(Some(8.0))
            .phi_threshold(None)
            .build();
        assert!(reverted.phi_threshold.is_none());
    }

    #[test]
    fn test_builder_transport_kind() {
        let config = RepConfig::builder("g", "n", "h")
            .transport_kind(RepTransportKind::InMemory)
            .build();
        assert_eq!(config.transport_kind, RepTransportKind::InMemory);
    }

    #[test]
    fn test_transport_kind_default_is_tcp() {
        assert_eq!(RepTransportKind::default(), RepTransportKind::Tcp);
    }

    #[test]
    fn test_socket_address() {
        let config =
            RepConfig::builder("g", "n", "192.168.1.1").node_port(7000).build();
        assert_eq!(config.socket_address(), "192.168.1.1:7000");
    }

    #[test]
    fn test_builder_chaining() {
        let config = RepConfig::builder("mygroup", "node1", "10.0.0.1")
            .node_port(5555)
            .node_type(NodeType::Arbiter)
            .election_timeout(Duration::from_secs(30))
            .build();
        assert_eq!(config.group_name, "mygroup");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "10.0.0.1");
        assert_eq!(config.node_port, 5555);
        assert_eq!(config.node_type, NodeType::Arbiter);
        assert_eq!(config.election_timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_config_clone() {
        let config = RepConfig::builder("g", "n", "h").build();
        let cloned = config.clone();
        assert_eq!(config.group_name, cloned.group_name);
        assert_eq!(config.node_name, cloned.node_name);
    }

    #[test]
    fn test_config_debug() {
        let config = RepConfig::builder("g", "n", "h").build();
        let s = format!("{:?}", config);
        assert!(s.contains("RepConfig"));
    }
}
455}