extern crate alloc;
use alloc::vec::Vec;
use rand::{
prelude::{IteratorRandom, SliceRandom},
Rng,
};
/// Liveness state recorded for a cluster member.
///
/// `Alive` and `Suspect` members are treated as active by
/// `Member::is_active`; `Down` members are not, and `Member::can_change`
/// never lets a `Down` member move to any other state.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum State {
/// The member is considered healthy.
Alive,
/// The member is still counted as active, but an `Alive` update needs a
/// strictly higher incarnation to replace this state.
Suspect,
/// The member is inactive; this state is terminal.
Down,
}
/// Version counter attached to a member's state.
///
/// `Member::can_change` compares incarnations to decide whether an incoming
/// `Alive`/`Suspect` update supersedes what is already known.
pub type Incarnation = u16;
/// What is known about a single member: its identity, the incarnation of
/// the latest accepted update and the state that update carried.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Member<T> {
// Identity of the member; `Members` matches updates against it with `==`.
id: T,
// Incarnation carried by the last accepted state change.
incarnation: Incarnation,
// Current liveness state.
state: State,
}
impl<T> Member<T> {
pub fn new(id: T, incarnation: Incarnation, state: State) -> Self {
Self {
id,
incarnation,
state,
}
}
pub fn alive(id: T) -> Self {
Self::new(id, Incarnation::default(), State::Alive)
}
#[cfg(test)]
pub(crate) fn suspect(id: T) -> Self {
Self::new(id, Incarnation::default(), State::Suspect)
}
pub(crate) fn down(id: T) -> Self {
Self::new(id, Incarnation::default(), State::Down)
}
pub fn incarnation(&self) -> Incarnation {
self.incarnation
}
pub fn state(&self) -> State {
self.state
}
pub fn id(&self) -> &T {
&self.id
}
pub(crate) fn is_active(&self) -> bool {
match self.state {
State::Alive | State::Suspect => true,
State::Down => false,
}
}
pub(crate) fn change_state(&mut self, incarnation: Incarnation, state: State) -> bool {
if self.can_change(incarnation, state) {
self.state = state;
self.incarnation = incarnation;
true
} else {
false
}
}
fn can_change(&self, other_incarnation: Incarnation, other: State) -> bool {
match self.state {
State::Alive => match other {
State::Alive => other_incarnation > self.incarnation,
State::Suspect => other_incarnation >= self.incarnation,
State::Down => true,
},
State::Suspect => match other {
State::Alive => other_incarnation > self.incarnation,
State::Suspect => other_incarnation > self.incarnation,
State::Down => true,
},
State::Down => false,
}
}
pub(crate) fn into_identity(self) -> T {
self.id
}
}
/// Collection of known members plus the bookkeeping used to walk the active
/// ones (see `Members::next`).
pub(crate) struct Members<T> {
// Every known member, active or not.
inner: Vec<Member<T>>,
// Index into `inner` where the next call to `next` starts looking; a value
// at or past `inner.len()` makes `next` shuffle and restart from 0.
cursor: usize,
// Count of members in `inner` for which `is_active()` holds.
num_active: usize,
}
#[cfg(test)]
impl<T> Members<T> {
    /// Total number of known members, active or not (test helper).
    pub fn len(&self) -> usize {
        let Members { inner, .. } = self;
        inner.len()
    }
}
impl<T: PartialEq + Clone> Members<T> {
    /// Returns the cached number of active (`Alive` or `Suspect`) members.
    pub fn num_active(&self) -> usize {
        self.num_active
    }

    /// Wraps `inner`, counting its active members once up front.
    ///
    /// The cursor starts at the first slot, so the first round of `next`
    /// walks `inner` in the order given.
    pub fn new(inner: Vec<Member<T>>) -> Self {
        let num_active = inner.iter().filter(|member| member.is_active()).count();
        Self {
            inner,
            cursor: 0,
            num_active,
        }
    }

    /// Yields the next active member, or `None` when there is none.
    ///
    /// Members are visited in storage order starting at the cursor. Once the
    /// cursor has run past the end, the list is shuffled with `rng` and the
    /// walk restarts from the first slot.
    pub fn next(&mut self, mut rng: impl Rng) -> Option<&Member<T>> {
        if self.cursor >= self.inner.len() {
            self.inner.shuffle(&mut rng);
            self.cursor = 0;
        }

        let cursor = self.cursor;
        let ahead = self
            .inner
            .iter()
            .skip(cursor)
            .position(|m| m.is_active())
            .map(|offset| offset + cursor);
        // Nothing active at or after the cursor: wrap around and look at the
        // slots before it.
        let pos = ahead.or_else(|| {
            self.inner
                .iter()
                .take(cursor)
                .position(|m| m.is_active())
        })?;

        self.cursor = if pos < cursor {
            // We wrapped around, so this round is over: park the cursor past
            // any possible length to force a shuffle on the next call.
            usize::MAX
        } else {
            pos.saturating_add(1)
        };
        self.inner.get(pos)
    }

    /// Appends up to `wanted` active members accepted by `picker` to
    /// `output`, chosen by reservoir sampling over the accepted members.
    ///
    /// Entries already present in `output` are left untouched: replacements
    /// only ever target the slots appended by this call.
    pub fn choose_active_members<F>(
        &self,
        wanted: usize,
        output: &mut Vec<Member<T>>,
        mut rng: impl Rng,
        picker: F,
    ) where
        F: Fn(&T) -> bool,
    {
        // Slots before `base` belong to the caller; the reservoir lives in
        // `output[base..base + wanted]`.
        let base = output.len();
        let mut num_chosen: usize = 0;
        let mut num_seen: usize = 0;

        for member in self.iter_active().filter(|member| picker(member.id())) {
            num_seen += 1;
            if num_chosen < wanted {
                // Still filling the reservoir.
                num_chosen += 1;
                output.push(member.clone());
            } else {
                // Reservoir is full: keep the newcomer with probability
                // `wanted / num_seen`, evicting a uniformly chosen slot.
                let replace_at = rng.gen_range(0..num_seen);
                if replace_at < wanted {
                    output[base + replace_at] = member.clone();
                }
            }
        }
    }

    /// Removes and returns the member identified by `id`, but only if it is
    /// currently `Down`. Returns `None` when the member is unknown or not
    /// `Down`.
    ///
    /// Uses `swap_remove`, so the last stored member takes the freed slot.
    pub fn remove_if_down(&mut self, id: &T) -> Option<Member<T>> {
        let pos = self
            .inner
            .iter()
            .position(|member| &member.id == id && member.state == State::Down)?;
        Some(self.inner.swap_remove(pos))
    }

    /// Iterates over the active members in storage order.
    pub fn iter_active(&self) -> impl Iterator<Item = &Member<T>> {
        self.inner.iter().filter(|m| m.is_active())
    }

    /// Applies `update` to an already-known member, provided `condition`
    /// holds for the currently stored entry.
    ///
    /// Returns `None` when no member with the same id is known. Otherwise
    /// returns a summary; when `condition` rejects the stored member the
    /// summary reports `apply_successful: false` and nothing is modified.
    pub fn apply_existing_if<F: Fn(&Member<T>) -> bool>(
        &mut self,
        update: Member<T>,
        condition: F,
    ) -> Option<ApplySummary> {
        self.apply_to_known(&update, condition)
    }

    /// Applies `update`, inserting the member if it was not known yet.
    ///
    /// A newly inserted member is swapped with a slot chosen via `rng`
    /// (possibly its own), so it does not necessarily end up last in the
    /// storage order. `rng` is only consulted when an insertion happens.
    pub fn apply(&mut self, update: Member<T>, mut rng: impl Rng) -> ApplySummary {
        if let Some(summary) = self.apply_to_known(&update, |_member| true) {
            return summary;
        }

        let is_active_now = update.is_active();
        self.inner.push(update);
        let inserted_at = self.inner.len() - 1;
        let swap_idx = (0..self.inner.len())
            .choose(&mut rng)
            .unwrap_or(inserted_at);
        self.inner.swap(swap_idx, inserted_at);

        if is_active_now {
            self.num_active = self.num_active.saturating_add(1);
        }

        ApplySummary {
            is_active_now,
            apply_successful: true,
            changed_active_set: is_active_now,
        }
    }

    /// Borrowing core of `apply_existing_if`, shared with `apply` so that
    /// neither has to clone the update just to look the member up.
    fn apply_to_known<F: Fn(&Member<T>) -> bool>(
        &mut self,
        update: &Member<T>,
        condition: F,
    ) -> Option<ApplySummary> {
        let known_member = self
            .inner
            .iter_mut()
            .find(|member| &member.id == update.id())?;

        if !condition(known_member) {
            return Some(ApplySummary {
                is_active_now: known_member.is_active(),
                apply_successful: false,
                changed_active_set: false,
            });
        }

        let was_active = known_member.is_active();
        let apply_successful = known_member.change_state(update.incarnation(), update.state());
        let is_active_now = known_member.is_active();
        let changed_active_set = is_active_now != was_active;

        // Keep the cached counter in step with the stored states.
        if changed_active_set {
            self.num_active = if is_active_now {
                self.num_active.saturating_add(1)
            } else {
                self.num_active.saturating_sub(1)
            };
        }

        Some(ApplySummary {
            is_active_now,
            apply_successful,
            changed_active_set,
        })
    }
}
/// Outcome of applying a member update via `Members::apply` or
/// `Members::apply_existing_if`.
#[derive(Debug, Clone, PartialEq)]
#[must_use]
pub(crate) struct ApplySummary {
/// Whether the member is active (`Alive` or `Suspect`) after the call.
pub is_active_now: bool,
/// Whether the update was accepted: the member was newly inserted or its
/// state/incarnation was overwritten.
pub apply_successful: bool,
/// Whether the call moved the member into or out of the active set.
pub changed_active_set: bool,
}
#[cfg(test)]
mod tests {
use super::*;
use alloc::vec;
use rand::{rngs::SmallRng, SeedableRng};
use State::*;
// Exercises every transition out of `Alive`. The assertions are sequential:
// each one relies on the state left behind by the previous ones.
#[test]
fn alive_transitions() {
let mut member = Member::new("a", 0, Alive);
assert!(
member.change_state(member.incarnation + 1, Alive),
"can transition to a higher incarnation"
);
assert_eq!(1, member.incarnation);
assert_eq!(Alive, member.state);
// From here on the member sits at incarnation 1, so `incarnation - 1`
// cannot underflow.
assert!(
!member.change_state(member.incarnation - 1, Alive),
"cannot transition to a lower incarnation"
);
assert!(
!member.change_state(member.incarnation, Alive),
"cannot transition to same state and incarnation {:?}",
&member
);
assert!(
!member.change_state(member.incarnation - 1, Suspect),
"lower suspect incarnation shouldn't transition"
);
// Alive -> Suspect is the one transition accepted at an equal incarnation.
assert!(
member.change_state(member.incarnation, Suspect),
"transition to suspect with same incarnation"
);
assert_eq!(Suspect, member.state);
// Fresh member to check the higher-incarnation variant of Alive -> Suspect.
member = Member::new("b", 0, Alive);
assert!(
member.change_state(member.incarnation + 1, Suspect),
"transition to suspect with higher incarnation"
);
assert_eq!(1, member.incarnation);
assert_eq!(Suspect, member.state);
// Down is accepted regardless of how the incarnations compare.
assert!(
Member::new("c", 1, Alive).change_state(0, Down),
"transitions to down on lower incarnation"
);
assert!(
Member::new("c", 0, Alive).change_state(0, Down),
"transitions to down on same incarnation"
);
assert!(
Member::new("c", 0, Alive).change_state(1, Down),
"transitions to down on higher incarnation"
);
}
// Exercises every transition out of `Suspect`. The assertions are
// sequential: each one relies on the state left behind by the previous ones.
#[test]
fn suspect_transitions() {
let mut member = Member::new("a", 0, Suspect);
assert!(
member.change_state(member.incarnation + 1, Suspect),
"can transition to a higher incarnation"
);
assert_eq!(1, member.incarnation);
assert_eq!(Suspect, member.state);
// The member now sits at incarnation 1, so `incarnation - 1` is safe.
assert!(
!member.change_state(member.incarnation - 1, Suspect),
"cannot transition to a lower incarnation"
);
assert!(
!member.change_state(member.incarnation, Suspect),
"cannot transition to same state and incarnation {:?}",
&member
);
// Unlike Alive -> Suspect, going back to Alive needs a strictly higher
// incarnation.
assert!(
!member.change_state(member.incarnation - 1, Alive),
"lower alive incarnation shouldn't transition"
);
assert!(
!member.change_state(member.incarnation, Alive),
"same alive incarnation shouldn't transition"
);
assert!(
member.change_state(member.incarnation + 1, Alive),
"can transition to alive with higher incarnation"
);
assert_eq!(Alive, member.state);
// Down is accepted regardless of how the incarnations compare.
assert!(
Member::new("c", 1, Suspect).change_state(0, Down),
"transitions to down on lower incarnation"
);
assert!(
Member::new("c", 0, Suspect).change_state(0, Down),
"transitions to down on same incarnation"
);
assert!(
Member::new("c", 0, Suspect).change_state(1, Down),
"transitions to down on higher incarnation"
);
}
// A `Down` member must reject every update, whatever state or incarnation
// (lower, equal or higher than its own) the update carries.
#[test]
fn down_never_transitions() {
    let mut corpse = Member::new("dead", 1, Down);
    for attempt in 0..=2 {
        for &target in &[Alive, Suspect, Down] {
            assert!(!corpse.change_state(attempt, target));
        }
    }
}
// `next` must walk the members in storage order for the first round and only
// shuffle once the cursor has run past the end.
#[test]
fn next_walks_sequentially_then_shuffles() {
let ordered_ids = vec![1, 2, 3, 4, 5];
let mut members = Members::new(ordered_ids.iter().cloned().map(Member::alive).collect());
let mut rng = SmallRng::seed_from_u64(0xF0CA);
// First round: the cursor starts at 0, so ids come back in insertion order.
for wanted in ordered_ids.iter() {
let got = members
.next(&mut rng)
.expect("Non-empty set of Alive members should always yield Some()")
.id;
assert_eq!(wanted, &got);
}
// Second round: the cursor is past the end, so the first call shuffles.
let mut after_shuffle = (0..ordered_ids.len())
.map(|_| members.next(&mut rng).unwrap().id)
.collect::<Vec<_>>();
// NOTE(review): relies on this seed not producing the identity permutation.
assert_ne!(ordered_ids, after_shuffle);
// Still a full round: every member is visited exactly once.
after_shuffle.sort_unstable();
assert_eq!(ordered_ids, after_shuffle);
}
// `apply_existing_if` returns `None` only for unknown members; for a known
// member it returns `Some(..)` whether or not the condition holds.
#[test]
fn apply_existing_if_behaviour() {
let mut members = Members::new(Vec::new());
assert_eq!(
None,
members.apply_existing_if(Member::alive(1), |_member| true),
"Only yield None only if member is not found"
);
let mut rng = SmallRng::seed_from_u64(0xF0CA);
// Make member 1 known so the following lookups find it.
let _ = members.apply(Member::alive(1), &mut rng);
// Condition accepts the stored member.
assert_ne!(
None,
members.apply_existing_if(Member::alive(1), |_member| true),
"Must yield Some() if existing, regardless of condition"
);
// Condition rejects the stored member: still `Some(..)`.
assert_ne!(
None,
members.apply_existing_if(Member::alive(1), |_member| false),
"Must yield Some() if existing, regardless of condition"
);
}
// Walks a single member through insert -> rejected update -> accepted update
// -> down, checking the `ApplySummary` and the counters after each step.
#[test]
fn apply_summary_behaviour() {
let mut members = Members::new(Vec::new());
let mut rng = SmallRng::seed_from_u64(0xF0CA);
// Unknown active member: inserted, active set grows.
let res = members.apply(Member::suspect(1), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: true,
apply_successful: true,
changed_active_set: true
},
res,
);
assert_eq!(1, members.len());
assert_eq!(1, members.num_active());
// Suspect -> Alive at the same incarnation (0) is rejected.
let res = members.apply(Member::alive(1), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: true,
apply_successful: false,
changed_active_set: false
},
res,
);
assert_eq!(1, members.len());
// Higher incarnation: accepted, but the member was already active.
let res = members.apply(Member::new(1, 1, State::Alive), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: true,
apply_successful: true,
changed_active_set: false
},
res,
);
assert_eq!(1, members.len());
// Down is always accepted; the member stays stored but leaves the active set.
let res = members.apply(Member::down(1), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: false,
apply_successful: true,
changed_active_set: true
},
res,
);
assert_eq!(1, members.len());
assert_eq!(0, members.num_active());
// Unknown member that arrives already Down: stored, active set untouched.
let res = members.apply(Member::down(2), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: false,
apply_successful: true,
changed_active_set: false
},
res,
);
assert_eq!(2, members.len());
assert_eq!(0, members.num_active());
}
// `remove_if_down` only removes members that are both known and `Down`, and
// hands back the removed entry.
#[test]
fn remove_if_down_works() {
let mut members = Members::new(Vec::new());
let mut rng = SmallRng::seed_from_u64(0xF0CA);
assert_eq!(
None,
members.remove_if_down(&1),
"cant remove member that does not exist"
);
let _ = members.apply(Member::alive(1), &mut rng);
assert_eq!(
None,
members.remove_if_down(&1),
"cant remove member that isnt down"
);
// Mark the member Down; both the stored entry and the update carry
// incarnation 0, so the removed entry equals `Member::down(1)`.
let _ = members.apply(Member::down(1), &mut rng);
assert_eq!(
Some(Member::down(1)),
members.remove_if_down(&1),
"must return the removed member"
);
}
// `next` yields `None` when nothing is active (empty set or only `Down`
// members) and keeps returning the sole active member otherwise.
#[test]
fn next_yields_none_with_no_active_members() {
let mut members = Members::new(Vec::new());
let mut rng = SmallRng::seed_from_u64(0xF0CA);
assert_eq!(
None,
members.next(&mut rng),
"next() should yield None when there are no members"
);
// Only inactive members: still nothing to yield.
let _ = members.apply(Member::down(-1), &mut rng);
let _ = members.apply(Member::down(-2), &mut rng);
let _ = members.apply(Member::down(-3), &mut rng);
assert_eq!(
None,
members.next(&mut rng),
"next() should yield None when there are no active members"
);
// A single active member among the Down ones must be found on every call,
// including the calls that wrap around or reshuffle.
let _ = members.apply(Member::alive(1), &mut rng);
for _i in 0..10 {
assert_eq!(
Some(1),
members.next(&mut rng).map(|m| m.id),
"next() should yield the same member if its the only active"
)
}
}
// Checks the size and filtering guarantees of `choose_active_members` on a
// set with three Alive, two Suspect and two Down members.
#[test]
fn choose_active_members_behaviour() {
let members = Members::new(Vec::from([
Member::alive(1),
Member::alive(2),
Member::alive(3),
Member::suspect(4),
Member::suspect(5),
Member::down(6),
Member::down(7),
]));
assert_eq!(7, members.len());
assert_eq!(5, members.num_active());
let mut out = Vec::new();
let mut rng = SmallRng::seed_from_u64(0xF0CA);
// `out` is cleared before every call so each call starts from an empty buffer.
out.clear();
members.choose_active_members(0, &mut out, &mut rng, |_| true);
assert_eq!(0, out.len(), "Can pointlessly choose 0 members");
out.clear();
members.choose_active_members(10, &mut out, &mut rng, |_| false);
assert_eq!(0, out.len(), "Filtering works");
// Asking for as many as are stored can only return the active ones.
out.clear();
members.choose_active_members(members.len(), &mut out, &mut rng, |_| true);
assert_eq!(
members.num_active(),
out.len(),
"Only chooses active members"
);
out.clear();
members.choose_active_members(2, &mut out, &mut rng, |_| true);
assert_eq!(2, out.len(), "Respects `wanted` even if we have more");
// Ids above 4 are 5 (Suspect), 6 and 7 (Down): only 5 is active.
out.clear();
members.choose_active_members(usize::MAX, &mut out, &mut rng, |&member_id| member_id > 4);
assert_eq!(vec![Member::suspect(5)], out);
}
}