use super::{
ballot_leader_election::Ballot,
messages::sequence_paxos::*,
storage::{Entry, StopSign, Storage},
util::LeaderState,
};
#[cfg(feature = "logging")]
use crate::utils::logger::create_logger;
use crate::{
storage::{InternalStorage, InternalStorageConfig},
util::{AcceptedMetaData, FlexibleQuorum, NodeId, Quorum, SequenceNumber},
ClusterConfig, CompactionErr, OmniPaxosConfig, ProposeErr,
};
#[cfg(feature = "logging")]
use slog::{debug, info, trace, warn, Logger};
use std::{fmt::Debug, vec};
// Submodules of the Sequence Paxos component.
// NOTE(review): several methods called from this file are not defined here (e.g.
// `handle_prepare`, `handle_acceptsync`, `send_prepare`, `handle_promise_prepare`,
// `accept_entry`); presumably they are implemented in these two modules — confirm.
pub mod follower;
pub mod leader;
/// State of a single Sequence Paxos replica: its identity, its current role and phase,
/// buffered proposals, and the queue of messages waiting to be handed to the network layer.
///
/// `T` is the log entry type and `B` is the storage backend passed to `SequencePaxos::with`.
pub(crate) struct SequencePaxos<T, B>
where
T: Entry,
B: Storage<T>,
{
/// Wrapper around the storage backend. This file uses it to read and write the promise and
/// the StopSign, to read the decided/accepted/compacted indices, and to trim and snapshot.
pub(crate) internal_storage: InternalStorage<B, T>,
/// Id of this node; used as the `from` field of every outgoing message.
pid: NodeId,
// `peers`: ids of the other nodes; broadcasts in this file iterate over this list.
// `state` (declared on the same line): the current `(Role, Phase)` pair of this replica.
peers: Vec<NodeId>, state: (Role, Phase),
/// Entries buffered locally: while leading in the Prepare phase, or when a proposal cannot
/// be forwarded because no other node is known as leader.
pending_proposals: Vec<T>,
/// A StopSign buffered under the same conditions as `pending_proposals`.
pending_stopsign: Option<StopSign>,
/// Messages produced by this replica, drained by `get_outgoing_msgs`.
outgoing: Vec<PaxosMessage<T>>,
/// Leader-side bookkeeping (e.g. the leader ballot `n_leader`, per-follower sequence numbers
/// and accepted indices, as accessed in this file).
leader_state: LeaderState<T>,
/// Reset to `None` each time the outgoing buffer is drained.
/// NOTE(review): it is not otherwise used in this file; presumably it tracks the latest
/// `Accepted` message inside `outgoing` so it can be updated in place — confirm.
latest_accepted_meta: Option<(Ballot, usize)>,
/// Initialised to the default sequence number.
/// NOTE(review): not read in this file; presumably the follower-side session/sequence
/// tracker — confirm against the follower module.
current_seq_num: SequenceNumber,
/// Promise that is resent on timeout while this node is a follower in the Prepare phase.
/// It is only read in this file; it is set elsewhere.
cached_promise_message: Option<Promise<T>>,
/// Capacity used when (re)allocating the `outgoing` buffer.
buffer_size: usize,
/// Logger used by the `logging` feature.
#[cfg(feature = "logging")]
logger: Logger,
}
impl<T, B> SequencePaxos<T, B>
where
T: Entry,
B: Storage<T>,
{
/// Creates a Sequence Paxos replica from `config`, backed by `storage`.
///
/// If `storage` already holds a promise, the replica starts as a follower in
/// `Phase::Recover` and queues a `PrepareReq` carrying that ballot for every peer.
/// Otherwise it starts as a follower in `Phase::None` with the default ballot.
/// In both cases the resulting ballot is written back through the internal storage.
///
/// # Panics
///
/// Panics if reading the promise from `storage`, or writing it back, fails.
pub(crate) fn with(config: SequencePaxosConfig, storage: B) -> Self {
    let pid = config.pid;
    let peers = config.peers;
    let num_nodes = peers.len() + 1;
    let quorum = Quorum::with(config.flexible_quorum, num_nodes);
    // Largest node id in the cluster, own id included. Folding from `pid` keeps this
    // well-defined (no panic) even when the peer list is empty.
    let max_pid = peers.iter().copied().fold(pid, std::cmp::max) as usize;
    let mut outgoing = Vec::with_capacity(config.buffer_size);
    let (state, leader) = match storage
        .get_promise()
        .expect("storage error while trying to read promise")
    {
        // A promise found in storage means this node is restarting: ask every peer
        // for a Prepare so that it can catch up.
        Some(b) => {
            let state = (Role::Follower, Phase::Recover);
            for peer_pid in &peers {
                let prepreq = PrepareReq { n: b };
                outgoing.push(PaxosMessage {
                    from: pid,
                    to: *peer_pid,
                    msg: PaxosMsg::PrepareReq(prepreq),
                });
            }
            (state, b)
        }
        None => ((Role::Follower, Phase::None), Ballot::default()),
    };
    let internal_storage_config = InternalStorageConfig {
        batch_size: config.batch_size,
    };
    let mut paxos = SequencePaxos {
        internal_storage: InternalStorage::with(
            storage,
            internal_storage_config,
            #[cfg(feature = "unicache")]
            pid,
        ),
        pid,
        peers,
        state,
        pending_proposals: vec![],
        pending_stopsign: None,
        outgoing,
        leader_state: LeaderState::<T>::with(leader, max_pid, quorum),
        latest_accepted_meta: None,
        current_seq_num: SequenceNumber::default(),
        cached_promise_message: None,
        buffer_size: config.buffer_size,
        #[cfg(feature = "logging")]
        logger: {
            if let Some(logger) = config.custom_logger {
                logger
            } else {
                let s = config
                    .logger_file_path
                    .unwrap_or_else(|| format!("logs/paxos_{}.log", pid));
                create_logger(s.as_str())
            }
        },
    };
    paxos
        .internal_storage
        .set_promise(leader)
        .expect("storage error while trying to write promise");
    #[cfg(feature = "logging")]
    {
        info!(paxos.logger, "Paxos component pid: {} created!", pid);
        if let Quorum::Flexible(flex_quorum) = quorum {
            // Expressed as a sum so that a write quorum larger than the cluster cannot
            // underflow the comparison.
            if flex_quorum.read_quorum_size + flex_quorum.write_quorum_size > num_nodes + 1 {
                warn!(
                    paxos.logger,
                    "Unnecessary overlaps in read and write quorums. Read and Write quorums only need to be overlapping by one node i.e., read_quorum_size + write_quorum_size = num_nodes + 1");
            }
        }
    }
    paxos
}
/// Returns a reference to the current `(Role, Phase)` pair of this replica.
pub(crate) fn get_state(&self) -> &(Role, Phase) {
&self.state
}
/// Returns the promised ballot as reported by the internal storage.
pub(crate) fn get_promise(&self) -> Ballot {
self.internal_storage.get_promise()
}
/// Trims the log and, on success, tells every peer to trim to the same index.
///
/// Only a leader may trim. With `trim_idx == None` the index reported by
/// `leader_state.get_min_all_accepted_idx()` is used; an explicit index must not exceed it.
///
/// # Errors
///
/// * `CompactionErr::NotCurrentLeader` if this node is not in the leader role.
/// * `CompactionErr::NotAllDecided` if `trim_idx` is larger than the allowed index.
/// * Any `CompactionErr` produced by the storage layer while trimming.
///
/// # Panics
///
/// Panics if the storage layer fails with an error that is not a `CompactionErr`.
pub(crate) fn trim(&mut self, trim_idx: Option<u64>) -> Result<(), CompactionErr> {
    if self.state.0 != Role::Leader {
        return Err(CompactionErr::NotCurrentLeader(self.get_current_leader()));
    }
    let limit = *self.leader_state.get_min_all_accepted_idx();
    let trimmed_idx = match trim_idx {
        None => {
            #[cfg(feature = "logging")]
            trace!(
                self.logger,
                "No trim index provided, using min_las_idx: {:?}",
                limit
            );
            limit
        }
        Some(requested) if requested > limit => {
            return Err(CompactionErr::NotAllDecided(limit));
        }
        Some(requested) => requested,
    };
    let outcome = self.internal_storage.try_trim(trimmed_idx);
    if outcome.is_ok() {
        // Peers are only notified once the local trim has succeeded.
        let from = self.pid;
        self.outgoing
            .extend(self.peers.iter().map(|&to| PaxosMessage {
                from,
                to,
                msg: PaxosMsg::Compaction(Compaction::Trim(trimmed_idx)),
            }));
    }
    outcome.map_err(|err| {
        *err.downcast()
            .expect("storage error while trying to trim log")
    })
}
/// Snapshots the log through the internal storage, passing `idx` along unchanged.
///
/// Unless `local_only` is set, a successful snapshot is followed by a
/// `Compaction::Snapshot(idx)` message to every peer.
///
/// # Errors
///
/// Returns the `CompactionErr` produced by the storage layer, if any.
///
/// # Panics
///
/// Panics if the storage layer fails with an error that is not a `CompactionErr`.
pub(crate) fn snapshot(
    &mut self,
    idx: Option<u64>,
    local_only: bool,
) -> Result<(), CompactionErr> {
    let outcome = self.internal_storage.try_snapshot(idx);
    let notify_peers = outcome.is_ok() && !local_only;
    if notify_peers {
        let from = self.pid;
        self.outgoing
            .extend(self.peers.iter().map(|&to| PaxosMessage {
                from,
                to,
                msg: PaxosMsg::Compaction(Compaction::Snapshot(idx)),
            }));
    }
    outcome.map_err(|err| {
        *err.downcast()
            .expect("storage error while trying to snapshot log")
    })
}
/// Returns the decided index as reported by the internal storage.
pub(crate) fn get_decided_idx(&self) -> u64 {
self.internal_storage.get_decided_idx()
}
/// Returns the compacted index as reported by the internal storage.
pub(crate) fn get_compacted_idx(&self) -> u64 {
self.internal_storage.get_compacted_idx()
}
/// Applies a compaction request received from another node.
///
/// This is best effort: the result of the trim or snapshot is deliberately discarded.
/// A received snapshot request is applied with `local_only = true`, so it is not
/// broadcast again.
fn handle_compaction(&mut self, c: Compaction) {
    match c {
        Compaction::Snapshot(snapshot_idx) => {
            let _ = self.snapshot(snapshot_idx, true);
        }
        Compaction::Trim(trim_idx) => {
            let _ = self.internal_storage.try_trim(trim_idx);
        }
    }
}
/// Resends messages that may have been lost, depending on the current role and phase.
///
/// * Follower / Recover: resend `PrepareReq` to all peers.
/// * Follower / Prepare: resend the cached `Promise`; without one, fall back to
///   `Phase::Recover` and send `PrepareReq` to all peers.
/// * Leader / Prepare: resend `Prepare` to the peers returned by `get_preparable_peers`.
/// * Leader / Accept: if a StopSign is stored, resend its decide or its accept message
///   to promised followers, then resend `Prepare` as above.
/// * Any other state: nothing is sent.
pub(crate) fn resend_message_timeout(&mut self) {
    match self.state {
        (Role::Follower, Phase::Recover) => self.send_preparereq_to_all_peers(),
        (Role::Follower, Phase::Prepare) => {
            if let Some(cached) = &self.cached_promise_message {
                let resent = PaxosMessage {
                    from: self.pid,
                    to: cached.n.pid,
                    msg: PaxosMsg::Promise(cached.clone()),
                };
                self.outgoing.push(resent);
            } else {
                #[cfg(feature = "logging")]
                warn!(self.logger, "In Prepare phase without a cached promise!");
                self.state = (Role::Follower, Phase::Recover);
                self.send_preparereq_to_all_peers();
            }
        }
        (Role::Leader, Phase::Prepare) => {
            for peer in self.leader_state.get_preparable_peers() {
                self.send_prepare(peer);
            }
        }
        (Role::Leader, Phase::Accept) => {
            if let Some(stopsign) = self.internal_storage.get_stopsign() {
                let decided_idx = self.internal_storage.get_decided_idx();
                for follower in self.leader_state.get_promised_followers() {
                    if self.internal_storage.stopsign_is_decided() {
                        self.send_decide(follower, decided_idx, true);
                    } else if self.leader_state.get_accepted_idx(follower)
                        != self.internal_storage.get_accepted_idx()
                    {
                        // The follower's recorded accepted index differs from ours.
                        self.send_accept_stopsign(follower, stopsign.clone(), true);
                    }
                }
            }
            for peer in self.leader_state.get_preparable_peers() {
                self.send_prepare(peer);
            }
        }
        _ => (),
    }
}
/// Queues a `PrepareReq` carrying the current promise for every peer.
fn send_preparereq_to_all_peers(&mut self) {
    let from = self.pid;
    let request = PrepareReq {
        n: self.get_promise(),
    };
    self.outgoing
        .extend(self.peers.iter().map(|&to| PaxosMessage {
            from,
            to,
            msg: PaxosMsg::PrepareReq(request),
        }));
}
/// Takes all queued outgoing messages, leaving an empty buffer with capacity
/// `buffer_size` in their place.
///
/// Also clears `latest_accepted_meta` and, with the `batch_accept` feature, resets the
/// leader's batch-accept metadata.
pub(crate) fn get_outgoing_msgs(&mut self) -> Vec<PaxosMessage<T>> {
    let fresh_buffer = Vec::with_capacity(self.buffer_size);
    let drained = std::mem::replace(&mut self.outgoing, fresh_buffer);
    #[cfg(feature = "batch_accept")]
    {
        self.leader_state.reset_batch_accept_meta();
    }
    self.latest_accepted_meta = None;
    drained
}
pub(crate) fn handle(&mut self, m: PaxosMessage<T>) {
match m.msg {
PaxosMsg::PrepareReq(prepreq) => self.handle_preparereq(prepreq, m.from),
PaxosMsg::Prepare(prep) => self.handle_prepare(prep, m.from),
PaxosMsg::Promise(prom) => match &self.state {
(Role::Leader, Phase::Prepare) => self.handle_promise_prepare(prom, m.from),
(Role::Leader, Phase::Accept) => self.handle_promise_accept(prom, m.from),
_ => {}
},
PaxosMsg::AcceptSync(acc_sync) => self.handle_acceptsync(acc_sync, m.from),
PaxosMsg::AcceptDecide(acc) => self.handle_acceptdecide(acc),
PaxosMsg::NotAccepted(not_acc) => self.handle_notaccepted(not_acc, m.from),
PaxosMsg::Accepted(accepted) => self.handle_accepted(accepted, m.from),
PaxosMsg::Decide(d) => self.handle_decide(d),
PaxosMsg::ProposalForward(proposals) => self.handle_forwarded_proposal(proposals),
PaxosMsg::Compaction(c) => self.handle_compaction(c),
PaxosMsg::AcceptStopSign(acc_ss) => self.handle_accept_stopsign(acc_ss),
PaxosMsg::ForwardStopSign(f_ss) => self.handle_forwarded_stopsign(f_ss),
#[cfg(feature = "unicache")]
PaxosMsg::EncodedAcceptDecide(e) => {
self.handle_encoded_acceptdecide(e);
}
}
}
/// Returns the stored StopSign if one exists and the storage reports it as decided;
/// otherwise returns `None`.
pub(crate) fn is_reconfigured(&self) -> Option<StopSign> {
    let stopsign = self.internal_storage.get_stopsign()?;
    if self.internal_storage.stopsign_is_decided() {
        Some(stopsign)
    } else {
        None
    }
}
/// Returns `true` if the internal storage holds a StopSign (decided or not).
fn pending_reconfiguration(&self) -> bool {
self.internal_storage.get_stopsign().is_some()
}
/// Proposes `entry` for the log.
///
/// # Errors
///
/// Returns `ProposeErr::PendingReconfigEntry`, handing `entry` back, if a StopSign is
/// already stored.
pub(crate) fn append(&mut self, entry: T) -> Result<(), ProposeErr<T>> {
    if self.pending_reconfiguration() {
        return Err(ProposeErr::PendingReconfigEntry(entry));
    }
    self.propose_entry(entry);
    Ok(())
}
/// Proposes a reconfiguration to `new_config`, wrapped with `metadata` in a StopSign.
///
/// * Leader in the Prepare phase: the StopSign is buffered in `pending_stopsign`.
/// * Leader in the Accept phase: the StopSign is accepted locally and sent to every
///   promised follower.
/// * Any other state: the StopSign is handed to `forward_stopsign`.
///
/// # Errors
///
/// Returns `ProposeErr::PendingReconfigConfig`, handing both arguments back, if a StopSign
/// is already stored, or if this node is a leader in the Prepare phase that has already
/// buffered a StopSign.
pub(crate) fn reconfigure(
    &mut self,
    new_config: ClusterConfig,
    metadata: Option<Vec<u8>>,
) -> Result<(), ProposeErr<T>> {
    // Checked once up front; nothing below stores a StopSign before the match arms run,
    // so the arms do not need to re-check it.
    if self.pending_reconfiguration() {
        return Err(ProposeErr::PendingReconfigConfig(new_config, metadata));
    }
    match self.state {
        (Role::Leader, Phase::Prepare) => {
            if self.pending_stopsign.is_some() {
                return Err(ProposeErr::PendingReconfigConfig(new_config, metadata));
            }
            self.pending_stopsign = Some(StopSign::with(new_config, metadata));
        }
        (Role::Leader, Phase::Accept) => {
            #[cfg(feature = "logging")]
            info!(
                self.logger,
                "Propose reconfiguration {:?} with {:?}",
                new_config.nodes,
                self.leader_state.n_leader
            );
            let ss = StopSign::with(new_config, metadata);
            self.accept_stopsign(ss.clone());
            for pid in self.leader_state.get_promised_followers() {
                self.send_accept_stopsign(pid, ss.clone(), false);
            }
        }
        _ => self.forward_stopsign(StopSign::with(new_config, metadata)),
    }
    Ok(())
}
/// Queues an `AcceptStopSign` for `to`, stamped with the leader ballot `n_leader`.
///
/// When `resend` is true the sequence number comes from `leader_state.get_seq_num(to)`;
/// otherwise it comes from `leader_state.next_seq_num(to)`.
fn send_accept_stopsign(&mut self, to: NodeId, ss: StopSign, resend: bool) {
    let seq_num = if resend {
        self.leader_state.get_seq_num(to)
    } else {
        self.leader_state.next_seq_num(to)
    };
    let n = self.leader_state.n_leader;
    self.outgoing.push(PaxosMessage {
        from: self.pid,
        to,
        msg: PaxosMsg::AcceptStopSign(AcceptStopSign { seq_num, n, ss }),
    });
}
/// Writes `ss` to storage and, when this node is the leader, records its own new
/// accepted index in the leader state.
///
/// # Panics
///
/// Panics if the storage write fails.
fn accept_stopsign(&mut self, ss: StopSign) {
    self.internal_storage
        .set_stopsign(Some(ss))
        .expect("storage error while trying to write stopsign");
    if matches!(self.state.0, Role::Leader) {
        let own_accepted_idx = self.internal_storage.get_accepted_idx();
        self.leader_state
            .set_accepted_idx(self.pid, own_accepted_idx);
    }
}
/// Returns the node id (`pid`) stored in the currently promised ballot.
fn get_current_leader(&self) -> NodeId {
self.get_promise().pid
}
/// Reacts to the connection to node `pid` being re-established.
///
/// Nothing happens when `pid` is this node. Otherwise a `PrepareReq` carrying the current
/// promise is queued for `pid`; if `pid` is the current leader, this node first switches
/// to follower in `Phase::Recover`.
pub(crate) fn reconnected(&mut self, pid: NodeId) {
    if pid == self.pid {
        return;
    }
    if pid == self.get_current_leader() {
        self.state = (Role::Follower, Phase::Recover);
    }
    let n = self.get_promise();
    self.outgoing.push(PaxosMessage {
        from: self.pid,
        to: pid,
        msg: PaxosMsg::PrepareReq(PrepareReq { n }),
    });
}
/// Routes a proposed entry according to the current state: a leader in the Accept phase
/// accepts it, a leader in the Prepare phase buffers it, and any other state forwards it.
fn propose_entry(&mut self, entry: T) {
    match &self.state {
        (Role::Leader, Phase::Accept) => self.accept_entry(entry),
        (Role::Leader, Phase::Prepare) => self.pending_proposals.push(entry),
        _ => self.forward_proposals(vec![entry]),
    }
}
/// Returns a reference to the leader-side bookkeeping of this replica.
pub(crate) fn get_leader_state(&self) -> &LeaderState<T> {
&self.leader_state
}
/// Forwards `entries` to the current leader, or buffers them in `pending_proposals` when
/// there is no other node to forward to (leader id is 0, or the leader is this node).
pub(crate) fn forward_proposals(&mut self, mut entries: Vec<T>) {
    let leader = self.get_current_leader();
    let has_remote_leader = leader > 0 && self.pid != leader;
    if !has_remote_leader {
        self.pending_proposals.append(&mut entries);
        return;
    }
    self.outgoing.push(PaxosMessage {
        from: self.pid,
        to: leader,
        msg: PaxosMsg::ProposalForward(entries),
    });
}
/// Forwards `ss` to the current leader. When there is no other node to forward to (leader
/// id is 0, or the leader is this node), `ss` is buffered in `pending_stopsign` unless a
/// StopSign is already buffered there, in which case `ss` is dropped.
pub(crate) fn forward_stopsign(&mut self, ss: StopSign) {
    let leader = self.get_current_leader();
    let has_remote_leader = leader > 0 && self.pid != leader;
    if !has_remote_leader {
        if self.pending_stopsign.is_none() {
            self.pending_stopsign = Some(ss);
        }
        return;
    }
    #[cfg(feature = "logging")]
    trace!(self.logger, "Forwarding StopSign to Leader {:?}", leader);
    self.outgoing.push(PaxosMessage {
        from: self.pid,
        to: leader,
        msg: PaxosMsg::ForwardStopSign(ss),
    });
}
}
/// Phase of a replica within its current `Role`.
#[derive(PartialEq, Debug)]
pub(crate) enum Phase {
/// As leader: proposals and a StopSign are buffered rather than accepted, and `Prepare`
/// is resent on timeout. As follower: the cached `Promise` is resent on timeout.
Prepare,
/// As leader: proposed entries are accepted directly, and a reconfiguration StopSign is
/// accepted and sent to the promised followers.
Accept,
/// Entered at start-up when storage already holds a promise, and when the current leader
/// reconnects; `PrepareReq` is sent to peers and resent on timeout.
Recover,
/// Initial phase when no promise was found in storage.
None,
}
/// Role of a replica. Every replica is created as a `Follower`.
#[derive(PartialEq, Debug)]
pub(crate) enum Role {
/// Forwards proposals and StopSigns to the leader named by the promised ballot.
Follower,
/// Accepts or buffers proposals itself, and is the only role allowed to trim the log.
Leader,
}
/// Configuration consumed by `SequencePaxos::with`; built from an `OmniPaxosConfig`.
#[derive(Clone, Debug)]
pub(crate) struct SequencePaxosConfig {
/// Id of this node.
pid: NodeId,
/// Ids of all other nodes in the cluster (this node's own id is filtered out).
peers: Vec<NodeId>,
/// Capacity used for the outgoing message buffer.
buffer_size: usize,
/// Batch size handed to the internal storage configuration.
pub(crate) batch_size: usize,
/// Optional flexible quorum settings, passed to `Quorum::with`.
flexible_quorum: Option<FlexibleQuorum>,
/// Log file path; when absent (and no custom logger is given) `logs/paxos_{pid}.log` is used.
#[cfg(feature = "logging")]
logger_file_path: Option<String>,
/// Logger to use instead of creating one from `logger_file_path`.
#[cfg(feature = "logging")]
custom_logger: Option<Logger>,
}
impl From<OmniPaxosConfig> for SequencePaxosConfig {
fn from(config: OmniPaxosConfig) -> Self {
let pid = config.server_config.pid;
let peers = config
.cluster_config
.nodes
.into_iter()
.filter(|x| *x != pid)
.collect();
SequencePaxosConfig {
pid,
peers,
flexible_quorum: config.cluster_config.flexible_quorum,
buffer_size: config.server_config.buffer_size,
batch_size: config.server_config.batch_size,
#[cfg(feature = "logging")]
logger_file_path: config.server_config.logger_file_path,
#[cfg(feature = "logging")]
custom_logger: config.server_config.custom_logger,
}
}
}