atomr-cluster 0.1.0

Membership, gossip, reachability, vector clocks, and split-brain resolvers for atomr clusters.
Documentation
//! Split-brain resolvers. akka.net: `Cluster/SBR/`.
//!
//! Five strategies are implemented matching akka.net:
//! * KeepMajority
//! * StaticQuorum
//! * KeepOldest
//! * KeepReferee
//! * LeaseMajority

use crate::member::{Member, MemberStatus};

/// What the resolver recommends the cluster do with the given side.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DowningDecision {
    DownUnreachable,
    DownAll,
    DownSelf,
    Stay,
}

pub trait DowningStrategy: Send + Sync {
    fn decide(&self, reachable: &[&Member], unreachable: &[&Member]) -> DowningDecision;
}

/// KeepMajority: the side with strictly more up members survives.
#[derive(Debug, Clone, Copy, Default)]
pub struct KeepMajorityStrategy;

impl DowningStrategy for KeepMajorityStrategy {
    fn decide(&self, r: &[&Member], u: &[&Member]) -> DowningDecision {
        let up = |ms: &[&Member]| ms.iter().filter(|m| m.status == MemberStatus::Up).count();
        let rn = up(r);
        let un = up(u);
        if rn > un {
            DowningDecision::DownUnreachable
        } else if rn < un {
            DowningDecision::DownSelf
        } else {
            DowningDecision::DownAll
        }
    }
}

/// StaticQuorum: requires at least `quorum_size` reachable members to survive.
#[derive(Debug, Clone, Copy)]
pub struct StaticQuorumStrategy {
    pub quorum_size: usize,
}

impl DowningStrategy for StaticQuorumStrategy {
    fn decide(&self, r: &[&Member], _: &[&Member]) -> DowningDecision {
        if r.len() >= self.quorum_size {
            DowningDecision::DownUnreachable
        } else {
            DowningDecision::DownSelf
        }
    }
}

/// KeepOldest: the side containing the oldest (lowest `up_number`) up member survives.
#[derive(Debug, Clone, Copy, Default)]
pub struct KeepOldestStrategy {
    pub down_if_alone: bool,
}

impl DowningStrategy for KeepOldestStrategy {
    fn decide(&self, r: &[&Member], u: &[&Member]) -> DowningDecision {
        fn oldest<'a>(ms: &[&'a Member]) -> Option<&'a Member> {
            ms.iter().min_by_key(|m| m.up_number).copied()
        }
        let rolds = oldest(r);
        let uolds = oldest(u);
        match (rolds, uolds) {
            (Some(ro), Some(uo)) => {
                if ro.up_number <= uo.up_number {
                    if r.len() == 1 && self.down_if_alone {
                        DowningDecision::DownAll
                    } else {
                        DowningDecision::DownUnreachable
                    }
                } else {
                    DowningDecision::DownSelf
                }
            }
            (Some(_), None) => DowningDecision::DownUnreachable,
            (None, Some(_)) => DowningDecision::DownSelf,
            (None, None) => DowningDecision::Stay,
        }
    }
}

/// KeepReferee: the side containing the designated `referee` member survives.
#[derive(Debug, Clone)]
pub struct KeepReferee {
    pub referee: String,
    pub down_all_if_less_than: usize,
}

impl DowningStrategy for KeepReferee {
    fn decide(&self, r: &[&Member], _u: &[&Member]) -> DowningDecision {
        let has_referee = r.iter().any(|m| m.address.to_string() == self.referee);
        if !has_referee {
            return DowningDecision::DownSelf;
        }
        if r.len() < self.down_all_if_less_than {
            DowningDecision::DownAll
        } else {
            DowningDecision::DownUnreachable
        }
    }
}

/// LeaseMajority: majority decision gated by an external lease. In-memory
/// simulation of whether a lease was acquired.
#[derive(Debug, Clone, Copy, Default)]
pub struct LeaseMajorityStrategy {
    pub lease_acquired: bool,
}

impl DowningStrategy for LeaseMajorityStrategy {
    fn decide(&self, r: &[&Member], u: &[&Member]) -> DowningDecision {
        let m = KeepMajorityStrategy.decide(r, u);
        match m {
            DowningDecision::DownAll if self.lease_acquired => DowningDecision::DownUnreachable,
            other => other,
        }
    }
}

/// Facade that holds any of the strategies behind a trait object.
pub struct SplitBrainResolver {
    pub strategy: Box<dyn DowningStrategy>,
}

impl SplitBrainResolver {
    pub fn new(strategy: Box<dyn DowningStrategy>) -> Self {
        Self { strategy }
    }
    pub fn decide(&self, r: &[&Member], u: &[&Member]) -> DowningDecision {
        self.strategy.decide(r, u)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use atomr_core::actor::Address;

    fn up(n: i32) -> Member {
        let mut m = Member::new(Address::local(format!("N{n}")), vec![]);
        m.status = MemberStatus::Up;
        m.up_number = n;
        m
    }

    #[test]
    fn keep_majority_prefers_larger_side() {
        let r = [up(1), up(2), up(3)];
        let u = [up(4)];
        let r_ref: Vec<&Member> = r.iter().collect();
        let u_ref: Vec<&Member> = u.iter().collect();
        assert_eq!(KeepMajorityStrategy.decide(&r_ref, &u_ref), DowningDecision::DownUnreachable);
    }

    #[test]
    fn static_quorum_enforces_size() {
        let r = [up(1)];
        let u = [up(2)];
        let r_ref: Vec<&Member> = r.iter().collect();
        let u_ref: Vec<&Member> = u.iter().collect();
        assert_eq!(StaticQuorumStrategy { quorum_size: 2 }.decide(&r_ref, &u_ref), DowningDecision::DownSelf);
    }

    #[test]
    fn keep_oldest_picks_lowest_up_number() {
        let r = [up(1)];
        let u = [up(2), up(3)];
        let r_ref: Vec<&Member> = r.iter().collect();
        let u_ref: Vec<&Member> = u.iter().collect();
        assert_eq!(KeepOldestStrategy::default().decide(&r_ref, &u_ref), DowningDecision::DownUnreachable);
    }
}