use super::identity::NodeIdentity;
use super::membership::{
AdmissionOutcome, ClusterId, ClusterMember, MemberKind, MembershipCatalog,
};
use std::collections::BTreeMap;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinRequest {
pub target_cluster: ClusterId,
pub identity: NodeIdentity,
pub kind: MemberKind,
}
impl JoinRequest {
pub fn authenticated(
target_cluster: ClusterId,
identity: NodeIdentity,
kind: MemberKind,
) -> Self {
Self {
target_cluster,
identity,
kind,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoinRejection {
WrongCluster {
expected: ClusterId,
presented: ClusterId,
},
UnauthorizedPeer(NodeIdentity),
KindMismatch {
identity: NodeIdentity,
allowed: MemberKind,
requested: MemberKind,
},
}
impl std::fmt::Display for JoinRejection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::WrongCluster {
expected,
presented,
} => write!(
f,
"join targets cluster {presented}, but this seed serves {expected}"
),
Self::UnauthorizedPeer(id) => {
write!(f, "peer {id} is not an authorized cluster member")
}
Self::KindMismatch {
identity,
allowed,
requested,
} => write!(
f,
"peer {identity} is allowed as {allowed:?} but requested {requested:?}"
),
}
}
}
impl std::error::Error for JoinRejection {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ControlPlaneSnapshot {
pub cluster_id: ClusterId,
pub members: Vec<ClusterMember>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinGrant {
pub outcome: AdmissionOutcome,
pub snapshot: ControlPlaneSnapshot,
}
#[derive(Debug, Clone)]
pub struct SeedAuthority {
allowlist: BTreeMap<NodeIdentity, MemberKind>,
catalog: MembershipCatalog,
}
impl SeedAuthority {
pub fn new(
catalog: MembershipCatalog,
allowlist: impl IntoIterator<Item = (NodeIdentity, MemberKind)>,
) -> Self {
let mut allow: BTreeMap<NodeIdentity, MemberKind> = allowlist.into_iter().collect();
for member in catalog.members() {
allow
.entry(member.identity().clone())
.or_insert(member.kind());
}
Self {
allowlist: allow,
catalog,
}
}
pub fn cluster_id(&self) -> &ClusterId {
self.catalog.cluster_id()
}
pub fn catalog(&self) -> &MembershipCatalog {
&self.catalog
}
pub fn evaluate_join(&mut self, request: JoinRequest) -> Result<JoinGrant, JoinRejection> {
if &request.target_cluster != self.catalog.cluster_id() {
return Err(JoinRejection::WrongCluster {
expected: self.catalog.cluster_id().clone(),
presented: request.target_cluster,
});
}
let allowed_kind = match self.allowlist.get(&request.identity) {
Some(kind) => *kind,
None => return Err(JoinRejection::UnauthorizedPeer(request.identity)),
};
if allowed_kind != request.kind {
return Err(JoinRejection::KindMismatch {
identity: request.identity,
allowed: allowed_kind,
requested: request.kind,
});
}
let member = ClusterMember::joined_empty(request.identity, request.kind);
let outcome = self.catalog.admit(member);
Ok(JoinGrant {
outcome,
snapshot: self.snapshot(),
})
}
fn snapshot(&self) -> ControlPlaneSnapshot {
ControlPlaneSnapshot {
cluster_id: self.catalog.cluster_id().clone(),
members: self.catalog.members().cloned().collect(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ident(cn: &str) -> NodeIdentity {
NodeIdentity::from_certificate_subject(cn).unwrap()
}
fn cid() -> ClusterId {
ClusterId::new("cluster-prod").unwrap()
}
fn seed_with_pending_node_c() -> SeedAuthority {
let catalog = MembershipCatalog::new(
cid(),
[
ClusterMember::joined_empty(ident("CN=node-a"), MemberKind::Data),
ClusterMember::joined_empty(ident("CN=node-b"), MemberKind::Data),
],
);
SeedAuthority::new(catalog, [(ident("CN=node-c"), MemberKind::Data)])
}
#[test]
fn successful_join_admits_authorized_data_member() {
let mut seed = seed_with_pending_node_c();
let req = JoinRequest::authenticated(cid(), ident("CN=node-c"), MemberKind::Data);
let grant = seed
.evaluate_join(req)
.expect("authorized join should succeed");
assert_eq!(grant.outcome, AdmissionOutcome::Admitted);
assert!(seed.catalog().is_authorized(&ident("CN=node-c")));
assert_eq!(grant.snapshot.cluster_id, cid());
assert_eq!(grant.snapshot.members.len(), 3);
assert!(seed.catalog().assess_baseline().meets_baseline());
}
#[test]
fn joined_data_member_starts_with_no_user_ranges() {
let mut seed = seed_with_pending_node_c();
let req = JoinRequest::authenticated(cid(), ident("CN=node-c"), MemberKind::Data);
seed.evaluate_join(req).unwrap();
let joined = seed.catalog().member(&ident("CN=node-c")).unwrap();
assert!(!joined.holds_user_ranges());
assert_eq!(joined.owned_range_count(), 0);
}
#[test]
fn unauthorized_peer_is_rejected() {
let mut seed = seed_with_pending_node_c();
let req = JoinRequest::authenticated(cid(), ident("CN=node-x"), MemberKind::Data);
let err = seed
.evaluate_join(req)
.expect_err("unknown peer must be rejected");
assert_eq!(err, JoinRejection::UnauthorizedPeer(ident("CN=node-x")));
assert!(!seed.catalog().is_autodetect_eligible(&ident("CN=node-x")));
assert_eq!(seed.catalog().len(), 2);
}
#[test]
fn wrong_cluster_join_is_rejected() {
let mut seed = seed_with_pending_node_c();
let other = ClusterId::new("cluster-staging").unwrap();
let req = JoinRequest::authenticated(other.clone(), ident("CN=node-c"), MemberKind::Data);
let err = seed
.evaluate_join(req)
.expect_err("wrong-cluster join must be rejected");
assert_eq!(
err,
JoinRejection::WrongCluster {
expected: cid(),
presented: other,
}
);
assert!(!seed.catalog().is_authorized(&ident("CN=node-c")));
}
#[test]
fn kind_mismatch_is_rejected() {
let mut seed = seed_with_pending_node_c();
let req = JoinRequest::authenticated(cid(), ident("CN=node-c"), MemberKind::Witness);
let err = seed
.evaluate_join(req)
.expect_err("kind mismatch must be rejected");
assert_eq!(
err,
JoinRejection::KindMismatch {
identity: ident("CN=node-c"),
allowed: MemberKind::Data,
requested: MemberKind::Witness,
}
);
}
#[test]
fn rejoin_is_idempotent() {
let mut seed = seed_with_pending_node_c();
let req = || JoinRequest::authenticated(cid(), ident("CN=node-c"), MemberKind::Data);
let first = seed.evaluate_join(req()).unwrap();
assert_eq!(first.outcome, AdmissionOutcome::Admitted);
let second = seed.evaluate_join(req()).unwrap();
assert_eq!(second.outcome, AdmissionOutcome::AlreadyMember);
assert_eq!(seed.catalog().len(), 3);
}
#[test]
fn autodetect_adopts_only_members_after_join() {
let mut seed = seed_with_pending_node_c();
assert!(!seed.catalog().is_autodetect_eligible(&ident("CN=node-c")));
seed.evaluate_join(JoinRequest::authenticated(
cid(),
ident("CN=node-c"),
MemberKind::Data,
))
.unwrap();
assert!(seed.catalog().is_autodetect_eligible(&ident("CN=node-c")));
assert!(!seed.catalog().is_autodetect_eligible(&ident("CN=stranger")));
}
}