use crate::eraftpb::{ConfChangeSingle, ConfChangeType};
use crate::tracker::{Configuration, ProgressMap, ProgressTracker};
use crate::{Error, Result};
/// The kind of modification recorded against one id of a progress map.
///
/// The derives let values be debug-printed, compared and copied freely; the
/// enum carries no data, so all of them are trivially cheap.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MapChangeType {
    /// An entry for the id is to be added.
    Add,
    /// The entry for the id is to be removed.
    Remove,
}
/// A list of `(id, change)` pairs, in the order the changes were recorded.
pub type MapChange = Vec<(u64, MapChangeType)>;
/// A view over a borrowed `ProgressMap` that records additions and removals
/// in a change list instead of modifying the borrowed map.
pub struct IncrChangeMap<'a> {
// Recorded changes, oldest first; the latest entry for an id decides
// whether that id counts as present.
changes: MapChange,
// The borrowed map consulted when no recorded change mentions an id.
base: &'a ProgressMap,
}
impl IncrChangeMap<'_> {
pub fn into_changes(self) -> MapChange {
self.changes
}
fn contains(&self, id: u64) -> bool {
match self.changes.iter().rfind(|(i, _)| *i == id) {
Some((_, MapChangeType::Remove)) => false,
Some((_, MapChangeType::Add)) => true,
None => self.base.contains_key(&id),
}
}
}
/// Computes configuration changes against a borrowed `ProgressTracker`.
///
/// The methods on this type read the tracker's configuration and progress
/// map and return a new `Configuration` together with a `MapChange`; the
/// tracker is only held by shared reference.
pub struct Changer<'a> {
// The tracker whose configuration and progress map are read.
tracker: &'a ProgressTracker,
}
impl Changer<'_> {
/// Creates a `Changer` that borrows `tracker` for its whole lifetime.
pub fn new(tracker: &ProgressTracker) -> Changer {
Changer { tracker }
}
pub fn enter_joint(
&self,
auto_leave: bool,
ccs: &[ConfChangeSingle],
) -> Result<(Configuration, MapChange)> {
if super::joint(self.tracker.conf()) {
return Err(Error::ConfChangeError("config is already joint".to_owned()));
}
let (mut cfg, mut prs) = self.check_and_copy()?;
if cfg.voters().incoming.is_empty() {
return Err(Error::ConfChangeError(
"can't make a zero-voter config joint".to_owned(),
));
}
cfg.voters
.outgoing
.extend(cfg.voters.incoming.iter().cloned());
self.apply(&mut cfg, &mut prs, ccs)?;
cfg.auto_leave = auto_leave;
check_invariants(&cfg, &prs)?;
Ok((cfg, prs.into_changes()))
}
pub fn leave_joint(&self) -> Result<(Configuration, MapChange)> {
if !super::joint(self.tracker.conf()) {
return Err(Error::ConfChangeError(
"can't leave a non-joint config".to_owned(),
));
}
let (mut cfg, mut prs) = self.check_and_copy()?;
if cfg.voters().outgoing.is_empty() {
return Err(Error::ConfChangeError(format!(
"configuration is not joint: {:?}",
cfg
)));
}
cfg.learners.extend(cfg.learners_next.drain());
for id in &*cfg.voters.outgoing {
if !cfg.voters.incoming.contains(id) && !cfg.learners.contains(id) {
prs.changes.push((*id, MapChangeType::Remove));
}
}
cfg.voters.outgoing.clear();
cfg.auto_leave = false;
check_invariants(&cfg, &prs)?;
Ok((cfg, prs.into_changes()))
}
pub fn simple(&mut self, ccs: &[ConfChangeSingle]) -> Result<(Configuration, MapChange)> {
if super::joint(self.tracker.conf()) {
return Err(Error::ConfChangeError(
"can't apply simple config change in joint config".to_owned(),
));
}
let (mut cfg, mut prs) = self.check_and_copy()?;
self.apply(&mut cfg, &mut prs, ccs)?;
if cfg
.voters
.incoming
.symmetric_difference(&self.tracker.conf().voters.incoming)
.count()
> 1
{
return Err(Error::ConfChangeError(
"more than one voter changed without entering joint config".to_owned(),
));
}
check_invariants(&cfg, &prs)?;
Ok((cfg, prs.into_changes()))
}
fn apply(
&self,
cfg: &mut Configuration,
prs: &mut IncrChangeMap,
ccs: &[ConfChangeSingle],
) -> Result<()> {
for cc in ccs {
if cc.node_id == 0 {
continue;
}
match cc.get_change_type() {
ConfChangeType::AddNode => self.make_voter(cfg, prs, cc.node_id),
ConfChangeType::AddLearnerNode => self.make_learner(cfg, prs, cc.node_id),
ConfChangeType::RemoveNode => self.remove(cfg, prs, cc.node_id),
}
}
if cfg.voters().incoming.is_empty() {
return Err(Error::ConfChangeError("removed all voters".to_owned()));
}
Ok(())
}
/// Makes `id` an incoming voter.
///
/// An id without a progress entry is initialised as a voter through
/// `init_progress`. An id that already has one is inserted into the incoming
/// voters and dropped from both learner sets.
fn make_voter(&self, cfg: &mut Configuration, prs: &mut IncrChangeMap, id: u64) {
    if prs.contains(id) {
        cfg.voters.incoming.insert(id);
        cfg.learners.remove(&id);
        cfg.learners_next.remove(&id);
    } else {
        self.init_progress(cfg, prs, id, false);
    }
}
/// Makes `id` a learner.
///
/// An id without a progress entry is initialised as a learner through
/// `init_progress`; an id already in `learners` is left untouched. Otherwise
/// the id is removed from the incoming voters and both learner sets, and then
/// inserted into `learners_next` when it is still an outgoing voter, or into
/// `learners` when it is not.
fn make_learner(&self, cfg: &mut Configuration, prs: &mut IncrChangeMap, id: u64) {
    let has_progress = prs.contains(id);
    if !has_progress {
        self.init_progress(cfg, prs, id, true);
        return;
    }

    let already_learner = cfg.learners.contains(&id);
    if already_learner {
        return;
    }

    cfg.voters.incoming.remove(&id);
    cfg.learners.remove(&id);
    cfg.learners_next.remove(&id);

    // An id that is still an outgoing voter is staged in `learners_next`
    // rather than added to `learners` directly.
    let still_outgoing_voter = cfg.voters().outgoing.contains(&id);
    if still_outgoing_voter {
        cfg.learners_next.insert(id);
    } else {
        cfg.learners.insert(id);
    }
}
/// Removes `id` from the incoming voters and from both learner sets.
///
/// Nothing happens when `id` has no progress entry. A `Remove` change is
/// recorded only when `id` is not an outgoing voter.
fn remove(&self, cfg: &mut Configuration, prs: &mut IncrChangeMap, id: u64) {
    if prs.contains(id) {
        cfg.voters.incoming.remove(&id);
        cfg.learners.remove(&id);
        cfg.learners_next.remove(&id);

        // An id that is still an outgoing voter keeps its progress entry.
        let kept_by_outgoing = cfg.voters.outgoing.contains(&id);
        if !kept_by_outgoing {
            prs.changes.push((id, MapChangeType::Remove));
        }
    }
}
/// Registers a new `id`: inserts it into `learners` when `is_learner` is
/// true, otherwise into the incoming voters, and records an `Add` change.
fn init_progress(
    &self,
    cfg: &mut Configuration,
    prs: &mut IncrChangeMap,
    id: u64,
    is_learner: bool,
) {
    if is_learner {
        cfg.learners.insert(id);
    } else {
        cfg.voters.incoming.insert(id);
    }

    let addition = (id, MapChangeType::Add);
    prs.changes.push(addition);
}
/// Returns a clone of the tracker's configuration together with an empty
/// `IncrChangeMap` layered over the tracker's progress map.
///
/// # Errors
///
/// Returns `Error::ConfChangeError` when the tracker's current configuration
/// fails `check_invariants`.
fn check_and_copy(&self) -> Result<(Configuration, IncrChangeMap)> {
    let prs = IncrChangeMap {
        changes: Vec::new(),
        base: self.tracker.progress(),
    };

    // Validate the starting point before handing out a copy of it.
    let current = self.tracker.conf();
    check_invariants(current, &prs)?;

    let copy = current.clone();
    Ok((copy, prs))
}
}
/// Verifies that `cfg` and the progress view `prs` are mutually consistent.
///
/// The checks, in order:
///
/// * every voter id has a progress entry;
/// * every learner has a progress entry and is neither an outgoing nor an
///   incoming voter;
/// * every `learners_next` id has a progress entry and is an outgoing voter;
/// * when `super::joint(cfg)` is false, `learners_next` is empty and
///   `auto_leave` is `false`.
///
/// # Errors
///
/// Returns `Error::ConfChangeError` describing the first violated check.
fn check_invariants(cfg: &Configuration, prs: &IncrChangeMap) -> Result<()> {
    for id in cfg.voters().ids().iter() {
        if !prs.contains(id) {
            return Err(Error::ConfChangeError(format!(
                "no progress for voter {}",
                id
            )));
        }
    }

    // Learners must be tracked and must not overlap with either voter set.
    for id in &cfg.learners {
        if !prs.contains(*id) {
            return Err(Error::ConfChangeError(format!(
                "no progress for learner {}",
                id
            )));
        }
        if cfg.voters().outgoing.contains(id) {
            return Err(Error::ConfChangeError(format!(
                "{} is in learners and outgoing voters",
                id
            )));
        }
        if cfg.voters().incoming.contains(id) {
            return Err(Error::ConfChangeError(format!(
                "{} is in learners and incoming voters",
                id
            )));
        }
    }

    // A `learners_next` id must be tracked and must still be an outgoing voter.
    for id in &cfg.learners_next {
        if !prs.contains(*id) {
            return Err(Error::ConfChangeError(format!(
                "no progress for learner(next) {}",
                id
            )));
        }
        if !cfg.voters().outgoing.contains(id) {
            // This branch fires when the id is MISSING from the outgoing
            // voters, so the message has to say "but not".
            return Err(Error::ConfChangeError(format!(
                "{} is in learners_next, but not outgoing voters",
                id
            )));
        }
    }

    if !super::joint(cfg) {
        if !cfg.learners_next().is_empty() {
            return Err(Error::ConfChangeError(
                "learners_next must be empty when not joint".to_owned(),
            ));
        }
        if cfg.auto_leave {
            return Err(Error::ConfChangeError(
                "auto_leave must be false when not joint".to_owned(),
            ));
        }
    }

    Ok(())
}