use std::ops::Deref;
use std::time::Duration;
use backoff_series::BackoffSeries;
#[cfg(feature = "clap")]
use clap::Parser;
use openraft_macros::since;
use rand::RngExt;
use crate::AsyncRuntime;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::config::StepDownPolicy;
use crate::config::error::ConfigError;
#[cfg(feature = "clap")]
use crate::config::parser::parse_bytes_with_unit;
#[cfg(feature = "clap")]
use crate::config::parser::parse_snapshot_policy;
#[cfg(feature = "clap")]
use crate::config::parser::parse_step_down_policy;
use crate::network::Backoff;
use crate::raft_state::LogStateReader;
use crate::vote::RaftCommittedLeaderId;
pub(crate) struct Defaults {
pub cluster_name: &'static str,
pub election_timeout_min: u64,
pub election_timeout_max: u64,
pub heartbeat_interval: u64,
pub install_snapshot_timeout: u64,
pub send_snapshot_timeout: u64,
pub max_payload_entries: u64,
pub max_append_entries: u64,
pub replication_lag_threshold: u64,
pub snapshot_policy: SnapshotPolicy,
pub snapshot_max_chunk_size: u64,
pub max_in_snapshot_log_to_keep: u64,
pub purge_batch_size: u64,
pub api_channel_size: u64,
pub api_batch_capacity: u64,
pub api_batch_linger_ms: u64,
pub notification_channel_size: u64,
pub state_machine_channel_size: u64,
pub backoff: &'static str,
pub enable_tick: bool,
pub enable_heartbeat: bool,
pub enable_elect: bool,
pub removed_leader_step_down: StepDownPolicy,
pub enable_pre_vote: Option<bool>,
}
pub(crate) const DEFAULTS: Defaults = Defaults {
cluster_name: "foo",
election_timeout_min: 150,
election_timeout_max: 300,
heartbeat_interval: 50,
install_snapshot_timeout: 200,
send_snapshot_timeout: 0,
max_payload_entries: 300,
max_append_entries: 4096,
replication_lag_threshold: 5000,
snapshot_policy: SnapshotPolicy::LogsSinceLast(5000),
snapshot_max_chunk_size: 3 * 1024 * 1024,
max_in_snapshot_log_to_keep: 1000,
purge_batch_size: 1,
api_channel_size: 65536,
api_batch_capacity: 4096,
api_batch_linger_ms: 0,
notification_channel_size: 65536,
state_machine_channel_size: 1024,
backoff: "200ms",
enable_tick: true,
enable_heartbeat: true,
enable_elect: true,
removed_leader_step_down: StepDownPolicy::After(150),
enable_pre_vote: None,
};
#[cfg(feature = "serde")]
fn default_removed_leader_step_down() -> StepDownPolicy {
DEFAULTS.removed_leader_step_down.clone()
}
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum SnapshotPolicy {
LogsSinceLast(u64),
Never,
}
impl SnapshotPolicy {
pub(crate) fn should_snapshot<CLID>(
&self,
state: &impl Deref<Target = impl LogStateReader<CLID>>,
last_tried_at: Option<&LogId<CLID>>,
) -> Option<LogId<CLID>>
where
CLID: RaftCommittedLeaderId,
{
match self {
SnapshotPolicy::LogsSinceLast(threshold) => {
let committed_next = state.committed().next_index();
let base_log_id = last_tried_at.max(state.snapshot_last_log_id());
if committed_next >= base_log_id.next_index() + threshold {
state.committed().cloned()
} else {
None
}
}
SnapshotPolicy::Never => None,
}
}
}
#[since]
#[derive(Clone, Debug)]
#[cfg_attr(feature = "clap", derive(Parser))]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Config {
#[cfg_attr(feature = "clap", clap(long, default_value = DEFAULTS.cluster_name))]
pub cluster_name: String,
#[cfg_attr(feature = "clap", clap(long, default_value_t = DEFAULTS.election_timeout_min))]
pub election_timeout_min: u64,
#[cfg_attr(feature = "clap", clap(long, default_value_t = DEFAULTS.election_timeout_max))]
pub election_timeout_max: u64,
#[cfg_attr(feature = "clap", clap(long, default_value_t = DEFAULTS.heartbeat_interval))]
pub heartbeat_interval: u64,
#[cfg_attr(feature = "clap", clap(long, default_value_t = DEFAULTS.install_snapshot_timeout))]
pub install_snapshot_timeout: u64,
#[deprecated(
since = "0.9.0",
note = "Sending snapshot by chunks is deprecated; Use `install_snapshot_timeout` instead"
)]
#[cfg_attr(feature = "clap", clap(long, default_value_t = DEFAULTS.send_snapshot_timeout))]
pub send_snapshot_timeout: u64,
#[cfg_attr(feature = "clap", clap(long, default_value_t = DEFAULTS.max_payload_entries))]
pub max_payload_entries: u64,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long, default_value = "4096"))]
pub max_append_entries: Option<u64>,
#[cfg_attr(feature = "clap", clap(long, default_value_t = DEFAULTS.replication_lag_threshold))]
pub replication_lag_threshold: u64,
#[cfg_attr(feature = "clap", clap(
long,
default_value = "since_last:5000",
value_parser=parse_snapshot_policy
))]
pub snapshot_policy: SnapshotPolicy,
#[cfg_attr(feature = "clap", clap(long, default_value = "3MiB", value_parser=parse_bytes_with_unit))]
pub snapshot_max_chunk_size: u64,
#[cfg_attr(feature = "clap", clap(long, default_value_t = DEFAULTS.max_in_snapshot_log_to_keep))]
pub max_in_snapshot_log_to_keep: u64,
#[cfg_attr(feature = "clap", clap(long, default_value_t = DEFAULTS.purge_batch_size))]
pub purge_batch_size: u64,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long, default_value = "65536"))]
pub api_channel_size: Option<u64>,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long, default_value = "4096"))]
pub api_batch_capacity: u64,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long, default_value = "0"))]
pub api_batch_linger_ms: u64,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long, default_value = "65536"))]
pub notification_channel_size: Option<u64>,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long, default_value = "1024"))]
pub state_machine_channel_size: Option<u64>,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long))]
pub log_stage_capacity: Option<u64>,
#[cfg_attr(feature = "clap", clap(long,
default_value_t = true,
action = clap::ArgAction::Set,
num_args = 0..=1,
default_missing_value = "true"
))]
pub enable_tick: bool,
#[cfg_attr(feature = "clap", clap(long,
default_value_t = true,
action = clap::ArgAction::Set,
num_args = 0..=1,
default_missing_value = "true"
))]
pub enable_heartbeat: bool,
#[cfg_attr(feature = "clap", clap(long,
default_value_t = true,
action = clap::ArgAction::Set,
num_args = 0..=1,
default_missing_value = "true"
))]
pub enable_elect: bool,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long, default_value = "150", value_parser = parse_step_down_policy))]
#[cfg_attr(feature = "serde", serde(default = "default_removed_leader_step_down"))]
pub removed_leader_step_down: StepDownPolicy,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long,
action = clap::ArgAction::Set,
num_args = 0..=1,
default_missing_value = "true"
))]
pub enable_pre_vote: Option<bool>,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long, default_value = DEFAULTS.backoff))]
pub backoff: String,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long,
action = clap::ArgAction::Set,
num_args = 0..=1,
default_missing_value = "true"
))]
pub allow_log_reversion: Option<bool>,
#[since(version = "0.10.0")]
#[cfg_attr(feature = "clap", clap(long,
action = clap::ArgAction::Set,
num_args = 0..=1,
default_missing_value = "true"
))]
pub enable_leader_restore: Option<bool>,
}
impl Default for Config {
#[allow(deprecated)]
fn default() -> Self {
Self {
cluster_name: DEFAULTS.cluster_name.to_string(),
election_timeout_min: DEFAULTS.election_timeout_min,
election_timeout_max: DEFAULTS.election_timeout_max,
heartbeat_interval: DEFAULTS.heartbeat_interval,
install_snapshot_timeout: DEFAULTS.install_snapshot_timeout,
send_snapshot_timeout: DEFAULTS.send_snapshot_timeout,
max_payload_entries: DEFAULTS.max_payload_entries,
max_append_entries: Some(DEFAULTS.max_append_entries),
replication_lag_threshold: DEFAULTS.replication_lag_threshold,
snapshot_policy: DEFAULTS.snapshot_policy.clone(),
snapshot_max_chunk_size: DEFAULTS.snapshot_max_chunk_size,
max_in_snapshot_log_to_keep: DEFAULTS.max_in_snapshot_log_to_keep,
purge_batch_size: DEFAULTS.purge_batch_size,
api_channel_size: Some(DEFAULTS.api_channel_size),
api_batch_capacity: DEFAULTS.api_batch_capacity,
api_batch_linger_ms: DEFAULTS.api_batch_linger_ms,
notification_channel_size: Some(DEFAULTS.notification_channel_size),
state_machine_channel_size: Some(DEFAULTS.state_machine_channel_size),
log_stage_capacity: None,
enable_tick: DEFAULTS.enable_tick,
enable_heartbeat: DEFAULTS.enable_heartbeat,
enable_elect: DEFAULTS.enable_elect,
removed_leader_step_down: DEFAULTS.removed_leader_step_down.clone(),
enable_pre_vote: DEFAULTS.enable_pre_vote,
backoff: DEFAULTS.backoff.to_string(),
allow_log_reversion: None,
enable_leader_restore: None,
}
}
}
impl Config {
pub fn new_rand_election_timeout<RT: AsyncRuntime>(&self) -> u64 {
RT::thread_rng().random_range(self.election_timeout_min..self.election_timeout_max)
}
pub fn install_snapshot_timeout(&self) -> Duration {
Duration::from_millis(self.install_snapshot_timeout)
}
#[deprecated(
since = "0.9.0",
note = "Sending snapshot by chunks is deprecated; Use `install_snapshot_timeout()` instead"
)]
pub fn send_snapshot_timeout(&self) -> Duration {
#[allow(deprecated)]
if self.send_snapshot_timeout > 0 {
Duration::from_millis(self.send_snapshot_timeout)
} else {
self.install_snapshot_timeout()
}
}
#[since(version = "0.10.0")]
pub(crate) fn get_allow_log_reversion(&self) -> bool {
self.allow_log_reversion.unwrap_or(false)
}
#[since(version = "0.10.0")]
pub(crate) fn enable_leader_restore(&self) -> bool {
self.enable_leader_restore.unwrap_or(true)
}
pub(crate) fn get_enable_pre_vote(&self) -> bool {
self.enable_pre_vote.unwrap_or(false)
}
pub(crate) fn api_channel_size(&self) -> usize {
self.api_channel_size.unwrap_or(65536) as usize
}
pub(crate) fn notification_channel_size(&self) -> usize {
self.notification_channel_size.unwrap_or(65536) as usize
}
pub(crate) fn state_machine_channel_size(&self) -> usize {
self.state_machine_channel_size.unwrap_or(1024) as usize
}
#[allow(dead_code)]
pub(crate) fn log_stage_capacity(&self) -> usize {
self.log_stage_capacity.unwrap_or(1024) as usize
}
pub(crate) fn max_append_entries(&self) -> u64 {
self.max_append_entries.unwrap_or(4096)
}
pub fn validate(self) -> Result<Config, ConfigError> {
if self.election_timeout_min >= self.election_timeout_max {
return Err(ConfigError::ElectionTimeout {
min: self.election_timeout_min,
max: self.election_timeout_max,
});
}
if self.election_timeout_min <= self.heartbeat_interval {
return Err(ConfigError::ElectionTimeoutLTHeartBeat {
election_timeout_min: self.election_timeout_min,
heartbeat_interval: self.heartbeat_interval,
});
}
if self.max_payload_entries == 0 {
return Err(ConfigError::MaxPayloadIs0);
}
BackoffSeries::parse(&self.backoff)?;
Ok(self)
}
#[since(version = "0.10.0")]
pub fn build_backoff(&self) -> Backoff {
let series = BackoffSeries::parse(&self.backoff).expect("backoff policy must be validated by Config::validate");
Backoff::new(series.build())
}
}