extern crate alloc;
use alloc::vec::Vec;
use rand::{
prelude::{IteratorRandom, SliceRandom},
Rng,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum State {
Alive,
Suspect,
Down,
}
pub type Incarnation = u16;
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Member<T> {
id: T,
incarnation: Incarnation,
state: State,
}
impl<T> Member<T> {
pub const fn new(id: T, incarnation: Incarnation, state: State) -> Self {
Self {
id,
incarnation,
state,
}
}
pub fn alive(id: T) -> Self {
Self::new(id, Incarnation::default(), State::Alive)
}
#[cfg(test)]
pub(crate) fn suspect(id: T) -> Self {
Self::new(id, Incarnation::default(), State::Suspect)
}
pub(crate) fn down(id: T) -> Self {
Self::new(id, Incarnation::default(), State::Down)
}
pub const fn incarnation(&self) -> Incarnation {
self.incarnation
}
pub const fn state(&self) -> State {
self.state
}
pub const fn id(&self) -> &T {
&self.id
}
pub(crate) const fn is_active(&self) -> bool {
match self.state {
State::Alive | State::Suspect => true,
State::Down => false,
}
}
pub(crate) fn change_state(&mut self, incarnation: Incarnation, state: State) -> bool {
if self.can_change(incarnation, state) {
self.state = state;
self.incarnation = incarnation;
true
} else {
false
}
}
const fn can_change(&self, other_incarnation: Incarnation, other: State) -> bool {
match self.state {
State::Alive => match other {
State::Alive => other_incarnation > self.incarnation,
State::Suspect => other_incarnation >= self.incarnation,
State::Down => true,
},
State::Suspect => match other {
State::Alive | State::Suspect => other_incarnation > self.incarnation,
State::Down => true,
},
State::Down => false,
}
}
pub(crate) fn into_identity(self) -> T {
self.id
}
}
pub(crate) struct Members<T> {
pub(crate) inner: Vec<Member<T>>,
cursor: usize,
num_active: usize,
}
#[cfg(test)]
impl<T> Members<T> {
pub(crate) fn len(&self) -> usize {
self.inner.len()
}
}
impl<T> Members<T>
where
T: PartialEq + Clone + crate::Identity,
{
pub(crate) const fn num_active(&self) -> usize {
self.num_active
}
pub(crate) fn new(inner: Vec<Member<T>>) -> Self {
let num_active = inner.iter().filter(|member| member.is_active()).count();
Self {
cursor: 0,
num_active,
inner,
}
}
pub(crate) fn next(&mut self, mut rng: impl Rng) -> Option<&Member<T>> {
if self.cursor >= self.inner.len() {
self.inner.shuffle(&mut rng);
self.cursor = 0;
}
let position = self
.inner
.iter()
.skip(self.cursor)
.position(|m| m.is_active())
.map(|pos| pos + self.cursor);
let position = position.or_else(|| {
self.inner
.iter()
.take(self.cursor)
.position(|m| m.is_active())
});
if let Some(pos) = position {
if pos < self.cursor {
self.cursor = usize::MAX;
} else {
self.cursor = pos.saturating_add(1);
}
self.inner.get(pos)
} else {
None
}
}
fn choose_members<F>(
&self,
wanted: usize,
output: &mut Vec<Member<T>>,
mut rng: impl Rng,
picker: F,
) where
F: Fn(&Member<T>) -> bool,
{
let mut num_chosen = 0;
let mut num_seen = 0;
for member in &self.inner {
if !picker(member) {
continue;
}
num_seen += 1;
if num_chosen < wanted {
num_chosen += 1;
output.push(member.clone());
} else {
let replace_at = rng.random_range(0..num_seen);
if replace_at < wanted {
output[replace_at] = member.clone();
}
}
}
}
pub(crate) fn choose_down_members(
&self,
wanted: usize,
output: &mut Vec<Member<T>>,
rng: impl Rng,
) {
self.choose_members(wanted, output, rng, |member| !member.is_active());
}
pub(crate) fn choose_active_members<F>(
&self,
wanted: usize,
output: &mut Vec<Member<T>>,
rng: impl Rng,
picker: F,
) where
F: Fn(&T) -> bool,
{
self.choose_members(wanted, output, rng, |member| {
member.is_active() && picker(member.id())
});
}
pub(crate) fn remove_if_down(&mut self, id: &T) -> Option<Member<T>> {
let position = self
.inner
.iter()
.position(|member| &member.id == id && member.state == State::Down);
position.map(|pos| self.inner.swap_remove(pos))
}
pub(crate) fn iter_active(&self) -> impl Iterator<Item = &Member<T>> {
self.inner.iter().filter(|m| m.is_active())
}
pub(crate) fn is_active(&self, id: &T) -> bool {
self.inner
.iter()
.any(|member| &member.id == id && member.is_active())
}
pub(crate) fn apply_existing_if<F: Fn(&Member<T>) -> bool>(
&mut self,
mut update: Member<T>,
condition: F,
) -> Option<ApplySummary<T>> {
if let Some(known_member) = self
.inner
.iter_mut()
.find(|member| member.id.addr() == update.id().addr())
{
let mut force_apply = false;
if known_member.id != update.id {
if known_member.id.win_addr_conflict(&update.id) {
return Some(ApplySummary {
is_active_now: known_member.is_active(),
apply_successful: false,
changed_active_set: false,
conflict: ConflictResult::Lost,
});
}
force_apply = true;
}
if !condition(known_member) {
let conflict = if force_apply {
ConflictResult::FailedCondition
} else {
ConflictResult::NoConflict
};
return Some(ApplySummary {
is_active_now: known_member.is_active(),
apply_successful: false,
changed_active_set: false,
conflict,
});
}
let was_active = known_member.is_active();
let mut conflict = ConflictResult::NoConflict;
let apply_successful = if force_apply {
core::mem::swap(&mut known_member.id, &mut update.id);
conflict = ConflictResult::Replaced(update.id);
known_member.state = update.state;
known_member.incarnation = update.incarnation;
true
} else {
known_member.change_state(update.incarnation, update.state)
};
let is_active_now = known_member.is_active();
let changed_active_set = is_active_now != was_active;
if changed_active_set {
if is_active_now {
self.num_active = self.num_active.saturating_add(1);
} else {
self.num_active = self.num_active.saturating_sub(1);
}
}
Some(ApplySummary {
is_active_now,
apply_successful,
changed_active_set,
conflict,
})
} else {
None
}
}
pub(crate) fn apply(&mut self, update: Member<T>, mut rng: impl Rng) -> ApplySummary<T> {
self.apply_existing_if(update.clone(), |_member| true)
.unwrap_or_else(|| {
let is_active_now = update.is_active();
self.inner.push(update);
let inserted_at = self.inner.len() - 1;
let swap_idx = (0..self.inner.len())
.choose(&mut rng)
.unwrap_or(inserted_at);
self.inner.swap(swap_idx, inserted_at);
if is_active_now {
self.num_active = self.num_active.saturating_add(1);
}
ApplySummary {
is_active_now,
apply_successful: true,
changed_active_set: is_active_now,
conflict: ConflictResult::NoConflict,
}
})
}
}
#[derive(Debug, Clone, PartialEq)]
#[must_use]
pub(crate) struct ApplySummary<T> {
pub(crate) is_active_now: bool,
pub(crate) apply_successful: bool,
pub(crate) changed_active_set: bool,
pub(crate) conflict: ConflictResult<T>,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ConflictResult<T> {
NoConflict,
Replaced(T),
Lost,
FailedCondition,
}
#[cfg(test)]
mod tests {
use crate::Identity;
use super::*;
use alloc::vec;
use rand::{rngs::SmallRng, SeedableRng};
#[derive(Clone, Debug, PartialEq, Eq, Copy, PartialOrd, Ord)]
struct Id(&'static str);
impl crate::Identity for Id {
type Addr = &'static str;
fn renew(&self) -> Option<Self> {
None
}
fn addr(&self) -> Self::Addr {
self.0
}
fn win_addr_conflict(&self, _adversary: &Self) -> bool {
panic!("addr is self, there'll never be a conflict");
}
}
use State::*;
#[test]
fn alive_transitions() {
let mut member = Member::new(Id("a"), 0, Alive);
assert!(
member.change_state(member.incarnation + 1, Alive),
"can transition to a higher incarnation"
);
assert_eq!(1, member.incarnation);
assert_eq!(Alive, member.state);
assert!(
!member.change_state(member.incarnation - 1, Alive),
"cannot transition to a lower incarnation"
);
assert!(
!member.change_state(member.incarnation, Alive),
"cannot transition to same state and incarnation {:?}",
&member
);
assert!(
!member.change_state(member.incarnation - 1, Suspect),
"lower suspect incarnation shouldn't transition"
);
assert!(
member.change_state(member.incarnation, Suspect),
"transition to suspect with same incarnation"
);
assert_eq!(Suspect, member.state);
member = Member::new(Id("b"), 0, Alive);
assert!(
member.change_state(member.incarnation + 1, Suspect),
"transition to suspect with higher incarnation"
);
assert_eq!(1, member.incarnation);
assert_eq!(Suspect, member.state);
assert!(
Member::new("c", 1, Alive).change_state(0, Down),
"transitions to down on lower incarnation"
);
assert!(
Member::new("c", 0, Alive).change_state(0, Down),
"transitions to down on same incarnation"
);
assert!(
Member::new("c", 0, Alive).change_state(1, Down),
"transitions to down on higher incarnation"
);
}
#[test]
fn suspect_transitions() {
let mut member = Member::new(Id("a"), 0, Suspect);
assert!(
member.change_state(member.incarnation + 1, Suspect),
"can transition to a higher incarnation"
);
assert_eq!(1, member.incarnation);
assert_eq!(Suspect, member.state);
assert!(
!member.change_state(member.incarnation - 1, Suspect),
"cannot transition to a lower incarnation"
);
assert!(
!member.change_state(member.incarnation, Suspect),
"cannot transition to same state and incarnation {:?}",
&member
);
assert!(
!member.change_state(member.incarnation - 1, Alive),
"lower alive incarnation shouldn't transition"
);
assert!(
!member.change_state(member.incarnation, Alive),
"same alive incarnation shouldn't transition"
);
assert!(
member.change_state(member.incarnation + 1, Alive),
"can transition to alive with higher incarnation"
);
assert_eq!(Alive, member.state);
assert!(
Member::new("c", 1, Suspect).change_state(0, Down),
"transitions to down on lower incarnation"
);
assert!(
Member::new("c", 0, Suspect).change_state(0, Down),
"transitions to down on same incarnation"
);
assert!(
Member::new("c", 0, Suspect).change_state(1, Down),
"transitions to down on higher incarnation"
);
}
#[test]
fn down_never_transitions() {
let mut member = Member::new("dead", 1, Down);
for incarnation in 0..=2 {
assert!(!member.change_state(incarnation, Alive));
assert!(!member.change_state(incarnation, Suspect));
assert!(!member.change_state(incarnation, Down));
}
}
#[test]
fn next_walks_sequentially_then_shuffles() {
let ordered_ids = vec![Id("1"), Id("2"), Id("3"), Id("4"), Id("5")];
let mut members = Members::new(ordered_ids.iter().cloned().map(Member::alive).collect());
let mut rng = SmallRng::seed_from_u64(0xF0CA);
for wanted in ordered_ids.iter() {
let got = members
.next(&mut rng)
.expect("Non-empty set of Alive members should always yield Some()")
.id;
assert_eq!(wanted, &got);
}
let mut after_shuffle = (0..ordered_ids.len())
.map(|_| members.next(&mut rng).unwrap().id)
.collect::<Vec<_>>();
assert_ne!(ordered_ids, after_shuffle);
after_shuffle.sort_unstable();
assert_eq!(ordered_ids, after_shuffle);
}
#[test]
fn apply_existing_if_behaviour() {
let mut members = Members::new(Vec::new());
assert_eq!(
None,
members.apply_existing_if(Member::alive(Id("1")), |_member| true),
"Only yield None only if member is not found"
);
let mut rng = SmallRng::seed_from_u64(0xF0CA);
let _ = members.apply(Member::alive(Id("1")), &mut rng);
assert_ne!(
None,
members.apply_existing_if(Member::alive(Id("1")), |_member| true),
"Must yield Some() if existing, regardless of condition"
);
assert_ne!(
None,
members.apply_existing_if(Member::alive(Id("1")), |_member| false),
"Must yield Some() if existing, regardless of condition"
);
}
#[test]
fn apply_summary_behaviour() {
let mut members = Members::new(Vec::new());
let mut rng = SmallRng::seed_from_u64(0xF0CA);
let res = members.apply(Member::suspect(Id("1")), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: true,
apply_successful: true,
changed_active_set: true,
conflict: ConflictResult::NoConflict,
},
res,
);
assert_eq!(1, members.len());
assert_eq!(1, members.num_active());
let res = members.apply(Member::alive(Id("1")), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: true,
apply_successful: false,
changed_active_set: false,
conflict: ConflictResult::NoConflict,
},
res,
);
assert_eq!(1, members.len());
let res = members.apply(Member::new(Id("1"), 1, State::Alive), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: true,
apply_successful: true,
changed_active_set: false,
conflict: ConflictResult::NoConflict,
},
res,
);
assert_eq!(1, members.len());
let res = members.apply(Member::down(Id("1")), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: false,
apply_successful: true,
changed_active_set: true,
conflict: ConflictResult::NoConflict,
},
res,
);
assert_eq!(1, members.len());
assert_eq!(0, members.num_active());
let res = members.apply(Member::down(Id("2")), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: false,
apply_successful: true,
changed_active_set: false,
conflict: ConflictResult::NoConflict,
},
res,
);
assert_eq!(2, members.len());
assert_eq!(0, members.num_active());
}
#[test]
fn remove_if_down_works() {
let mut members = Members::new(Vec::new());
let mut rng = SmallRng::seed_from_u64(0xF0CA);
assert_eq!(
None,
members.remove_if_down(&Id("1")),
"cant remove member that does not exist"
);
let _ = members.apply(Member::alive(Id("1")), &mut rng);
assert_eq!(
None,
members.remove_if_down(&Id("1")),
"cant remove member that isnt down"
);
let _ = members.apply(Member::down(Id("1")), &mut rng);
assert_eq!(
Some(Member::down(Id("1"))),
members.remove_if_down(&Id("1")),
"must return the removed member"
);
}
#[test]
fn next_yields_none_with_no_active_members() {
let mut members = Members::new(Vec::new());
let mut rng = SmallRng::seed_from_u64(0xF0CA);
assert_eq!(
None,
members.next(&mut rng),
"next() should yield None when there are no members"
);
let _ = members.apply(Member::down(Id("-1")), &mut rng);
let _ = members.apply(Member::down(Id("-2")), &mut rng);
let _ = members.apply(Member::down(Id("-3")), &mut rng);
assert_eq!(
None,
members.next(&mut rng),
"next() should yield None when there are no active members"
);
let _ = members.apply(Member::alive(Id("1")), &mut rng);
for _i in 0..10 {
assert_eq!(
Some(Id("1")),
members.next(&mut rng).map(|m| m.id),
"next() should yield the same member if its the only active"
);
}
}
#[test]
fn choose_active_members_behaviour() {
let members = Members::new(Vec::from([
Member::alive(Id("1")),
Member::alive(Id("2")),
Member::alive(Id("3")),
Member::suspect(Id("4")),
Member::suspect(Id("5")),
Member::down(Id("6")),
Member::down(Id("7")),
]));
assert_eq!(7, members.len());
assert_eq!(5, members.num_active());
let mut out = Vec::new();
let mut rng = SmallRng::seed_from_u64(0xF0CA);
out.clear();
members.choose_active_members(0, &mut out, &mut rng, |_| true);
assert_eq!(0, out.len(), "Can pointlessly choose 0 members");
out.clear();
members.choose_active_members(10, &mut out, &mut rng, |_| false);
assert_eq!(0, out.len(), "Filtering works");
out.clear();
members.choose_active_members(members.len(), &mut out, &mut rng, |_| true);
assert_eq!(
members.num_active(),
out.len(),
"Only chooses active members"
);
out.clear();
members.choose_active_members(2, &mut out, &mut rng, |_| true);
assert_eq!(2, out.len(), "Respects `wanted` even if we have more");
out.clear();
members.choose_active_members(usize::MAX, &mut out, &mut rng, |&member_id| {
member_id.0.parse::<usize>().expect("number") > 4
});
assert_eq!(vec![Member::suspect(Id("5"))], out);
out.clear();
members.choose_down_members(3, &mut out, &mut rng);
assert_eq!(2, out.len());
assert!(out.iter().any(|m| m.id == Id("7")));
assert!(out.iter().any(|m| m.id == Id("6")));
}
#[test]
fn sets_replaced_id_on_addr_conflict() {
let id = crate::testing::ID::new(1).rejoinable();
let mut members = Members::new(Vec::from([
Member::alive(id),
]));
let renewed = id.renew().unwrap();
let summary = members
.apply_existing_if(Member::alive(renewed), |_| true)
.expect("member found");
assert!(summary.apply_successful);
assert_eq!(ConflictResult::Replaced(id), summary.conflict);
let another = renewed.renew().unwrap();
let summary = members
.apply_existing_if(Member::alive(another), |_| false)
.expect("member found");
assert!(!summary.apply_successful);
assert_eq!(
ConflictResult::FailedCondition,
summary.conflict,
"must not apply if condition fails"
);
let summary = members
.apply_existing_if(Member::alive(id), |_| true)
.expect("member found");
assert_eq!(ConflictResult::Lost, summary.conflict,);
assert!(!summary.apply_successful, "must not apply if conflict lost");
}
}