use std::collections::BTreeMap;
use super::identity::NodeIdentity;
/// Recommended minimum number of data-holding members in a cluster.
///
/// `BaselineAssessment` records this value as `recommended_data_members` and
/// compares a catalog's data-member count against it.
pub const RESILIENT_DATA_MEMBER_BASELINE: usize = 3;
/// Identifier of a cluster.
///
/// Values can only be built through [`ClusterId::new`], so the wrapped string
/// is never empty and never carries leading or trailing whitespace.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ClusterId(String);

/// Error returned by [`ClusterId::new`] when the input is empty after trimming.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterIdError;

impl std::fmt::Display for ClusterIdError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("cluster id is empty")
    }
}

impl std::error::Error for ClusterIdError {}

impl ClusterId {
    /// Builds a cluster id from `value`, trimming surrounding whitespace.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterIdError`] when nothing is left after trimming.
    pub fn new(value: impl AsRef<str>) -> Result<Self, ClusterIdError> {
        match value.as_ref().trim() {
            "" => Err(ClusterIdError),
            trimmed => Ok(Self(trimmed.to_owned())),
        }
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for ClusterId {
    /// Writes the raw identifier; width and padding flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
/// Role a member plays in the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemberKind {
    /// A member that holds data.
    Data,
    /// A member that does not hold data.
    Witness,
}

impl MemberKind {
    /// Returns `true` only for [`MemberKind::Data`].
    pub fn holds_data(self) -> bool {
        // Exhaustive on purpose: a new variant must decide this explicitly.
        match self {
            MemberKind::Data => true,
            MemberKind::Witness => false,
        }
    }
}
/// Lifecycle state of a cluster member.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemberState {
    /// The member accepts new placements.
    Active,
    /// The member no longer accepts new placements.
    Draining,
}

impl MemberState {
    /// Returns `true` only for [`MemberState::Active`].
    pub fn accepts_new_placements(self) -> bool {
        // Exhaustive on purpose: a new variant must decide this explicitly.
        match self {
            MemberState::Active => true,
            MemberState::Draining => false,
        }
    }
}
/// A single node's entry in the cluster membership: who it is, what role it
/// plays, its lifecycle state, and how many ranges it currently owns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterMember {
/// Identity of the node, exposed through `identity()`.
identity: NodeIdentity,
/// Role of the member (data holder or witness).
kind: MemberKind,
/// Lifecycle state; `Active` on join, switched to `Draining` by `begin_drain`.
state: MemberState,
/// Number of ranges owned; zero on join, overwritten by `assign_ranges`.
owned_range_count: usize,
}
impl ClusterMember {
    /// Creates a freshly joined member: `Active` and owning no ranges.
    pub fn joined_empty(identity: NodeIdentity, kind: MemberKind) -> Self {
        let state = MemberState::Active;
        let owned_range_count = 0;
        Self {
            identity,
            kind,
            state,
            owned_range_count,
        }
    }

    /// Identity of this member.
    pub fn identity(&self) -> &NodeIdentity {
        &self.identity
    }

    /// Role of this member.
    pub fn kind(&self) -> MemberKind {
        self.kind
    }

    /// Current lifecycle state.
    pub fn state(&self) -> MemberState {
        self.state
    }

    /// Returns `true` while the member is in the `Draining` state.
    pub fn is_draining(&self) -> bool {
        matches!(self.state, MemberState::Draining)
    }

    /// Moves the member into the `Draining` state.
    ///
    /// Returns `true` when this call performed the transition from `Active`,
    /// and `false` when the member was already draining.
    pub fn begin_drain(&mut self) -> bool {
        let previous = std::mem::replace(&mut self.state, MemberState::Draining);
        matches!(previous, MemberState::Active)
    }

    /// Returns `true` when the member holds data and accepts new placements.
    pub fn is_placement_eligible(&self) -> bool {
        if !self.kind.holds_data() {
            return false;
        }
        self.state.accepts_new_placements()
    }

    /// Number of ranges currently recorded as owned by this member.
    pub fn owned_range_count(&self) -> usize {
        self.owned_range_count
    }

    /// Returns `true` when at least one range is owned.
    pub fn holds_user_ranges(&self) -> bool {
        self.owned_range_count != 0
    }

    /// Overwrites the owned-range count with `count`.
    ///
    /// The value replaces the previous count; it is not added to it.
    pub fn assign_ranges(&mut self, count: usize) {
        self.owned_range_count = count;
    }
}
/// Result of `MembershipCatalog::admit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionOutcome {
/// The identity was not in the catalog and the member was inserted.
Admitted,
/// The identity was already in the catalog; the existing entry was kept.
AlreadyMember,
}
/// The set of members belonging to one cluster, keyed by node identity.
#[derive(Debug, Clone)]
pub struct MembershipCatalog {
/// Identifier of the cluster this catalog describes.
cluster_id: ClusterId,
/// Members indexed by their identity; each identity appears at most once.
members: BTreeMap<NodeIdentity, ClusterMember>,
}
impl MembershipCatalog {
pub fn new(cluster_id: ClusterId, founders: impl IntoIterator<Item = ClusterMember>) -> Self {
let members = founders
.into_iter()
.map(|m| (m.identity().clone(), m))
.collect();
Self {
cluster_id,
members,
}
}
pub fn cluster_id(&self) -> &ClusterId {
&self.cluster_id
}
pub fn is_authorized(&self, identity: &NodeIdentity) -> bool {
self.members.contains_key(identity)
}
pub fn member(&self, identity: &NodeIdentity) -> Option<&ClusterMember> {
self.members.get(identity)
}
pub fn member_mut(&mut self, identity: &NodeIdentity) -> Option<&mut ClusterMember> {
self.members.get_mut(identity)
}
pub fn admit(&mut self, member: ClusterMember) -> AdmissionOutcome {
if self.members.contains_key(member.identity()) {
return AdmissionOutcome::AlreadyMember;
}
self.members.insert(member.identity().clone(), member);
AdmissionOutcome::Admitted
}
pub fn begin_drain(&mut self, identity: &NodeIdentity) -> Option<bool> {
self.members
.get_mut(identity)
.map(ClusterMember::begin_drain)
}
pub fn remove(&mut self, identity: &NodeIdentity) -> Option<ClusterMember> {
self.members.remove(identity)
}
pub fn members(&self) -> impl Iterator<Item = &ClusterMember> {
self.members.values()
}
pub fn placement_eligible_members(&self) -> impl Iterator<Item = &ClusterMember> {
self.members().filter(|m| m.is_placement_eligible())
}
pub fn autodetect_candidates(&self) -> impl Iterator<Item = &ClusterMember> {
self.members()
}
pub fn is_autodetect_eligible(&self, identity: &NodeIdentity) -> bool {
self.is_authorized(identity)
}
pub fn len(&self) -> usize {
self.members.len()
}
pub fn is_empty(&self) -> bool {
self.members.is_empty()
}
pub fn data_member_count(&self) -> usize {
self.members().filter(|m| m.kind().holds_data()).count()
}
pub fn assess_baseline(&self) -> BaselineAssessment {
BaselineAssessment::evaluate(self.data_member_count())
}
}
/// Snapshot comparing a cluster's data-member count with the recommended
/// baseline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BaselineAssessment {
/// Recommended number of data members (`RESILIENT_DATA_MEMBER_BASELINE`).
pub recommended_data_members: usize,
/// Number of data members that was assessed.
pub data_members: usize,
}
impl BaselineAssessment {
    /// Pairs the observed `data_members` count with the recommended baseline.
    fn evaluate(data_members: usize) -> Self {
        let recommended_data_members = RESILIENT_DATA_MEMBER_BASELINE;
        Self {
            recommended_data_members,
            data_members,
        }
    }

    /// Returns `true` when the data-member count reaches the recommendation.
    pub fn meets_baseline(&self) -> bool {
        // No shortfall is the same condition as `data_members >= recommended`.
        self.shortfall() == 0
    }

    /// Number of additional data members needed to reach the recommendation;
    /// zero when the baseline is already met or exceeded.
    pub fn shortfall(&self) -> usize {
        let Self {
            recommended_data_members,
            data_members,
        } = *self;
        recommended_data_members.saturating_sub(data_members)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a certificate subject into a node identity, panicking on failure.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Freshly joined data member for the given certificate subject.
    fn data_member(cn: &str) -> ClusterMember {
        ClusterMember::joined_empty(ident(cn), MemberKind::Data)
    }

    /// Freshly joined witness for the given certificate subject.
    fn witness(cn: &str) -> ClusterMember {
        ClusterMember::joined_empty(ident(cn), MemberKind::Witness)
    }

    /// Catalog for the fixed test cluster seeded with `founders`.
    fn catalog_of(founders: impl IntoIterator<Item = ClusterMember>) -> MembershipCatalog {
        let cluster = ClusterId::new("cluster-x").unwrap();
        MembershipCatalog::new(cluster, founders)
    }

    #[test]
    fn cluster_id_rejects_empty() {
        assert!(ClusterId::new(" ").is_err());

        let trimmed = ClusterId::new(" cluster-x ").unwrap();
        assert_eq!(trimmed.as_str(), "cluster-x");
    }

    #[test]
    fn member_identity_is_distinct_from_range_ownership() {
        let mut node = data_member("CN=node-a");
        assert!(!node.holds_user_ranges());
        assert_eq!(node.owned_range_count(), 0);

        node.assign_ranges(4);
        assert!(node.holds_user_ranges());
        assert_eq!(node.identity(), &ident("CN=node-a"));
    }

    #[test]
    fn data_member_count_excludes_witnesses() {
        let catalog = catalog_of([
            data_member("CN=node-a"),
            data_member("CN=node-b"),
            witness("CN=witness"),
        ]);

        assert_eq!(catalog.len(), 3);
        assert_eq!(catalog.data_member_count(), 2);
    }

    #[test]
    fn three_data_members_meet_resilient_baseline() {
        let catalog = catalog_of([
            data_member("CN=node-a"),
            data_member("CN=node-b"),
            data_member("CN=node-c"),
        ]);

        let assessment = catalog.assess_baseline();
        assert_eq!(assessment.recommended_data_members, 3);
        assert!(assessment.meets_baseline());
        assert_eq!(assessment.shortfall(), 0);
    }

    #[test]
    fn two_data_plus_witness_does_not_meet_baseline() {
        let catalog = catalog_of([
            data_member("CN=node-a"),
            data_member("CN=node-b"),
            witness("CN=witness"),
        ]);

        let assessment = catalog.assess_baseline();
        assert!(!assessment.meets_baseline());
        assert_eq!(assessment.shortfall(), 1);
    }

    #[test]
    fn admit_is_idempotent_and_preserves_ranges() {
        let node_a = ident("CN=node-a");
        let mut catalog = catalog_of([data_member("CN=node-a")]);
        catalog.member_mut(&node_a).unwrap().assign_ranges(3);

        // Re-admitting a known identity must not reset its range count.
        let repeated = catalog.admit(data_member("CN=node-a"));
        assert_eq!(repeated, AdmissionOutcome::AlreadyMember);
        let kept = catalog.member(&node_a).unwrap();
        assert_eq!(kept.owned_range_count(), 3);

        let fresh = catalog.admit(data_member("CN=node-b"));
        assert_eq!(fresh, AdmissionOutcome::Admitted);
        assert_eq!(catalog.len(), 2);
    }

    #[test]
    fn autodetect_is_limited_to_authorized_members() {
        let catalog = catalog_of([data_member("CN=node-a")]);

        assert!(catalog.is_autodetect_eligible(&ident("CN=node-a")));
        assert!(!catalog.is_autodetect_eligible(&ident("CN=random-peer")));
        assert_eq!(catalog.autodetect_candidates().count(), 1);
    }
}