use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
use crate::error::ConfigError;
pub const DEFAULT_ELECTION_TIMEOUT_MIN: u64 = 150;
pub const DEFAULT_ELECTION_TIMEOUT_MAX: u64 = 300;
pub const DEFAULT_HEARTBEAT_INTERVAL: u64 = 50;
pub const DEFAULT_LOGS_SINCE_LAST: u64 = 5000;
pub const DEFAULT_MAX_PAYLOAD_ENTRIES: u64 = 300;
pub const DEFAULT_REPLICATION_LAG_THRESHOLD: u64 = 1000;
pub const DEFAULT_SNAPSHOT_CHUNKSIZE: u64 = 1024 * 1024 * 3;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum SnapshotPolicy {
LogsSinceLast(u64),
}
impl Default for SnapshotPolicy {
fn default() -> Self {
SnapshotPolicy::LogsSinceLast(DEFAULT_LOGS_SINCE_LAST)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
pub cluster_name: String,
pub election_timeout_min: u64,
pub election_timeout_max: u64,
pub heartbeat_interval: u64,
pub max_payload_entries: u64,
pub replication_lag_threshold: u64,
pub snapshot_policy: SnapshotPolicy,
pub snapshot_max_chunk_size: u64,
}
impl Config {
pub fn build(cluster_name: String) -> ConfigBuilder {
ConfigBuilder {
cluster_name,
election_timeout_min: None,
election_timeout_max: None,
heartbeat_interval: None,
max_payload_entries: None,
replication_lag_threshold: None,
snapshot_policy: None,
snapshot_max_chunk_size: None,
}
}
pub fn new_rand_election_timeout(&self) -> u64 {
thread_rng().gen_range(self.election_timeout_min, self.election_timeout_max)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ConfigBuilder {
pub cluster_name: String,
pub election_timeout_min: Option<u64>,
pub election_timeout_max: Option<u64>,
pub heartbeat_interval: Option<u64>,
pub max_payload_entries: Option<u64>,
pub replication_lag_threshold: Option<u64>,
pub snapshot_policy: Option<SnapshotPolicy>,
pub snapshot_max_chunk_size: Option<u64>,
}
impl ConfigBuilder {
pub fn election_timeout_min(mut self, val: u64) -> Self {
self.election_timeout_min = Some(val);
self
}
pub fn election_timeout_max(mut self, val: u64) -> Self {
self.election_timeout_max = Some(val);
self
}
pub fn heartbeat_interval(mut self, val: u64) -> Self {
self.heartbeat_interval = Some(val);
self
}
pub fn max_payload_entries(mut self, val: u64) -> Self {
self.max_payload_entries = Some(val);
self
}
pub fn replication_lag_threshold(mut self, val: u64) -> Self {
self.replication_lag_threshold = Some(val);
self
}
pub fn snapshot_policy(mut self, val: SnapshotPolicy) -> Self {
self.snapshot_policy = Some(val);
self
}
pub fn snapshot_max_chunk_size(mut self, val: u64) -> Self {
self.snapshot_max_chunk_size = Some(val);
self
}
pub fn validate(self) -> Result<Config, ConfigError> {
let election_timeout_min = self.election_timeout_min.unwrap_or(DEFAULT_ELECTION_TIMEOUT_MIN);
let election_timeout_max = self.election_timeout_max.unwrap_or(DEFAULT_ELECTION_TIMEOUT_MAX);
if election_timeout_min >= election_timeout_max {
return Err(ConfigError::InvalidElectionTimeoutMinMax);
}
let heartbeat_interval = self.heartbeat_interval.unwrap_or(DEFAULT_HEARTBEAT_INTERVAL);
let max_payload_entries = self.max_payload_entries.unwrap_or(DEFAULT_MAX_PAYLOAD_ENTRIES);
if max_payload_entries == 0 {
return Err(ConfigError::MaxPayloadEntriesTooSmall);
}
let replication_lag_threshold = self.replication_lag_threshold.unwrap_or(DEFAULT_REPLICATION_LAG_THRESHOLD);
let snapshot_policy = self.snapshot_policy.unwrap_or_else(SnapshotPolicy::default);
let snapshot_max_chunk_size = self.snapshot_max_chunk_size.unwrap_or(DEFAULT_SNAPSHOT_CHUNKSIZE);
Ok(Config {
cluster_name: self.cluster_name,
election_timeout_min,
election_timeout_max,
heartbeat_interval,
max_payload_entries,
replication_lag_threshold,
snapshot_policy,
snapshot_max_chunk_size,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_defaults() {
let cfg = Config::build("cluster0".into()).validate().unwrap();
assert!(cfg.election_timeout_min >= DEFAULT_ELECTION_TIMEOUT_MIN as u64);
assert!(cfg.election_timeout_max <= DEFAULT_ELECTION_TIMEOUT_MAX as u64);
assert!(cfg.heartbeat_interval == DEFAULT_HEARTBEAT_INTERVAL as u64);
assert!(cfg.max_payload_entries == DEFAULT_MAX_PAYLOAD_ENTRIES);
assert!(cfg.replication_lag_threshold == DEFAULT_REPLICATION_LAG_THRESHOLD);
assert!(cfg.snapshot_max_chunk_size == DEFAULT_SNAPSHOT_CHUNKSIZE);
assert!(cfg.snapshot_policy == SnapshotPolicy::LogsSinceLast(DEFAULT_LOGS_SINCE_LAST));
}
#[test]
fn test_config_with_specified_values() {
let cfg = Config::build("cluster0".into())
.election_timeout_max(200)
.election_timeout_min(100)
.heartbeat_interval(10)
.max_payload_entries(100)
.replication_lag_threshold(100)
.snapshot_max_chunk_size(200)
.snapshot_policy(SnapshotPolicy::LogsSinceLast(10000))
.validate()
.unwrap();
assert!(cfg.election_timeout_min >= 100);
assert!(cfg.election_timeout_max <= 200);
assert!(cfg.heartbeat_interval == 10);
assert!(cfg.max_payload_entries == 100);
assert!(cfg.replication_lag_threshold == 100);
assert!(cfg.snapshot_max_chunk_size == 200);
assert!(cfg.snapshot_policy == SnapshotPolicy::LogsSinceLast(10000));
}
#[test]
fn test_invalid_election_timeout_config_produces_expected_error() {
let res = Config::build("cluster0".into())
.election_timeout_min(1000)
.election_timeout_max(700)
.validate();
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err, ConfigError::InvalidElectionTimeoutMinMax);
}
}