use std::path::PathBuf;
use std::time::Duration;
use crate::commit_durability::CommitDurability;
use crate::consistency::ConsistencyPolicy;
use crate::node_type::NodeType;
use crate::quorum_policy::QuorumPolicy;
use crate::rep_node::RepNode;
use crate::stream::reconnect::ReconnectConfig;
const DEFAULT_ELECTION_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
const DEFAULT_NODE_PORT: u16 = 14_001;
const DEFAULT_ELECTION_PHASE_TIMEOUT: Duration = Duration::from_millis(500);
const DEFAULT_PHI_WINDOW_SIZE: usize = 200;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RepTransportKind {
Tcp,
Tls,
Quic,
InMemory,
}
impl Default for RepTransportKind {
fn default() -> Self {
Self::Tcp
}
}
#[derive(Debug, Clone)]
pub struct RepConfig {
pub group_name: String,
pub node_name: String,
pub node_host: String,
pub node_port: u16,
pub node_type: NodeType,
pub election_timeout: Duration,
pub heartbeat_interval: Duration,
pub consistency_policy: ConsistencyPolicy,
pub commit_durability: CommitDurability,
pub env_home: Option<PathBuf>,
pub quorum_policy: QuorumPolicy,
pub phi_threshold: Option<f64>,
pub phi_window_size: usize,
pub initial_peers: Vec<RepNode>,
pub election_phase_timeout: Duration,
pub reconnect_config: ReconnectConfig,
pub transport_kind: RepTransportKind,
pub peer_allowlist: Vec<String>,
pub tls_config: Option<crate::tls::TlsConfig>,
pub cascade_feeding: bool,
}
impl RepConfig {
pub fn builder(
group_name: &str,
node_name: &str,
node_host: &str,
) -> RepConfigBuilder {
RepConfigBuilder {
group_name: group_name.to_string(),
node_name: node_name.to_string(),
node_host: node_host.to_string(),
node_port: DEFAULT_NODE_PORT,
node_type: NodeType::Electable,
election_timeout: DEFAULT_ELECTION_TIMEOUT,
heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
consistency_policy: ConsistencyPolicy::default(),
commit_durability: CommitDurability::default(),
env_home: None,
quorum_policy: QuorumPolicy::SimpleMajority,
phi_threshold: None,
phi_window_size: DEFAULT_PHI_WINDOW_SIZE,
initial_peers: Vec::new(),
election_phase_timeout: DEFAULT_ELECTION_PHASE_TIMEOUT,
reconnect_config: ReconnectConfig::default(),
transport_kind: RepTransportKind::default(),
peer_allowlist: Vec::new(),
tls_config: None,
cascade_feeding: false,
}
}
pub fn new(
group_name: impl Into<String>,
node_name: impl Into<String>,
node_host: impl Into<String>,
node_port: u16,
) -> RepConfig {
let g = group_name.into();
let n = node_name.into();
let h = node_host.into();
RepConfig::builder(&g, &n, &h).node_port(node_port).build()
}
pub fn socket_address(&self) -> String {
format!("{}:{}", self.node_host, self.node_port)
}
}
#[derive(Debug, Clone)]
pub struct RepConfigBuilder {
group_name: String,
node_name: String,
node_host: String,
node_port: u16,
node_type: NodeType,
election_timeout: Duration,
heartbeat_interval: Duration,
consistency_policy: ConsistencyPolicy,
commit_durability: CommitDurability,
env_home: Option<PathBuf>,
quorum_policy: QuorumPolicy,
phi_threshold: Option<f64>,
phi_window_size: usize,
initial_peers: Vec<RepNode>,
election_phase_timeout: Duration,
reconnect_config: ReconnectConfig,
transport_kind: RepTransportKind,
peer_allowlist: Vec<String>,
tls_config: Option<crate::tls::TlsConfig>,
cascade_feeding: bool,
}
impl RepConfigBuilder {
pub fn node_port(mut self, port: u16) -> Self {
self.node_port = port;
self
}
pub fn node_type(mut self, node_type: NodeType) -> Self {
self.node_type = node_type;
self
}
pub fn election_timeout(mut self, timeout: Duration) -> Self {
self.election_timeout = timeout;
self
}
pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
self.heartbeat_interval = interval;
self
}
pub fn consistency_policy(mut self, policy: ConsistencyPolicy) -> Self {
self.consistency_policy = policy;
self
}
pub fn commit_durability(mut self, durability: CommitDurability) -> Self {
self.commit_durability = durability;
self
}
pub fn env_home(mut self, path: impl Into<PathBuf>) -> Self {
self.env_home = Some(path.into());
self
}
pub fn cascade_feeding(mut self, enabled: bool) -> Self {
self.cascade_feeding = enabled;
self
}
pub fn quorum_policy(mut self, policy: QuorumPolicy) -> Self {
self.quorum_policy = policy;
self
}
pub fn phi_threshold(mut self, threshold: Option<f64>) -> Self {
self.phi_threshold = threshold;
self
}
pub fn phi_window_size(mut self, size: usize) -> Self {
self.phi_window_size = size;
self
}
pub fn add_initial_peer(mut self, node: RepNode) -> Self {
self.initial_peers.push(node);
self
}
pub fn election_phase_timeout(mut self, timeout: Duration) -> Self {
self.election_phase_timeout = timeout;
self
}
pub fn reconnect_config(mut self, config: ReconnectConfig) -> Self {
self.reconnect_config = config;
self
}
pub fn transport_kind(mut self, kind: RepTransportKind) -> Self {
self.transport_kind = kind;
self
}
pub fn peer_allowlist(mut self, names: Vec<String>) -> Self {
self.peer_allowlist = names;
self
}
pub fn tls_config(mut self, tls: crate::tls::TlsConfig) -> Self {
self.tls_config = Some(tls);
self
}
pub fn build(self) -> RepConfig {
RepConfig {
group_name: self.group_name,
node_name: self.node_name,
node_host: self.node_host,
node_port: self.node_port,
node_type: self.node_type,
election_timeout: self.election_timeout,
heartbeat_interval: self.heartbeat_interval,
consistency_policy: self.consistency_policy,
commit_durability: self.commit_durability,
env_home: self.env_home,
quorum_policy: self.quorum_policy,
phi_threshold: self.phi_threshold,
phi_window_size: self.phi_window_size,
initial_peers: self.initial_peers,
election_phase_timeout: self.election_phase_timeout,
reconnect_config: self.reconnect_config,
transport_kind: self.transport_kind,
peer_allowlist: self.peer_allowlist,
tls_config: self.tls_config,
cascade_feeding: self.cascade_feeding,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::commit_durability::ReplicaAckPolicy;
#[test]
fn test_builder_defaults() {
let config = RepConfig::builder("group1", "node1", "localhost").build();
assert_eq!(config.group_name, "group1");
assert_eq!(config.node_name, "node1");
assert_eq!(config.node_host, "localhost");
assert_eq!(config.node_port, DEFAULT_NODE_PORT);
assert_eq!(config.node_type, NodeType::Electable);
assert_eq!(config.election_timeout, DEFAULT_ELECTION_TIMEOUT);
assert_eq!(config.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
assert_eq!(config.consistency_policy, ConsistencyPolicy::NoConsistency);
}
#[test]
fn test_default_port_is_unprivileged() {
let config = RepConfig::builder("g", "n", "h").build();
assert_eq!(config.node_port, 14_001);
}
#[test]
fn test_new_constructor_matches_builder() {
let a = RepConfig::new("g", "n", "h", 6000);
let b = RepConfig::builder("g", "n", "h").node_port(6000).build();
assert_eq!(a.group_name, b.group_name);
assert_eq!(a.node_name, b.node_name);
assert_eq!(a.node_host, b.node_host);
assert_eq!(a.node_port, b.node_port);
assert_eq!(a.node_type, b.node_type);
}
#[test]
fn test_builder_custom_port() {
let config = RepConfig::builder("g", "n", "h").node_port(6000).build();
assert_eq!(config.node_port, 6000);
}
#[test]
fn test_builder_node_type() {
let config = RepConfig::builder("g", "n", "h")
.node_type(NodeType::Secondary)
.build();
assert_eq!(config.node_type, NodeType::Secondary);
}
#[test]
fn test_builder_timeouts() {
let config = RepConfig::builder("g", "n", "h")
.election_timeout(Duration::from_secs(20))
.heartbeat_interval(Duration::from_millis(500))
.build();
assert_eq!(config.election_timeout, Duration::from_secs(20));
assert_eq!(config.heartbeat_interval, Duration::from_millis(500));
}
#[test]
fn test_builder_consistency_policy() {
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(500),
timeout: Duration::from_secs(10),
};
let config = RepConfig::builder("g", "n", "h")
.consistency_policy(policy.clone())
.build();
assert_eq!(config.consistency_policy, policy);
}
#[test]
fn test_builder_commit_durability() {
let durability = CommitDurability::new(
ReplicaAckPolicy::All,
Duration::from_secs(15),
);
let config = RepConfig::builder("g", "n", "h")
.commit_durability(durability)
.build();
assert_eq!(config.commit_durability.ack_policy, ReplicaAckPolicy::All);
assert_eq!(
config.commit_durability.ack_timeout,
Duration::from_secs(15)
);
}
#[test]
fn test_socket_address() {
let config =
RepConfig::builder("g", "n", "192.168.1.1").node_port(7000).build();
assert_eq!(config.socket_address(), "192.168.1.1:7000");
}
#[test]
fn test_builder_chaining() {
let config = RepConfig::builder("mygroup", "node1", "10.0.0.1")
.node_port(5555)
.node_type(NodeType::Arbiter)
.election_timeout(Duration::from_secs(30))
.build();
assert_eq!(config.group_name, "mygroup");
assert_eq!(config.node_name, "node1");
assert_eq!(config.node_host, "10.0.0.1");
assert_eq!(config.node_port, 5555);
assert_eq!(config.node_type, NodeType::Arbiter);
assert_eq!(config.election_timeout, Duration::from_secs(30));
}
#[test]
fn test_config_clone() {
let config = RepConfig::builder("g", "n", "h").build();
let cloned = config.clone();
assert_eq!(config.group_name, cloned.group_name);
assert_eq!(config.node_name, cloned.node_name);
}
#[test]
fn test_config_debug() {
let config = RepConfig::builder("g", "n", "h").build();
let s = format!("{:?}", config);
assert!(s.contains("RepConfig"));
}
}