mod inflights;
mod progress;
mod state;
pub use self::inflights::Inflights;
pub use self::progress::Progress;
pub use self::state::ProgressState;
use slog::Logger;
use crate::confchange::{MapChange, MapChangeType};
use crate::eraftpb::ConfState;
use crate::quorum::{AckedIndexer, Index, VoteResult};
use crate::{DefaultHashBuilder, HashMap, HashSet, JointConfig};
use std::fmt::Debug;
use getset::Getters;
/// Membership configuration: the voter sets plus learner bookkeeping.
///
/// Every field gets a public getter of the same name generated by `getset`.
#[derive(Clone, Debug, Default, PartialEq, Getters)]
pub struct Configuration {
/// The voters, stored as a [`JointConfig`] with an `incoming` and an
/// `outgoing` half.
#[get = "pub"]
pub(crate) voters: JointConfig,
/// Ids of the learner nodes.
#[get = "pub"]
pub(crate) learners: HashSet<u64>,
/// NOTE(review): presumably ids that turn into learners once the outgoing
/// voter set is dropped; confirm against the conf-change code. Always
/// starts out empty in the constructors of this type.
#[get = "pub"]
pub(crate) learners_next: HashSet<u64>,
/// NOTE(review): presumably requests that a joint configuration be left
/// automatically; confirm against the conf-change code. Copied verbatim
/// into `ConfState::auto_leave` by `to_conf_state`.
#[get = "pub"]
pub(crate) auto_leave: bool,
}
#[cfg(test)]
/// Test-only textual rendering of a [`Configuration`].
///
/// Format: `voters=<incoming>[&&<outgoing>][ learners=(..)][ learners_next=(..)][ autoleave]`.
/// Optional parts are emitted only when non-empty / set. Both id lists are
/// printed in ascending order so the output does not depend on hash-set
/// iteration order.
impl std::fmt::Display for Configuration {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use itertools::Itertools;

        // A hash set yields its elements in an unspecified order, so sort the
        // ids before joining them. `learners` and `learners_next` share this
        // helper so they are always rendered the same way.
        let sorted_ids = |ids: &HashSet<u64>| -> String {
            ids.iter()
                .sorted()
                .map(|id| id.to_string())
                .collect::<Vec<String>>()
                .join(" ")
        };

        // The outgoing half is only shown while it has members.
        if self.voters.outgoing.is_empty() {
            write!(f, "voters={}", self.voters.incoming)?;
        } else {
            write!(
                f,
                "voters={}&&{}",
                self.voters.incoming, self.voters.outgoing
            )?;
        }
        if !self.learners.is_empty() {
            write!(f, " learners=({})", sorted_ids(&self.learners))?;
        }
        if !self.learners_next.is_empty() {
            write!(f, " learners_next=({})", sorted_ids(&self.learners_next))?;
        }
        if self.auto_leave {
            write!(f, " autoleave")?;
        }
        Ok(())
    }
}
impl Configuration {
pub fn new(
voters: impl IntoIterator<Item = u64>,
learners: impl IntoIterator<Item = u64>,
) -> Self {
Self {
voters: JointConfig::new(voters.into_iter().collect()),
auto_leave: false,
learners: learners.into_iter().collect(),
learners_next: HashSet::default(),
}
}
fn with_capacity(voters: usize, learners: usize) -> Self {
Self {
voters: JointConfig::with_capacity(voters),
learners: HashSet::with_capacity_and_hasher(learners, DefaultHashBuilder::default()),
learners_next: HashSet::default(),
auto_leave: false,
}
}
pub fn to_conf_state(&self) -> ConfState {
let mut state = ConfState::default();
state.set_voters(self.voters.incoming.raw_slice());
state.set_voters_outgoing(self.voters.outgoing.raw_slice());
state.set_learners(self.learners.iter().cloned().collect());
state.set_learners_next(self.learners_next.iter().cloned().collect());
state.auto_leave = self.auto_leave;
state
}
fn clear(&mut self) {
self.voters.clear();
self.learners.clear();
self.learners_next.clear();
self.auto_leave = false;
}
}
/// Map from a node id to that node's [`Progress`].
pub type ProgressMap = HashMap<u64, Progress>;
impl AckedIndexer for ProgressMap {
    /// Looks up `voter_id` and reports its `matched` index together with its
    /// `commit_group_id`; yields `None` when the id has no progress entry.
    fn acked_index(&self, voter_id: u64) -> Option<Index> {
        let pr = self.get(&voter_id)?;
        Some(Index {
            index: pr.matched,
            group_id: pr.commit_group_id,
        })
    }
}
/// Bundles the active [`Configuration`] with the per-node [`Progress`] map
/// and the votes recorded so far.
#[derive(Clone, Getters)]
pub struct ProgressTracker {
/// Progress of every tracked node, keyed by node id.
progress: ProgressMap,
/// The configuration most recently installed through `apply_conf`.
#[get = "pub"]
conf: Configuration,
/// Votes stored by `record_vote`, keyed by node id; `true` is counted as
/// granted and `false` as rejected by `tally_votes`.
#[doc(hidden)]
#[get = "pub"]
votes: HashMap<u64, bool>,
/// Passed to `Progress::new` for every progress created in `apply_conf`.
#[get = "pub(crate)"]
max_inflight: usize,
/// Flag forwarded to the voters' `committed_index` computation; toggled
/// with `enable_group_commit`.
group_commit: bool,
/// Logger handle supplied at construction time.
pub(crate) logger: Logger,
}
impl ProgressTracker {
    /// Creates a tracker with no pre-allocated capacity.
    pub fn new(max_inflight: usize, logger: Logger) -> Self {
        let (voters, learners) = (0, 0);
        Self::with_capacity(voters, learners, max_inflight, logger)
    }

    /// Creates a tracker whose containers are pre-sized for the given number
    /// of voters and learners. Group commit starts disabled.
    pub fn with_capacity(
        voters: usize,
        learners: usize,
        max_inflight: usize,
        logger: Logger,
    ) -> Self {
        // One progress slot per expected node, voter or learner.
        let progress: ProgressMap =
            HashMap::with_capacity_and_hasher(voters + learners, DefaultHashBuilder::default());
        let conf = Configuration::with_capacity(voters, learners);
        let votes: HashMap<u64, bool> =
            HashMap::with_capacity_and_hasher(voters, DefaultHashBuilder::default());

        Self {
            progress,
            conf,
            votes,
            max_inflight,
            group_commit: false,
            logger,
        }
    }

    /// Turns the group-commit flag on or off.
    pub fn enable_group_commit(&mut self, enable: bool) {
        self.group_commit = enable;
    }

    /// Reports the current value of the group-commit flag.
    pub fn group_commit(&self) -> bool {
        self.group_commit
    }

    /// Drops all progress entries, the configuration contents and all
    /// recorded votes.
    pub(crate) fn clear(&mut self) {
        self.progress.clear();
        self.conf.clear();
        self.votes.clear();
    }

    /// Forwards to the voter configuration's `is_singleton` check.
    pub fn is_singleton(&self) -> bool {
        let voters = &self.conf.voters;
        voters.is_singleton()
    }

    /// Shared access to the progress of node `id`, if it is tracked.
    #[inline]
    pub fn get(&self, id: u64) -> Option<&Progress> {
        self.progress.get(&id)
    }

    /// Exclusive access to the progress of node `id`, if it is tracked.
    #[inline]
    pub fn get_mut(&mut self, id: u64) -> Option<&mut Progress> {
        self.progress.get_mut(&id)
    }

    /// Iterates over every `(id, progress)` pair.
    #[inline]
    pub fn iter(&self) -> impl ExactSizeIterator<Item = (&u64, &Progress)> {
        self.progress.iter()
    }

    /// Iterates over every `(id, progress)` pair with mutable access to the
    /// progress.
    #[inline]
    pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = (&u64, &mut Progress)> {
        self.progress.iter_mut()
    }

    /// Asks the voter configuration for its committed index, using the
    /// progress map as the source of acknowledged indexes.
    ///
    /// NOTE(review): the meaning of the returned `bool` is defined by
    /// `JointConfig::committed_index`; presumably it relates to group commit.
    pub fn maximal_committed_index(&mut self) -> (u64, bool) {
        let use_group_commit = self.group_commit;
        let voters = &self.conf.voters;
        voters.committed_index(use_group_commit, &self.progress)
    }

    /// Forgets every recorded vote.
    pub fn reset_votes(&mut self) {
        self.votes.clear();
    }

    /// Stores `vote` for node `id` unless a vote from that node is already
    /// recorded, in which case the earlier vote is kept.
    pub fn record_vote(&mut self, id: u64, vote: bool) {
        let _ = self.votes.entry(id).or_insert(vote);
    }

    /// Returns `(granted, rejected, result)` for the recorded votes.
    ///
    /// Votes from ids that are not part of the current voter configuration
    /// are left out of the two counters.
    pub fn tally_votes(&self) -> (usize, usize, VoteResult) {
        let (granted, rejected) = self
            .votes
            .iter()
            .filter(|(id, _)| self.conf.voters.contains(**id))
            .fold((0usize, 0usize), |(yes, no), (_, &vote)| {
                if vote {
                    (yes + 1, no)
                } else {
                    (yes, no + 1)
                }
            });
        (granted, rejected, self.vote_result(&self.votes))
    }

    /// Evaluates `votes` against the current voter configuration; ids absent
    /// from `votes` are reported to it as `None`.
    pub fn vote_result(&self, votes: &HashMap<u64, bool>) -> VoteResult {
        let lookup = |id: u64| votes.get(&id).copied();
        self.conf.voters.vote_result(lookup)
    }

    /// Checks whether the nodes flagged as recently active form a quorum.
    ///
    /// `perspective_of` is always treated as active and its flag is set to
    /// `true`; every other node's `recent_active` flag is cleared after it
    /// has been counted.
    pub fn quorum_recently_active(&mut self, perspective_of: u64) -> bool {
        let mut active =
            HashSet::with_capacity_and_hasher(self.progress.len(), DefaultHashBuilder::default());
        for (&id, pr) in self.progress.iter_mut() {
            let is_self = id == perspective_of;
            if is_self || pr.recent_active {
                // Ids are collected regardless of role; the quorum check
                // below works on the actual ids.
                active.insert(id);
                // Only the observing node stays flagged for the next round.
                pr.recent_active = is_self;
            }
        }
        self.has_quorum(&active)
    }

    /// Returns `true` when treating every id in `potential_quorum` as a
    /// granted vote (and all others as missing) yields `VoteResult::Won`.
    #[inline]
    pub fn has_quorum(&self, potential_quorum: &HashSet<u64>) -> bool {
        let outcome = self.conf.voters.vote_result(|id| {
            if potential_quorum.contains(&id) {
                Some(true)
            } else {
                None
            }
        });
        outcome == VoteResult::Won
    }

    /// Read-only access to the whole progress map.
    #[inline]
    pub(crate) fn progress(&self) -> &ProgressMap {
        &self.progress
    }

    /// Installs `conf` and applies `changes` to the progress map.
    ///
    /// An `Add` inserts a new progress created from `next_idx` and the
    /// tracker's `max_inflight`; a `Remove` deletes the entry for that id.
    pub fn apply_conf(&mut self, conf: Configuration, changes: MapChange, next_idx: u64) {
        self.conf = conf;
        for (id, change_type) in changes {
            match change_type {
                MapChangeType::Remove => {
                    self.progress.remove(&id);
                }
                MapChangeType::Add => {
                    let mut fresh = Progress::new(next_idx, self.max_inflight);
                    // A newly added node starts out flagged as recently
                    // active (presumably so an activity check that runs
                    // before it has communicated still counts it).
                    fresh.recent_active = true;
                    self.progress.insert(id, fresh);
                }
            }
        }
    }
}