use crate::{
ballot_leader_election::{Ballot, BallotLeaderElection},
errors::{valid_config, ConfigError},
messages::Message,
sequence_paxos::SequencePaxos,
storage::{Entry, StopSign, Storage},
util::{
defaults::{BUFFER_SIZE, ELECTION_TIMEOUT, RESEND_MESSAGE_TIMEOUT},
ConfigurationId, FlexibleQuorum, LogEntry, LogicalClock, NodeId,
},
utils::{ui, ui::ClusterState},
};
#[cfg(any(feature = "toml_config", feature = "serde"))]
use serde::Deserialize;
#[cfg(feature = "serde")]
use serde::Serialize;
#[cfg(feature = "toml_config")]
use std::fs;
use std::{
error::Error,
fmt::{Debug, Display},
ops::RangeBounds,
};
#[cfg(feature = "toml_config")]
use toml;
/// Top-level configuration used to build a single [`OmniPaxos`] instance.
///
/// It bundles the cluster-wide settings with the settings of the local server.
/// [`OmniPaxosConfig::validate`] checks both parts and additionally requires that the server's
/// own `pid` is listed among the cluster's `nodes`.
///
/// With the `toml_config` feature the struct can be deserialized; because of `serde(default)`,
/// fields missing from the input fall back to their `Default` values.
#[allow(missing_docs)]
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "toml_config", derive(Deserialize), serde(default))]
pub struct OmniPaxosConfig {
/// Settings that describe the cluster as a whole (see [`ClusterConfig`]).
pub cluster_config: ClusterConfig,
/// Settings that describe this particular server (see [`ServerConfig`]).
pub server_config: ServerConfig,
}
impl OmniPaxosConfig {
    /// Checks that this configuration can be used to build an [`OmniPaxos`] instance.
    ///
    /// The cluster part is validated first, then the server part. Finally the two are
    /// cross-checked: the server's own `pid` has to appear in the cluster's node list.
    ///
    /// # Errors
    /// Returns the first [`ConfigError`] that is encountered.
    pub fn validate(&self) -> Result<(), ConfigError> {
        let Self {
            cluster_config: cluster,
            server_config: server,
        } = self;
        cluster.validate()?;
        server.validate()?;
        valid_config!(
            cluster.nodes.contains(&server.pid),
            "Nodes must include own server pid"
        );
        Ok(())
    }

    /// Reads the TOML file at `file_path`, parses it into an `OmniPaxosConfig` and validates it.
    ///
    /// Only available with the `toml_config` feature.
    ///
    /// # Errors
    /// Returns a [`ConfigError`] if the file cannot be read, cannot be parsed, or if the parsed
    /// configuration fails [`OmniPaxosConfig::validate`].
    #[cfg(feature = "toml_config")]
    pub fn with_toml(file_path: &str) -> Result<Self, ConfigError> {
        let raw_toml = fs::read_to_string(file_path)?;
        let parsed = toml::from_str::<OmniPaxosConfig>(&raw_toml)?;
        parsed.validate()?;
        Ok(parsed)
    }

    /// Consumes the configuration and constructs an [`OmniPaxos`] instance on top of `storage`.
    ///
    /// The promise currently held in `storage` is read and handed to the leader election
    /// component as the recovered leader.
    ///
    /// # Errors
    /// Returns a [`ConfigError`] if the configuration fails [`OmniPaxosConfig::validate`].
    ///
    /// # Panics
    /// Panics if reading the promise from `storage` fails.
    pub fn build<T, B>(self, storage: B) -> Result<OmniPaxos<T, B>, ConfigError>
    where
        T: Entry,
        B: Storage<T>,
    {
        self.validate()?;
        let recovered_leader = storage
            .get_promise()
            .expect("storage error while trying to read promise");
        // The config is needed twice (leader election and Sequence Paxos), so the first use
        // takes a clone and the last use consumes `self`.
        let ble = BallotLeaderElection::with(self.clone().into(), recovered_leader);
        let election_clock = LogicalClock::with(self.server_config.election_tick_timeout);
        let resend_message_clock =
            LogicalClock::with(self.server_config.resend_message_tick_timeout);
        let seq_paxos = SequencePaxos::with(self.into(), storage);
        Ok(OmniPaxos {
            seq_paxos,
            ble,
            election_clock,
            resend_message_clock,
        })
    }
}
/// Cluster-wide configuration, i.e. the settings that describe the cluster as a whole rather
/// than one particular server.
///
/// Use [`ClusterConfig::validate`] to check the constraints mentioned on the fields below.
#[derive(Clone, Debug, PartialEq, Default)]
#[cfg_attr(any(feature = "serde", feature = "toml_config"), derive(Deserialize))]
#[cfg_attr(feature = "toml_config", serde(default))]
#[cfg_attr(feature = "serde", derive(Serialize))]
pub struct ClusterConfig {
/// Identifier of this configuration. Validation rejects the value `0`.
pub configuration_id: ConfigurationId,
/// The nodes that make up the cluster. Validation requires more than one node.
pub nodes: Vec<NodeId>,
/// Optional custom read/write quorum sizes. When set, validation requires the two quorums to
/// overlap, each size to lie in `2..=nodes.len()`, and the read quorum to be at least as large
/// as the write quorum.
// NOTE(review): what is used when this is `None` is decided outside this file — presumably
// simple majority quorums; confirm.
pub flexible_quorum: Option<FlexibleQuorum>,
}
impl ClusterConfig {
    /// Checks that this cluster configuration is usable.
    ///
    /// The checks are performed in this order and the first failure is reported:
    /// 1. the cluster has more than one node,
    /// 2. `configuration_id` is not `0`,
    /// 3. if a flexible quorum is configured:
    ///    - the read and write quorum sizes sum to more than the number of nodes,
    ///    - the read quorum size lies in `2..=nodes.len()`,
    ///    - the write quorum size lies in `2..=nodes.len()`,
    ///    - the read quorum is at least as large as the write quorum.
    ///
    /// # Errors
    /// Returns a [`ConfigError`] describing the first violated constraint.
    pub fn validate(&self) -> Result<(), ConfigError> {
        let num_nodes = self.nodes.len();
        valid_config!(num_nodes > 1, "Need more than 1 node");
        valid_config!(self.configuration_id != 0, "Configuration ID cannot be 0");
        if let Some(FlexibleQuorum {
            read_quorum_size,
            write_quorum_size,
        }) = self.flexible_quorum
        {
            // The sizes come from user-supplied configuration and have not been range-checked
            // yet, so a plain `+` could overflow (panic in debug builds, wrap in release).
            // Saturating keeps the comparison meaningful; oversized values are then rejected by
            // the range checks below.
            let combined_quorum_size = read_quorum_size.saturating_add(write_quorum_size);
            valid_config!(
                combined_quorum_size > num_nodes,
                "The quorums must overlap i.e., the sum of their sizes must exceed the # of nodes"
            );
            let allowed_sizes = 2..=num_nodes;
            valid_config!(
                allowed_sizes.contains(&read_quorum_size),
                "Read quorum must be in range 2 to # of nodes in the cluster"
            );
            valid_config!(
                allowed_sizes.contains(&write_quorum_size),
                "Write quorum must be in range 2 to # of nodes in the cluster"
            );
            valid_config!(
                read_quorum_size >= write_quorum_size,
                "Read quorum size must be >= the write quorum size."
            );
        }
        Ok(())
    }

    /// Combines this cluster configuration with `server_config` and builds an [`OmniPaxos`]
    /// instance backed by `with_storage`.
    ///
    /// This is a convenience wrapper around [`OmniPaxosConfig::build`].
    ///
    /// # Errors
    /// Returns a [`ConfigError`] if the combined configuration is invalid.
    pub fn build_for_server<T, B>(
        self,
        server_config: ServerConfig,
        with_storage: B,
    ) -> Result<OmniPaxos<T, B>, ConfigError>
    where
        T: Entry,
        B: Storage<T>,
    {
        OmniPaxosConfig {
            cluster_config: self,
            server_config,
        }
        .build(with_storage)
    }
}
/// Configuration that is specific to one server of the cluster.
///
/// Use [`ServerConfig::validate`] to check the constraints mentioned on the fields below. With
/// the `toml_config` feature the struct can be deserialized; because of `serde(default)`, fields
/// missing from the input fall back to the values of the `Default` implementation.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "toml_config", derive(Deserialize), serde(default))]
pub struct ServerConfig {
/// Identifier of this server. Validation rejects the value `0`.
pub pid: NodeId,
/// Timeout, in ticks, of the election clock that is advanced by `OmniPaxos::tick`.
/// Must be greater than 0.
pub election_tick_timeout: u64,
/// Timeout, in ticks, of the message-resend clock that is advanced by `OmniPaxos::tick`.
/// Must be greater than 0.
pub resend_message_tick_timeout: u64,
/// Must be greater than 0.
// NOTE(review): presumably the initial capacity of the outgoing message buffers — the use
// site is outside this file; confirm.
pub buffer_size: usize,
/// Must be greater than 0.
// NOTE(review): presumably the number of entries batched together before they are
// replicated — the use site is outside this file; confirm.
pub batch_size: usize,
/// Priority of this server.
// NOTE(review): presumably consumed by the ballot leader election (compare
// `OmniPaxos::set_priority`); how it influences elections is not visible here.
pub leader_priority: u32,
/// Optional path of a log file. Only present with the `logging` feature.
#[cfg(feature = "logging")]
pub logger_file_path: Option<String>,
/// Optional user-supplied logger. Only present with the `logging` feature; it is never read
/// from TOML (deserialization skips this field).
#[cfg(feature = "logging")]
#[cfg_attr(feature = "toml_config", serde(skip_deserializing))]
pub custom_logger: Option<slog::Logger>,
}
impl ServerConfig {
    /// Checks that this server configuration is usable.
    ///
    /// The pid, the buffer size, the batch size, the election tick timeout and the resend
    /// message tick timeout must all be non-zero. They are checked in that order and the first
    /// failure is reported.
    ///
    /// # Errors
    /// Returns a [`ConfigError`] describing the first violated constraint.
    pub fn validate(&self) -> Result<(), ConfigError> {
        // `..` skips the remaining fields, including the feature-gated logging fields.
        let Self {
            pid,
            election_tick_timeout,
            resend_message_tick_timeout,
            buffer_size,
            batch_size,
            ..
        } = self;
        valid_config!(*pid != 0, "Server pid cannot be 0");
        valid_config!(*buffer_size != 0, "Buffer size must be greater than 0");
        valid_config!(*batch_size != 0, "Batch size must be greater than 0");
        valid_config!(
            *election_tick_timeout != 0,
            "Election tick timeout must be greater than 0"
        );
        valid_config!(
            *resend_message_tick_timeout != 0,
            "Resend message tick timeout must be greater than 0"
        );
        Ok(())
    }
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
pid: 0,
election_tick_timeout: ELECTION_TIMEOUT,
resend_message_tick_timeout: RESEND_MESSAGE_TIMEOUT,
buffer_size: BUFFER_SIZE,
batch_size: 1,
leader_priority: 0,
#[cfg(feature = "logging")]
logger_file_path: None,
#[cfg(feature = "logging")]
custom_logger: None,
}
}
}
/// A single OmniPaxos server: it combines a Sequence Paxos instance with a ballot leader
/// election instance and drives both from logical clocks.
///
/// `T` is the type of the log entries and `B` the storage backend holding them. Instances are
/// created through [`OmniPaxosConfig::build`] or [`ClusterConfig::build_for_server`].
pub struct OmniPaxos<T, B>
where
T: Entry,
B: Storage<T>,
{
// Log replication component; handles `Message::SequencePaxos` messages.
seq_paxos: SequencePaxos<T, B>,
// Leader election component; handles `Message::BLE` messages.
ble: BallotLeaderElection,
// Advanced once per `tick`; when it times out the election timeout handler runs.
election_clock: LogicalClock,
// Advanced once per `tick`; when it times out Sequence Paxos runs its resend handler.
resend_message_clock: LogicalClock,
}
impl<T, B> OmniPaxos<T, B>
where
    T: Entry,
    B: Storage<T>,
{
    /// Requests trimming of the replicated log by forwarding to `SequencePaxos::trim`.
    ///
    /// NOTE(review): the exact meaning of `trim_index` (and of `None`) is defined by
    /// `SequencePaxos::trim`, which is not visible in this file — confirm there.
    ///
    /// # Errors
    /// Propagates the [`CompactionErr`] returned by `SequencePaxos::trim`.
    pub fn trim(&mut self, trim_index: Option<u64>) -> Result<(), CompactionErr> {
        let seq_paxos = &mut self.seq_paxos;
        seq_paxos.trim(trim_index)
    }

    /// Requests a snapshot of the replicated log by forwarding to `SequencePaxos::snapshot`.
    ///
    /// NOTE(review): the exact meaning of `compact_idx` (and of `None`) and of `local_only` is
    /// defined by `SequencePaxos::snapshot`, which is not visible in this file — confirm there.
    ///
    /// # Errors
    /// Propagates the [`CompactionErr`] returned by `SequencePaxos::snapshot`.
    pub fn snapshot(
        &mut self,
        compact_idx: Option<u64>,
        local_only: bool,
    ) -> Result<(), CompactionErr> {
        let seq_paxos = &mut self.seq_paxos;
        seq_paxos.snapshot(compact_idx, local_only)
    }

    /// Returns the decided index as reported by Sequence Paxos.
    pub fn get_decided_idx(&self) -> u64 {
        let seq_paxos = &self.seq_paxos;
        seq_paxos.get_decided_idx()
    }

    /// Returns the compacted index as reported by Sequence Paxos.
    pub fn get_compacted_idx(&self) -> u64 {
        let seq_paxos = &self.seq_paxos;
        seq_paxos.get_compacted_idx()
    }

    /// Returns the pid of the server this node has promised to, or `None` if that pid is `0`
    /// (the value used when there is no leader yet).
    pub fn get_current_leader(&self) -> Option<NodeId> {
        match self.seq_paxos.get_promise().pid {
            0 => None,
            leader_pid => Some(leader_pid),
        }
    }

    /// Returns the ballot this node has currently promised, as reported by Sequence Paxos.
    pub fn get_promise(&self) -> Ballot {
        let seq_paxos = &self.seq_paxos;
        seq_paxos.get_promise()
    }

    /// Collects the messages that both components want to send.
    ///
    /// The returned vector contains the leader election messages first, followed by the
    /// Sequence Paxos messages.
    pub fn outgoing_messages(&mut self) -> Vec<Message<T>> {
        // Sequence Paxos is queried before the leader election, although its messages end up
        // after the leader election ones in the result.
        let paxos_msgs = self.seq_paxos.get_outgoing_msgs();
        let mut outgoing: Vec<Message<T>> = self
            .ble
            .get_outgoing_msgs()
            .into_iter()
            .map(Message::BLE)
            .collect();
        outgoing.extend(paxos_msgs.into_iter().map(Message::SequencePaxos));
        outgoing
    }

    /// Reads the single log entry at `idx`.
    ///
    /// Returns `None` if the storage yields nothing for that index.
    ///
    /// # Panics
    /// Panics if the storage reports an error.
    pub fn read(&self, idx: u64) -> Option<LogEntry<T>> {
        let single_entry_range = idx..idx + 1;
        self.seq_paxos
            .internal_storage
            .read(single_entry_range)
            .expect("storage error while trying to read log entries")
            .and_then(|mut entries| entries.pop())
    }

    /// Reads the log entries whose indices fall into the range `r`.
    ///
    /// Returns `None` if the storage yields nothing for that range.
    ///
    /// # Panics
    /// Panics if the storage reports an error.
    pub fn read_entries<R>(&self, r: R) -> Option<Vec<LogEntry<T>>>
    where
        R: RangeBounds<u64>,
    {
        let read_result = self.seq_paxos.internal_storage.read(r);
        read_result.expect("storage error while trying to read log entries")
    }

    /// Reads the decided part of the log starting at `from_idx`.
    ///
    /// Returns `None` if the storage yields nothing for that suffix.
    ///
    /// # Panics
    /// Panics if the storage reports an error.
    pub fn read_decided_suffix(&self, from_idx: u64) -> Option<Vec<LogEntry<T>>> {
        let read_result = self
            .seq_paxos
            .internal_storage
            .read_decided_suffix(from_idx);
        read_result.expect("storage error while trying to read decided log suffix")
    }

    /// Dispatches an incoming message to the component it is addressed to: leader election
    /// messages go to the ballot leader election, all others to Sequence Paxos.
    pub fn handle_incoming(&mut self, m: Message<T>) {
        match m {
            Message::BLE(ble_msg) => self.ble.handle(ble_msg),
            Message::SequencePaxos(paxos_msg) => self.seq_paxos.handle(paxos_msg),
        }
    }

    /// Returns the [`StopSign`] reported by `SequencePaxos::is_reconfigured`, if any.
    ///
    /// NOTE(review): presumably `Some` once a reconfiguration has been decided — confirm in
    /// `SequencePaxos::is_reconfigured`.
    pub fn is_reconfigured(&self) -> Option<StopSign> {
        let seq_paxos = &self.seq_paxos;
        seq_paxos.is_reconfigured()
    }

    /// Proposes `entry` for the replicated log.
    ///
    /// # Errors
    /// Propagates the [`ProposeErr`] returned by `SequencePaxos::append`.
    pub fn append(&mut self, entry: T) -> Result<(), ProposeErr<T>> {
        let seq_paxos = &mut self.seq_paxos;
        seq_paxos.append(entry)
    }

    /// Proposes a reconfiguration of the cluster to `new_configuration`.
    ///
    /// The new configuration is validated before it is handed to Sequence Paxos.
    ///
    /// # Errors
    /// Returns [`ProposeErr::ConfigError`] — carrying the validation error together with the
    /// rejected configuration and metadata — if `new_configuration` is invalid. Otherwise
    /// propagates the result of `SequencePaxos::reconfigure`.
    pub fn reconfigure(
        &mut self,
        new_configuration: ClusterConfig,
        metadata: Option<Vec<u8>>,
    ) -> Result<(), ProposeErr<T>> {
        match new_configuration.validate() {
            Ok(()) => self.seq_paxos.reconfigure(new_configuration, metadata),
            Err(config_error) => Err(ProposeErr::ConfigError(
                config_error,
                new_configuration,
                metadata,
            )),
        }
    }

    /// Informs Sequence Paxos that the connection to the server `pid` has been re-established.
    pub fn reconnected(&mut self, pid: NodeId) {
        let seq_paxos = &mut self.seq_paxos;
        seq_paxos.reconnected(pid)
    }

    /// Advances both logical clocks by one tick.
    ///
    /// When the election clock times out the election timeout handler runs; when the resend
    /// clock times out Sequence Paxos runs its resend handler. The election clock is processed
    /// first.
    pub fn tick(&mut self) {
        let election_due = self.election_clock.tick_and_check_timeout();
        if election_due {
            self.election_timeout();
        }
        let resend_due = self.resend_message_clock.tick_and_check_timeout();
        if resend_due {
            self.seq_paxos.resend_message_timeout();
        }
    }

    /// Sets the priority of this server in the ballot leader election to `p`.
    pub fn set_priority(&mut self, p: u32) {
        let ble = &mut self.ble;
        ble.set_priority(p)
    }

    /// Runs the leader election timeout and, if it yields a new leader, passes that leader on
    /// to Sequence Paxos.
    fn election_timeout(&mut self) {
        let paxos_state = self.seq_paxos.get_state();
        let paxos_promise = self.seq_paxos.get_promise();
        let election_result = self.ble.hb_timeout(paxos_state, paxos_promise);
        if let Some(new_leader) = election_result {
            self.seq_paxos.handle_leader(new_leader);
        }
    }

    /// Gathers the state shown by the UI: the current ballot, the current leader, the decided
    /// index, the ballots known to the leader election, and the cluster state derived from the
    /// leader state of Sequence Paxos.
    pub fn get_ui_states(&self) -> ui::OmniPaxosStates {
        let cluster_state = {
            let mut state = ClusterState::from(self.seq_paxos.get_leader_state());
            state.heartbeats = self.ble.get_ballots();
            state
        };
        let current_ballot = self.ble.get_current_ballot();
        let current_leader = self.get_current_leader();
        let decided_idx = self.get_decided_idx();
        let heartbeats = self.ble.get_ballots();
        ui::OmniPaxosStates {
            current_ballot,
            current_leader,
            decided_idx,
            heartbeats,
            cluster_state,
        }
    }
}
/// Error returned when a proposal (a log entry or a reconfiguration) is not accepted.
///
/// Every variant hands the rejected input back to the caller.
#[derive(Debug)]
pub enum ProposeErr<T>
where
T: Entry,
{
/// An entry could not be proposed; carries that entry.
// NOTE(review): judging by the name, raised while a reconfiguration is pending — the
// producing code is in `sequence_paxos`; confirm.
PendingReconfigEntry(T),
/// A reconfiguration could not be proposed; carries the proposed configuration and metadata.
// NOTE(review): judging by the name, raised while another reconfiguration is pending — the
// producing code is in `sequence_paxos`; confirm.
PendingReconfigConfig(ClusterConfig, Option<Vec<u8>>),
/// The proposed configuration failed validation; carries the validation error, the rejected
/// configuration and its metadata.
ConfigError(ConfigError, ClusterConfig, Option<Vec<u8>>),
}
/// Error returned by the log compaction operations (`trim` and `snapshot`).
// NOTE(review): the variants are produced in `sequence_paxos`, which is not visible here; the
// descriptions below are inferred from the variant names and payload types — confirm.
#[derive(Copy, Clone, Debug)]
pub enum CompactionErr {
/// Presumably: the requested index is not decided yet; the payload is a log index.
UndecidedIndex(u64),
/// Presumably: the requested index has already been trimmed; the payload is a log index.
TrimmedIndex(u64),
/// Presumably: the requested index is not decided by all servers; the payload is a log index.
NotAllDecided(u64),
/// Presumably: the operation must be run by the leader; the payload is a node id.
NotCurrentLeader(NodeId),
}
/// Lets `CompactionErr` be used wherever a standard error is expected.
impl Error for CompactionErr {}

/// The user-facing representation is the same as the `Debug` representation.
impl Display for CompactionErr {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate with the caller's formatter so that its flags are forwarded unchanged.
        <Self as Debug>::fmt(self, formatter)
    }
}