use crate::{CfgLE, CfgLEExt, Message, Opinion, StateLE, StateLEExt};
use im::OrdSet as ArcOrdSet;
use itertools::Itertools;
use pergola::{DefTraits, LatticeDef};
use std::fmt::Debug;
use std::hash::Hash;
use tracing::trace;
/// Stage of a participant's proposal state machine, as driven by
/// `Participant::propose_step`.
///
/// * `Init` - no proposal is in progress; `propose_step` returns without
///   running any stage handler. This is the stage a new participant starts in.
/// * `Send` - entered by `propose` (and re-entered by `propose_pick` when it
///   cannot finish); `propose_send` queues requests to peers.
/// * `Recv` - entered after `propose_send`; `propose_recv` decides whether
///   enough has been heard to move on.
/// * `Pick` - entered from `Recv`; `propose_pick` either finishes or goes
///   back to `Send`.
/// * `Fini` - entered by `learn_state` once a final state has been stored.
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum ProposeStage {
Init, Send, Recv, Pick, Fini, }
/// State held by one peer taking part in the propose protocol implemented in
/// this file.
///
/// `ObjLD` is the lattice definition of the object being agreed on and `Peer`
/// is the peer-identifier type.
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd)]
pub struct Participant<ObjLD: LatticeDef, Peer: DefTraits> {
/// Identity of this participant; used as the `from` field of every message
/// it emits and to avoid sending requests to itself.
pub id: Peer,
/// Round counter, bumped by `advance_seq` at the start of every send stage.
/// Responses are only counted when they carry the current value.
pub(crate) sequence: u64,
/// Current opinion, updated by `update_state` with everything received.
pub(crate) new_opinion: Opinion<ObjLD, Peer>,
/// Current stage of the proposal state machine.
pub(crate) propose_stage: ProposeStage,
/// Commit state recorded by `propose_pick` the first time it observes
/// unchanged configurations during a proposal; cleared by `propose`.
pub(crate) lower_bound: Option<StateLE<ObjLD, Peer>>,
/// State stored by `learn_state` when a proposal finishes; cleared by
/// `propose`.
pub final_state: Option<StateLE<ObjLD, Peer>>,
/// Snapshot of `new_opinion` taken by `propose_send` when a round starts.
pub(crate) old_opinion: Opinion<ObjLD, Peer>,
/// Configurations computed by `every_possible_config_join` when the current
/// round started; a quorum is required in each of them.
pub(crate) all_possible_cfgs: ArcOrdSet<CfgLE<Peer>>,
/// Peers from which a response carrying the current `sequence` was received.
pub(crate) sequence_responses: ArcOrdSet<Peer>,
/// Test-only log of every state passed to `propose`.
#[cfg(test)]
pub(crate) proposed_history: Vec<StateLE<ObjLD, Peer>>,
/// Test-only log of every state passed to `learn_state`.
#[cfg(test)]
pub(crate) learned_history: Vec<StateLE<ObjLD, Peer>>,
}
impl<ObjLD: LatticeDef + 'static, Peer: DefTraits + 'static> Participant<ObjLD, Peer> {
/// Creates a participant identified by `id` with no proposal in progress.
///
/// The sequence number starts at 0, both opinions start at
/// `Opinion::default()`, the stage is `ProposeStage::Init`, and every
/// optional state or set is empty.
///
/// A single constructor serves both test and non-test builds so that the
/// shared field initialisers cannot drift apart; only the two test-only
/// history fields are gated.
pub fn new(id: Peer) -> Self {
    Participant {
        id,
        sequence: 0,
        new_opinion: Opinion::default(),
        propose_stage: ProposeStage::Init,
        lower_bound: None,
        final_state: None,
        old_opinion: Opinion::default(),
        all_possible_cfgs: ArcOrdSet::new(),
        sequence_responses: ArcOrdSet::new(),
        // The history fields exist only in test builds, matching the
        // `#[cfg(test)]` attributes on the struct definition.
        #[cfg(test)]
        proposed_history: Vec::new(),
        #[cfg(test)]
        learned_history: Vec::new(),
    }
}
/// Merges `new_opinion` into this participant's current opinion.
///
/// The estimated commit and the candidate object are each combined with
/// their incoming counterpart using `+`, the proposed configurations are
/// unioned, and finally every proposed configuration that does not compare
/// `>=` the (merged) estimated commit's configuration is discarded.
fn update_state(&mut self, new_opinion: &Opinion<ObjLD, Peer>) {
    let mine = &mut self.new_opinion;
    mine.estimated_commit = &mine.estimated_commit + &new_opinion.estimated_commit;
    mine.candidate_object = &mine.candidate_object + &new_opinion.candidate_object;

    let merged = mine
        .proposed_configs
        .clone()
        .union(new_opinion.proposed_configs.clone());

    // Prune against the commit configuration *after* it has been merged.
    let floor: &CfgLE<Peer> = mine.estimated_commit.config();
    mine.proposed_configs = merged
        .iter()
        .filter(|candidate| *candidate >= floor)
        .cloned()
        .collect();
}
fn receive(&mut self, m: &Message<ObjLD, Peer>, outgoing: &mut Vec<Message<ObjLD, Peer>>) {
match m {
Message::Request {
seq, from, opinion, ..
} => {
self.update_state(opinion);
let resp = Message::Response {
seq: *seq,
from: self.id.clone(),
to: from.clone(),
opinion: self.new_opinion.clone(),
};
outgoing.push(resp);
}
Message::Response {
seq, from, opinion, ..
} => {
self.update_state(opinion);
if *seq == self.sequence {
self.sequence_responses.insert(from.clone());
}
}
Message::Commit { state, .. } => {
let mut committed_opinion = Opinion::default();
committed_opinion.estimated_commit = state.clone();
self.update_state(&committed_opinion);
}
}
}
/// Returns the set of configurations obtained by joining the estimated
/// commit's configuration with every subset of the currently proposed
/// configurations (including the empty subset and the full set).
///
/// The number of subsets visited grows exponentially with the number of
/// proposed configurations.
fn every_possible_config_join(&self) -> ArcOrdSet<CfgLE<Peer>> {
    let proposed = &self.new_opinion.proposed_configs;
    let base: &CfgLE<Peer> = self.new_opinion.estimated_commit.config();

    let mut joins: ArcOrdSet<CfgLE<Peer>> = ArcOrdSet::new();
    for size in 0..=proposed.len() {
        for chosen in proposed.iter().combinations(size) {
            let joined = chosen
                .into_iter()
                .fold(base.clone(), |acc, cfg| acc + cfg);
            joins.insert(joined);
        }
    }
    joins
}
/// Returns the join of the estimated commit's configuration with all
/// currently proposed configurations.
fn commit_cfg(&self) -> CfgLE<Peer> {
    let base: CfgLE<Peer> = self.new_opinion.estimated_commit.config().clone();
    self.new_opinion
        .proposed_configs
        .iter()
        .fold(base, |acc, proposed| acc + proposed)
}
/// Builds the state this participant would commit right now: the current
/// candidate object paired with the configuration from `commit_cfg`.
fn commit_state(&self) -> StateLE<ObjLD, Peer> {
    let object = self.new_opinion.candidate_object.clone();
    let config = self.commit_cfg();
    StateLE::new_from((object, config))
}
/// Starts a new round: forgets the responses collected so far and moves to
/// the next sequence number.
fn advance_seq(&mut self) {
    self.sequence_responses.clear();
    self.sequence += 1;
    trace!("peer {:?} advanced seq to #{}", self.id, self.sequence);
}
/// Reports whether a strict majority has been reached in every
/// configuration in `all_possible_cfgs`.
///
/// For each configuration the quorum size is `members / 2 + 1`, and the
/// tally is the number of its members found in `sequence_responses` plus
/// one for this participant. Returns `true` when `all_possible_cfgs` is
/// empty.
fn have_quorum_in_all_possible_cfgs(&self) -> bool {
    let quorate = self.all_possible_cfgs.iter().all(|cfg| {
        let members = cfg.members();
        let needed = (members.len() / 2) + 1;
        let responded = self
            .sequence_responses
            .clone()
            .intersection(members)
            .len();
        // This participant never sends itself a request (see
        // `send_request_to_peer`), so it is counted here instead.
        // NOTE(review): the extra vote is added even when `self.id` is not
        // a member of `cfg` - confirm that is intended.
        1 + responded >= needed
    });

    if quorate {
        trace!("peer {:?} has quorum", self.id);
    } else {
        trace!("peer {:?} does not have quorum", self.id);
    }
    quorate
}
/// Queues a `Request` carrying the current sequence number and opinion for
/// `peer`, unless `peer` is this participant itself.
fn send_request_to_peer(&mut self, peer: Peer, outgoing: &mut Vec<Message<ObjLD, Peer>>) {
    if peer != self.id {
        outgoing.push(Message::Request {
            seq: self.sequence,
            from: self.id.clone(),
            to: peer,
            opinion: self.new_opinion.clone(),
        });
    }
}
/// Send stage: opens a new round and asks every relevant peer for its
/// opinion.
///
/// Advances the sequence number, snapshots the current opinion into
/// `old_opinion`, recomputes `all_possible_cfgs`, queues one request per
/// member of any of those configurations (except this participant), and
/// moves to `ProposeStage::Recv`.
fn propose_send(&mut self, outgoing: &mut Vec<Message<ObjLD, Peer>>) {
    self.advance_seq();
    self.old_opinion = self.new_opinion.clone();

    let joins = self.every_possible_config_join();
    let recipients = members_of_cfgs(&joins);
    self.all_possible_cfgs = joins;

    for peer in recipients {
        self.send_request_to_peer(peer, outgoing);
    }
    self.propose_stage = ProposeStage::Recv;
}
/// Receive stage: moves on to `ProposeStage::Pick` once either
/// `same_estimated_commit_config` reports a difference between the current
/// opinion and the round's snapshot, or a quorum has been collected in
/// every possible configuration. Otherwise the stage is left unchanged.
fn propose_recv(&mut self) {
    let commit_cfg_changed = !self
        .new_opinion
        .same_estimated_commit_config(&self.old_opinion);
    // The quorum check is only evaluated when the commit config is unchanged.
    if commit_cfg_changed || self.have_quorum_in_all_possible_cfgs() {
        self.propose_stage = ProposeStage::Pick;
    }
}
/// Test hook: appends a copy of `state` to `learned_history` in test builds
/// and does nothing in other builds.
fn maybe_save_learned_state(&mut self, state: &StateLE<ObjLD, Peer>) {
    #[cfg(test)]
    self.learned_history.push(state.clone());
    // Outside tests the argument is intentionally unused.
    #[cfg(not(test))]
    let _ = state;
}
/// Finishes the current proposal with `state`: stores it in `final_state`
/// and switches to `ProposeStage::Fini`. In test builds the state is also
/// logged via `maybe_save_learned_state`.
fn learn_state(&mut self, state: StateLE<ObjLD, Peer>) {
    self.propose_stage = ProposeStage::Fini;
    self.maybe_save_learned_state(&state);
    self.final_state = Some(state);
}
/// Pick stage: decides whether the proposal can finish or needs another
/// round.
///
/// 1. If `same_estimated_and_proposed_configs` holds between the current
///    opinion and the round's snapshot, the current commit state is
///    computed and remembered as `lower_bound` if none is set yet. If the
///    candidate object is also unchanged since the snapshot, a `Commit`
///    message with that state is queued on `outgoing`, the state is
///    learned, and the method returns.
/// 2. Otherwise, if a `lower_bound` exists and is `<=` the current
///    estimated commit, the estimated commit is learned.
/// 3. Otherwise the stage goes back to `ProposeStage::Send`.
fn propose_pick(&mut self, outgoing: &mut Vec<Message<ObjLD, Peer>>) {
    let configs_stable = self
        .new_opinion
        .same_estimated_and_proposed_configs(&self.old_opinion);

    if configs_stable {
        trace!("peer {:?} found stable configuration", self.id);
        let cstate = self.commit_state();
        if self.lower_bound.is_none() {
            self.lower_bound = Some(cstate.clone());
        }

        let object_stable =
            self.old_opinion.candidate_object == self.new_opinion.candidate_object;
        if object_stable {
            trace!(
                "peer {:?} stable config has stable object, broadcasting and finishing",
                self.id
            );
            outgoing.push(Message::Commit {
                from: self.id.clone(),
                state: cstate.clone(),
            });
            self.learn_state(cstate);
            return;
        }
    }

    let bound_reached = matches!(
        &self.lower_bound,
        Some(bound) if bound <= &self.new_opinion.estimated_commit
    );
    if bound_reached {
        trace!(
            "peer {:?} stable config has acceptable lower bound, finishing",
            self.id
        );
        self.learn_state(self.new_opinion.estimated_commit.clone());
    } else {
        // Nothing could be decided yet: run another round.
        self.propose_stage = ProposeStage::Send;
    }
}
/// Feeds a batch of incoming messages to this participant and then advances
/// its proposal state machine as far as it can go without waiting.
///
/// Every message in `incoming` is handled first (replies are appended to
/// `outgoing`). The stage handlers are then run repeatedly until one of the
/// following happens:
///
/// * the stage is `Init` or `Fini`,
/// * a handler appended at least one message to `outgoing`, or
/// * a handler left the stage unchanged.
pub fn propose_step<'a, MI>(&mut self, incoming: MI, outgoing: &mut Vec<Message<ObjLD, Peer>>)
where
    MI: std::iter::Iterator<Item = &'a Message<ObjLD, Peer>>,
{
    incoming.for_each(|msg| self.receive(msg, outgoing));

    loop {
        let stage_before = self.propose_stage;
        let queued_before = outgoing.len();

        match stage_before {
            ProposeStage::Init | ProposeStage::Fini => return,
            ProposeStage::Send => self.propose_send(outgoing),
            ProposeStage::Recv => self.propose_recv(),
            ProposeStage::Pick => self.propose_pick(outgoing),
        }

        let queued_something = outgoing.len() != queued_before;
        let stage_unchanged = self.propose_stage == stage_before;
        if queued_something || stage_unchanged {
            return;
        }
    }
}
/// Returns `true` when the proposal state machine is in
/// `ProposeStage::Fini`, i.e. a state has been learned.
pub fn propose_is_fini(&self) -> bool {
    self.propose_stage == ProposeStage::Fini
}
/// Starts a new proposal for `prop`.
///
/// Clears the per-proposal bookkeeping (`lower_bound`, `final_state`,
/// `old_opinion`, `all_possible_cfgs`, `sequence_responses`), merges
/// `prop`'s object and configuration into the current opinion, and sets the
/// stage to `ProposeStage::Send`. The sequence number is left untouched.
/// No messages are produced here; call `propose_step` to drive the
/// proposal.
pub fn propose(&mut self, prop: &StateLE<ObjLD, Peer>) {
    // Drop everything remembered from any earlier proposal.
    self.final_state = None;
    self.lower_bound = None;
    self.sequence_responses = ArcOrdSet::new();
    self.all_possible_cfgs = ArcOrdSet::new();
    self.old_opinion = Opinion::default();

    let estimated_commit = self.new_opinion.estimated_commit.clone();
    let proposed_configs = singleton_set(prop.config().clone());
    let candidate_object = prop.object().clone();
    self.update_state(&Opinion {
        estimated_commit,
        proposed_configs,
        candidate_object,
    });

    self.propose_stage = ProposeStage::Send;
    self.maybe_save_proposal(prop);
}
/// Test hook: appends a copy of `prop` to `proposed_history` in test builds
/// and does nothing in other builds.
fn maybe_save_proposal(&mut self, prop: &StateLE<ObjLD, Peer>) {
    #[cfg(test)]
    self.proposed_history.push(prop.clone());
    // Outside tests the argument is intentionally unused.
    #[cfg(not(test))]
    let _ = prop;
}
}
/// Returns an ordered set containing only `t`.
fn singleton_set<T: Ord + Clone>(t: T) -> ArcOrdSet<T> {
    ArcOrdSet::unit(t)
}
/// Returns the union of the member sets of every configuration in `cfgs`;
/// the result is empty when `cfgs` is empty.
fn members_of_cfgs<Peer: DefTraits>(cfgs: &ArcOrdSet<CfgLE<Peer>>) -> ArcOrdSet<Peer> {
    cfgs.iter().fold(ArcOrdSet::<Peer>::new(), |all_members, cfg| {
        all_members.union(cfg.members().clone())
    })
}