use std::cmp;
use std::collections::BTreeSet;
use node::NodeId;
/// The nodes that make up a cluster, stored as an ordered set of `NodeId`s.
pub type ClusterMembers = BTreeSet<NodeId>;
/// Phase of a cluster configuration.
///
/// `ClusterConfig::to_next_state` advances `CatchUp` to `Joint` and `Joint`
/// to `Stable`; `Stable` stays `Stable`.
// NOTE(review): the variant names suggest a Raft-style joint-consensus
// membership change — confirm against the crate documentation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterState {
    /// No change in progress: only the `new` member set is consulted.
    Stable,
    /// State entered when a configuration change starts; the `old` member
    /// set remains the primary one.
    CatchUp,
    /// Both member sets are consulted when computing a consensus value.
    Joint,
}

impl ClusterState {
    /// Returns `true` only for [`ClusterState::Stable`].
    pub fn is_stable(self) -> bool {
        matches!(self, ClusterState::Stable)
    }

    /// Returns `true` only for [`ClusterState::Joint`].
    pub fn is_joint(self) -> bool {
        matches!(self, ClusterState::Joint)
    }
}
/// Membership configuration of a cluster, possibly in the middle of a change.
///
/// Two member sets are tracked (`new` and `old`) together with the
/// [`ClusterState`] that decides which of them is consulted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterConfig {
    /// Member set the configuration is moving to.
    new: ClusterMembers,
    /// Member set the configuration is moving away from; empty for
    /// configurations built by [`ClusterConfig::new`].
    old: ClusterMembers,
    /// Current phase of the configuration.
    state: ClusterState,
}

impl ClusterConfig {
    /// Returns the current phase of this configuration.
    pub fn state(&self) -> ClusterState {
        self.state
    }

    /// Returns the `new` member set.
    pub fn new_members(&self) -> &ClusterMembers {
        &self.new
    }

    /// Returns the `old` member set.
    pub fn old_members(&self) -> &ClusterMembers {
        &self.old
    }

    /// Returns the member set treated as primary in the current state.
    ///
    /// This is the `new` set while `Stable`, and the `old` set while
    /// `CatchUp` or `Joint`.
    pub fn primary_members(&self) -> &ClusterMembers {
        match self.state {
            ClusterState::Stable => &self.new,
            ClusterState::CatchUp | ClusterState::Joint => &self.old,
        }
    }

    /// Iterates over the union of the `new` and `old` sets: every node that
    /// appears in either set is yielded exactly once, in ascending order.
    pub fn members(&self) -> impl Iterator<Item = &NodeId> {
        self.new.union(&self.old)
    }

    /// Returns `true` if `node` belongs to the `new` or the `old` member set.
    pub fn is_known_node(&self, node: &NodeId) -> bool {
        [&self.new, &self.old].iter().any(|set| set.contains(node))
    }

    /// Builds a `Stable` configuration whose `new` set is `members` and
    /// whose `old` set is empty.
    pub fn new(members: ClusterMembers) -> Self {
        Self::with_state(members, ClusterMembers::default(), ClusterState::Stable)
    }

    /// Builds a configuration from its three parts, without any validation.
    pub fn with_state(
        new_members: ClusterMembers,
        old_members: ClusterMembers,
        state: ClusterState,
    ) -> Self {
        Self {
            new: new_members,
            old: old_members,
            state,
        }
    }

    /// Returns the configuration that begins a change towards `new`.
    ///
    /// The current primary members become the `old` set and the state is
    /// set to `CatchUp`.
    pub(crate) fn start_config_change(&self, new: ClusterMembers) -> Self {
        let old = self.primary_members().clone();
        Self::with_state(new, old, ClusterState::CatchUp)
    }

    /// Returns the configuration one phase further along.
    ///
    /// - `Stable`: an unchanged copy.
    /// - `CatchUp`: same member sets, state becomes `Joint`.
    /// - `Joint`: the `old` set is dropped and the state becomes `Stable`.
    pub(crate) fn to_next_state(&self) -> Self {
        match self.state {
            ClusterState::Stable => self.clone(),
            ClusterState::CatchUp => Self {
                state: ClusterState::Joint,
                ..self.clone()
            },
            // Only `new` survives, paired with an empty `old` set.
            ClusterState::Joint => Self::new(self.new.clone()),
        }
    }

    /// Computes the value the cluster agrees on, where `f` yields the value
    /// of a single node.
    ///
    /// Uses `median` over the `new` set while `Stable`, over the `old` set
    /// while `CatchUp`, and the smaller of the two medians while `Joint`.
    pub(crate) fn consensus_value<F, T>(&self, f: F) -> T
    where
        F: Fn(&NodeId) -> T,
        T: Ord + Copy + Default,
    {
        let agreed_by = |set: &ClusterMembers| median(set, &f);
        match self.state {
            ClusterState::Stable => agreed_by(&self.new),
            ClusterState::CatchUp => agreed_by(&self.old),
            // `new` is evaluated before `old`.
            ClusterState::Joint => cmp::min(agreed_by(&self.new), agreed_by(&self.old)),
        }
    }

    /// Like [`ClusterConfig::consensus_value`], but every non-`Stable` state
    /// (including `CatchUp`) takes the smaller of the `new` and `old`
    /// medians.
    pub(crate) fn full_consensus_value<F, T>(&self, f: F) -> T
    where
        F: Fn(&NodeId) -> T,
        T: Ord + Copy + Default,
    {
        let from_new = median(&self.new, &f);
        if self.state.is_stable() {
            from_new
        } else {
            cmp::min(from_new, median(&self.old, &f))
        }
    }
}
/// Maps every member through `f` and returns the value found at index
/// `members.len() / 2` when the results are ordered from largest to smallest.
///
/// Equivalently, it is the largest produced value `v` such that at least
/// `members.len() / 2 + 1` members (a strict majority) yield a value `>= v`.
/// For an empty member set the result is `T::default()`.
fn median<F, T>(members: &ClusterMembers, f: F) -> T
where
    F: Fn(&NodeId) -> T,
    T: Ord + Copy + Default,
{
    let mut ranked: Vec<T> = members.iter().map(f).collect();
    ranked.sort();
    // Walking the ascending order backwards visits the values from largest
    // to smallest; `nth` yields `None` only when there are no members.
    ranked
        .iter()
        .rev()
        .nth(members.len() / 2)
        .copied()
        .unwrap_or_default()
}