use super::identity::ClusterVoterIdentity;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct ControlPlaneTerm(pub u64);
impl ControlPlaneTerm {
pub const GENESIS: ControlPlaneTerm = ControlPlaneTerm(0);
pub fn next(self) -> ControlPlaneTerm {
ControlPlaneTerm(self.0 + 1)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ControlPlaneIndex(pub u64);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DurableVoteState {
pub term: ControlPlaneTerm,
pub voted_for: Option<ClusterVoterIdentity>,
}
impl DurableVoteState {
pub fn initial() -> Self {
Self {
term: ControlPlaneTerm::GENESIS,
voted_for: None,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OwnershipTransition {
pub range: String,
pub new_owner: ClusterVoterIdentity,
pub ownership_epoch: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MembershipChange {
Admit(ClusterVoterIdentity),
Remove(ClusterVoterIdentity),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlPlaneEntry {
MembershipChange(MembershipChange),
OwnershipTransition(OwnershipTransition),
LeaderConfiguration {
term: ControlPlaneTerm,
leader: ClusterVoterIdentity,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlPlaneError {
NotLeader {
leader: Option<ClusterVoterIdentity>,
},
}
impl std::fmt::Display for ControlPlaneError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NotLeader { leader: Some(l) } => {
write!(f, "not the control-plane leader; current leader is {l}")
}
Self::NotLeader { leader: None } => {
write!(f, "not the control-plane leader; no leader currently known")
}
}
}
}
impl std::error::Error for ControlPlaneError {}
pub trait ControlPlaneConsensus {
fn current_term(&self) -> ControlPlaneTerm;
fn leader(&self) -> Option<ClusterVoterIdentity>;
fn is_leader(&self) -> bool;
fn durable_vote(&self) -> DurableVoteState;
fn commit_index(&self) -> Option<ControlPlaneIndex>;
fn append(&mut self, entry: ControlPlaneEntry) -> Result<ControlPlaneIndex, ControlPlaneError>;
}
#[derive(Debug)]
pub struct SingleNodeControlPlane {
identity: ClusterVoterIdentity,
term: ControlPlaneTerm,
is_leader: bool,
vote: DurableVoteState,
log: Vec<ControlPlaneEntry>,
}
impl SingleNodeControlPlane {
pub fn bootstrap_leader(identity: ClusterVoterIdentity) -> Self {
let term = ControlPlaneTerm::GENESIS.next();
Self {
term,
is_leader: true,
vote: DurableVoteState {
term,
voted_for: Some(identity.clone()),
},
identity,
log: Vec::new(),
}
}
pub fn entries(&self) -> &[ControlPlaneEntry] {
&self.log
}
}
impl ControlPlaneConsensus for SingleNodeControlPlane {
fn current_term(&self) -> ControlPlaneTerm {
self.term
}
fn leader(&self) -> Option<ClusterVoterIdentity> {
self.is_leader.then(|| self.identity.clone())
}
fn is_leader(&self) -> bool {
self.is_leader
}
fn durable_vote(&self) -> DurableVoteState {
self.vote.clone()
}
fn commit_index(&self) -> Option<ControlPlaneIndex> {
self.log
.len()
.checked_sub(1)
.map(|i| ControlPlaneIndex(i as u64))
}
fn append(&mut self, entry: ControlPlaneEntry) -> Result<ControlPlaneIndex, ControlPlaneError> {
if !self.is_leader {
return Err(ControlPlaneError::NotLeader {
leader: self.leader(),
});
}
self.log.push(entry);
Ok(ControlPlaneIndex(self.log.len() as u64 - 1))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn voter(cn: &str) -> ClusterVoterIdentity {
ClusterVoterIdentity::from_certificate_subject(cn).unwrap()
}
#[test]
fn bootstrap_leader_elects_itself_in_term_one() {
let id = voter("CN=node-a");
let cp = SingleNodeControlPlane::bootstrap_leader(id.clone());
assert_eq!(cp.current_term(), ControlPlaneTerm(1));
assert!(cp.is_leader());
assert_eq!(cp.leader(), Some(id.clone()));
assert_eq!(
cp.durable_vote(),
DurableVoteState {
term: ControlPlaneTerm(1),
voted_for: Some(id),
}
);
assert_eq!(cp.commit_index(), None);
}
#[test]
fn leader_appends_ownership_transition_and_commit_index_advances() {
let mut cp = SingleNodeControlPlane::bootstrap_leader(voter("CN=node-a"));
let idx = cp
.append(ControlPlaneEntry::OwnershipTransition(
OwnershipTransition {
range: "users:[0,1000)".to_string(),
new_owner: voter("CN=node-b"),
ownership_epoch: 7,
},
))
.expect("leader may append");
assert_eq!(idx, ControlPlaneIndex(0));
assert_eq!(cp.commit_index(), Some(ControlPlaneIndex(0)));
assert_eq!(cp.entries().len(), 1);
}
#[test]
fn membership_and_leader_config_entries_are_ordered() {
let mut cp = SingleNodeControlPlane::bootstrap_leader(voter("CN=node-a"));
let first = cp
.append(ControlPlaneEntry::LeaderConfiguration {
term: ControlPlaneTerm(1),
leader: voter("CN=node-a"),
})
.unwrap();
let second = cp
.append(ControlPlaneEntry::MembershipChange(
MembershipChange::Admit(voter("CN=node-b")),
))
.unwrap();
assert!(second > first);
assert_eq!(cp.commit_index(), Some(second));
}
#[test]
fn a_follower_may_not_append() {
let mut cp = SingleNodeControlPlane::bootstrap_leader(voter("CN=node-a"));
cp.is_leader = false;
let err = cp
.append(ControlPlaneEntry::MembershipChange(
MembershipChange::Remove(voter("CN=node-b")),
))
.expect_err("a follower must not append");
assert_eq!(err, ControlPlaneError::NotLeader { leader: None });
}
}