1use std::path::PathBuf;
5use std::time::Duration;
6
7use crate::commit_durability::CommitDurability;
8use crate::consistency::ConsistencyPolicy;
9use crate::node_type::NodeType;
10use crate::quorum_policy::QuorumPolicy;
11use crate::rep_node::RepNode;
12use crate::stream::reconnect::ReconnectConfig;
13
// --- Timing defaults -------------------------------------------------------

/// Election timeout that [`RepConfig::builder`] starts with (10 seconds).
const DEFAULT_ELECTION_TIMEOUT: Duration = Duration::from_secs(10);
/// Per-phase election timeout that [`RepConfig::builder`] starts with (500 ms).
const DEFAULT_ELECTION_PHASE_TIMEOUT: Duration = Duration::from_millis(500);
/// Heartbeat interval that [`RepConfig::builder`] starts with (1 second).
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);

// --- Numeric defaults ------------------------------------------------------

/// Node port that [`RepConfig::builder`] starts with.
///
/// The value lies above the privileged port range (ports below 1024).
const DEFAULT_NODE_PORT: u16 = 14_001;
/// Phi window size that [`RepConfig::builder`] starts with.
const DEFAULT_PHI_WINDOW_SIZE: usize = 200;
31
/// Identifies which transport a [`RepConfig`] selects.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names only; confirm them against the transport layer.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum RepTransportKind {
    /// Presumably plain TCP. This is the variant returned by `Default`.
    Tcp,
    /// Presumably TLS-protected transport — confirm.
    Tls,
    /// Presumably QUIC transport — confirm.
    Quic,
    /// Presumably an in-process transport (e.g. for tests) — confirm.
    InMemory,
}

impl Default for RepTransportKind {
    /// Returns [`RepTransportKind::Tcp`].
    fn default() -> Self {
        RepTransportKind::Tcp
    }
}
70
71#[derive(Debug, Clone)]
76pub struct RepConfig {
77 pub group_name: String,
79 pub node_name: String,
81 pub node_host: String,
83 pub node_port: u16,
85 pub node_type: NodeType,
87 pub election_timeout: Duration,
89 pub heartbeat_interval: Duration,
91 pub consistency_policy: ConsistencyPolicy,
93 pub commit_durability: CommitDurability,
99 pub env_home: Option<PathBuf>,
105 pub quorum_policy: QuorumPolicy,
107 pub phi_threshold: Option<f64>,
113 pub phi_window_size: usize,
117 pub initial_peers: Vec<RepNode>,
121 pub election_phase_timeout: Duration,
124 pub reconnect_config: ReconnectConfig,
126 pub transport_kind: RepTransportKind,
134}
135
136impl RepConfig {
137 pub fn builder(
139 group_name: &str,
140 node_name: &str,
141 node_host: &str,
142 ) -> RepConfigBuilder {
143 RepConfigBuilder {
144 group_name: group_name.to_string(),
145 node_name: node_name.to_string(),
146 node_host: node_host.to_string(),
147 node_port: DEFAULT_NODE_PORT,
148 node_type: NodeType::Electable,
149 election_timeout: DEFAULT_ELECTION_TIMEOUT,
150 heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
151 consistency_policy: ConsistencyPolicy::default(),
152 commit_durability: CommitDurability::default(),
153 env_home: None,
154 quorum_policy: QuorumPolicy::SimpleMajority,
155 phi_threshold: None,
156 phi_window_size: DEFAULT_PHI_WINDOW_SIZE,
157 initial_peers: Vec::new(),
158 election_phase_timeout: DEFAULT_ELECTION_PHASE_TIMEOUT,
159 reconnect_config: ReconnectConfig::default(),
160 transport_kind: RepTransportKind::default(),
161 }
162 }
163
164 pub fn new(
171 group_name: impl Into<String>,
172 node_name: impl Into<String>,
173 node_host: impl Into<String>,
174 node_port: u16,
175 ) -> RepConfig {
176 let g = group_name.into();
177 let n = node_name.into();
178 let h = node_host.into();
179 RepConfig::builder(&g, &n, &h).node_port(node_port).build()
180 }
181
182 pub fn socket_address(&self) -> String {
184 format!("{}:{}", self.node_host, self.node_port)
185 }
186}
187
188#[derive(Debug, Clone)]
190pub struct RepConfigBuilder {
191 group_name: String,
192 node_name: String,
193 node_host: String,
194 node_port: u16,
195 node_type: NodeType,
196 election_timeout: Duration,
197 heartbeat_interval: Duration,
198 consistency_policy: ConsistencyPolicy,
199 commit_durability: CommitDurability,
200 env_home: Option<PathBuf>,
201 quorum_policy: QuorumPolicy,
202 phi_threshold: Option<f64>,
203 phi_window_size: usize,
204 initial_peers: Vec<RepNode>,
205 election_phase_timeout: Duration,
206 reconnect_config: ReconnectConfig,
207 transport_kind: RepTransportKind,
208}
209
210impl RepConfigBuilder {
211 pub fn node_port(mut self, port: u16) -> Self {
213 self.node_port = port;
214 self
215 }
216
217 pub fn node_type(mut self, node_type: NodeType) -> Self {
219 self.node_type = node_type;
220 self
221 }
222
223 pub fn election_timeout(mut self, timeout: Duration) -> Self {
225 self.election_timeout = timeout;
226 self
227 }
228
229 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
231 self.heartbeat_interval = interval;
232 self
233 }
234
235 pub fn consistency_policy(mut self, policy: ConsistencyPolicy) -> Self {
237 self.consistency_policy = policy;
238 self
239 }
240
241 pub fn commit_durability(mut self, durability: CommitDurability) -> Self {
243 self.commit_durability = durability;
244 self
245 }
246
247 pub fn env_home(mut self, path: impl Into<PathBuf>) -> Self {
249 self.env_home = Some(path.into());
250 self
251 }
252
253 pub fn quorum_policy(mut self, policy: QuorumPolicy) -> Self {
255 self.quorum_policy = policy;
256 self
257 }
258
259 pub fn phi_threshold(mut self, threshold: Option<f64>) -> Self {
264 self.phi_threshold = threshold;
265 self
266 }
267
268 pub fn phi_window_size(mut self, size: usize) -> Self {
270 self.phi_window_size = size;
271 self
272 }
273
274 pub fn add_initial_peer(mut self, node: RepNode) -> Self {
276 self.initial_peers.push(node);
277 self
278 }
279
280 pub fn election_phase_timeout(mut self, timeout: Duration) -> Self {
283 self.election_phase_timeout = timeout;
284 self
285 }
286
287 pub fn reconnect_config(mut self, config: ReconnectConfig) -> Self {
289 self.reconnect_config = config;
290 self
291 }
292
293 pub fn transport_kind(mut self, kind: RepTransportKind) -> Self {
298 self.transport_kind = kind;
299 self
300 }
301
302 pub fn build(self) -> RepConfig {
304 RepConfig {
305 group_name: self.group_name,
306 node_name: self.node_name,
307 node_host: self.node_host,
308 node_port: self.node_port,
309 node_type: self.node_type,
310 election_timeout: self.election_timeout,
311 heartbeat_interval: self.heartbeat_interval,
312 consistency_policy: self.consistency_policy,
313 commit_durability: self.commit_durability,
314 env_home: self.env_home,
315 quorum_policy: self.quorum_policy,
316 phi_threshold: self.phi_threshold,
317 phi_window_size: self.phi_window_size,
318 initial_peers: self.initial_peers,
319 election_phase_timeout: self.election_phase_timeout,
320 reconnect_config: self.reconnect_config,
321 transport_kind: self.transport_kind,
322 }
323 }
324}
325
/// Unit tests for `RepConfig` and `RepConfigBuilder`: builder defaults, each
/// setter, the `new` shortcut, and the formatted socket address.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit_durability::ReplicaAckPolicy;

    #[test]
    fn test_builder_defaults() {
        let config = RepConfig::builder("group1", "node1", "localhost").build();
        assert_eq!(config.group_name, "group1");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "localhost");
        assert_eq!(config.node_port, DEFAULT_NODE_PORT);
        assert_eq!(config.node_type, NodeType::Electable);
        assert_eq!(config.election_timeout, DEFAULT_ELECTION_TIMEOUT);
        assert_eq!(config.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
        assert_eq!(config.consistency_policy, ConsistencyPolicy::NoConsistency);
    }

    /// Defaults of the settings that `test_builder_defaults` does not cover.
    #[test]
    fn test_builder_defaults_for_optional_settings() {
        let config = RepConfig::builder("g", "n", "h").build();
        assert_eq!(config.env_home, None);
        assert_eq!(config.phi_threshold, None);
        assert_eq!(config.phi_window_size, DEFAULT_PHI_WINDOW_SIZE);
        assert!(config.initial_peers.is_empty());
        assert_eq!(
            config.election_phase_timeout,
            DEFAULT_ELECTION_PHASE_TIMEOUT
        );
        assert_eq!(config.transport_kind, RepTransportKind::default());
        assert_eq!(config.transport_kind, RepTransportKind::Tcp);
    }

    #[test]
    fn test_default_port_is_unprivileged() {
        let config = RepConfig::builder("g", "n", "h").build();
        assert_eq!(config.node_port, 14_001);
        // Check the property the test is named after, not only the literal:
        // ports below 1024 are the privileged range.
        assert!(config.node_port >= 1024);
    }

    #[test]
    fn test_new_constructor_matches_builder() {
        let a = RepConfig::new("g", "n", "h", 6000);
        let b = RepConfig::builder("g", "n", "h").node_port(6000).build();
        assert_eq!(a.group_name, b.group_name);
        assert_eq!(a.node_name, b.node_name);
        assert_eq!(a.node_host, b.node_host);
        assert_eq!(a.node_port, b.node_port);
        assert_eq!(a.node_type, b.node_type);
    }

    /// `new` accepts owned `String`s as well as string slices.
    #[test]
    fn test_new_accepts_owned_strings() {
        let config = RepConfig::new(
            String::from("group"),
            String::from("node"),
            String::from("host"),
            7001,
        );
        assert_eq!(config.group_name, "group");
        assert_eq!(config.node_name, "node");
        assert_eq!(config.node_host, "host");
        assert_eq!(config.node_port, 7001);
    }

    #[test]
    fn test_builder_custom_port() {
        let config = RepConfig::builder("g", "n", "h").node_port(6000).build();
        assert_eq!(config.node_port, 6000);
    }

    #[test]
    fn test_builder_node_type() {
        let config = RepConfig::builder("g", "n", "h")
            .node_type(NodeType::Secondary)
            .build();
        assert_eq!(config.node_type, NodeType::Secondary);
    }

    #[test]
    fn test_builder_timeouts() {
        let config = RepConfig::builder("g", "n", "h")
            .election_timeout(Duration::from_secs(20))
            .heartbeat_interval(Duration::from_millis(500))
            .build();
        assert_eq!(config.election_timeout, Duration::from_secs(20));
        assert_eq!(config.heartbeat_interval, Duration::from_millis(500));
    }

    #[test]
    fn test_builder_election_phase_timeout() {
        let config = RepConfig::builder("g", "n", "h")
            .election_phase_timeout(Duration::from_millis(750))
            .build();
        assert_eq!(config.election_phase_timeout, Duration::from_millis(750));
    }

    #[test]
    fn test_builder_consistency_policy() {
        let policy = ConsistencyPolicy::TimeConsistency {
            max_lag: Duration::from_millis(500),
            timeout: Duration::from_secs(10),
        };
        let config = RepConfig::builder("g", "n", "h")
            .consistency_policy(policy.clone())
            .build();
        assert_eq!(config.consistency_policy, policy);
    }

    #[test]
    fn test_builder_commit_durability() {
        let durability = CommitDurability::new(
            ReplicaAckPolicy::All,
            Duration::from_secs(15),
        );
        let config = RepConfig::builder("g", "n", "h")
            .commit_durability(durability)
            .build();
        assert_eq!(config.commit_durability.ack_policy, ReplicaAckPolicy::All);
        assert_eq!(
            config.commit_durability.ack_timeout,
            Duration::from_secs(15)
        );
    }

    #[test]
    fn test_builder_env_home() {
        let config = RepConfig::builder("g", "n", "h")
            .env_home("/var/lib/rep")
            .build();
        assert_eq!(config.env_home, Some(PathBuf::from("/var/lib/rep")));
    }

    #[test]
    fn test_builder_phi_settings() {
        let config = RepConfig::builder("g", "n", "h")
            .phi_threshold(Some(8.0))
            .phi_window_size(50)
            .build();
        assert_eq!(config.phi_threshold, Some(8.0));
        assert_eq!(config.phi_window_size, 50);

        // Passing `None` clears a threshold set earlier in the chain.
        let cleared = RepConfig::builder("g", "n", "h")
            .phi_threshold(Some(8.0))
            .phi_threshold(None)
            .build();
        assert_eq!(cleared.phi_threshold, None);
    }

    #[test]
    fn test_builder_transport_kind() {
        let config = RepConfig::builder("g", "n", "h")
            .transport_kind(RepTransportKind::Tls)
            .build();
        assert_eq!(config.transport_kind, RepTransportKind::Tls);
    }

    #[test]
    fn test_socket_address() {
        let config =
            RepConfig::builder("g", "n", "192.168.1.1").node_port(7000).build();
        assert_eq!(config.socket_address(), "192.168.1.1:7000");
    }

    #[test]
    fn test_builder_chaining() {
        let config = RepConfig::builder("mygroup", "node1", "10.0.0.1")
            .node_port(5555)
            .node_type(NodeType::Arbiter)
            .election_timeout(Duration::from_secs(30))
            .build();
        assert_eq!(config.group_name, "mygroup");
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.node_host, "10.0.0.1");
        assert_eq!(config.node_port, 5555);
        assert_eq!(config.node_type, NodeType::Arbiter);
        assert_eq!(config.election_timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_config_clone() {
        let config = RepConfig::builder("g", "n", "h").build();
        let cloned = config.clone();
        assert_eq!(config.group_name, cloned.group_name);
        assert_eq!(config.node_name, cloned.node_name);
    }

    #[test]
    fn test_config_debug() {
        let config = RepConfig::builder("g", "n", "h").build();
        let s = format!("{:?}", config);
        assert!(s.contains("RepConfig"));
    }
}