/// Identifier of a member, carried as an owned `String`.
///
/// Used for the leader reported by [`ControlPlaneConsensus::leader`] and for the
/// `leader` field of [`ProposeRefusal::NotLeader`].
pub type MemberId = String;
/// Index into the control-plane log, as an unsigned 64-bit integer.
///
/// Returned by [`ControlPlaneConsensus::committed_index`] and by a successful
/// [`ControlPlaneConsensus::propose`].
pub type ControlPlaneLogIndex = u64;
/// Role a node reports through [`ControlPlaneConsensus::role`].
///
/// NOTE(review): the variant names match the usual leader-election roles, but the
/// election protocol itself is not defined in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlPlaneRole {
/// The only role for which [`ControlPlaneConsensus::is_leader`] returns `true`.
Leader,
/// A non-leader role; presumably a node following a leader (confirm against the implementation).
Follower,
/// A non-leader role; presumably a node standing for election (confirm against the implementation).
Candidate,
}
/// Payload-free discriminant of [`ControlPlaneEntry`], as returned by
/// [`ControlPlaneEntry::kind`].
///
/// It is `Copy`, so it can be passed around and compared without touching the
/// entry's payload bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlPlaneEntryKind {
/// Kind reported for [`ControlPlaneEntry::MembershipChange`].
MembershipChange,
/// Kind reported for [`ControlPlaneEntry::OwnershipTransition`].
OwnershipTransition,
}
/// Newtype around the raw bytes carried by a [`ControlPlaneEntry`].
///
/// The bytes are stored exactly as given; this file does not define or validate
/// their encoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ControlPlanePayload(pub Vec<u8>);
impl ControlPlanePayload {
    /// Borrows the wrapped bytes as a slice; nothing is copied.
    #[must_use]
    pub fn as_bytes(&self) -> &[u8] {
        self.0.as_slice()
    }
}
/// An entry that can be handed to [`ControlPlaneConsensus::propose`].
///
/// Every variant wraps exactly one [`ControlPlanePayload`]. Use
/// [`ControlPlaneEntry::kind`] for the payload-free discriminant and
/// [`ControlPlaneEntry::payload`] to reach the bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlPlaneEntry {
/// NOTE(review): presumably describes a change to the member set; the payload
/// format is not visible in this file.
MembershipChange(ControlPlanePayload),
/// NOTE(review): presumably describes a hand-over of ownership; the payload
/// format is not visible in this file.
OwnershipTransition(ControlPlanePayload),
}
impl ControlPlaneEntry {
    /// Returns the [`ControlPlaneEntryKind`] that corresponds to this entry's variant.
    ///
    /// The payload is not inspected.
    #[must_use]
    pub fn kind(&self) -> ControlPlaneEntryKind {
        match *self {
            Self::MembershipChange(..) => ControlPlaneEntryKind::MembershipChange,
            Self::OwnershipTransition(..) => ControlPlaneEntryKind::OwnershipTransition,
        }
    }

    /// Borrows the payload carried by this entry, whichever variant it is.
    #[must_use]
    pub fn payload(&self) -> &ControlPlanePayload {
        // One arm per variant and no wildcard, so adding a variant forces this
        // accessor to be revisited.
        match self {
            Self::MembershipChange(inner) => inner,
            Self::OwnershipTransition(inner) => inner,
        }
    }
}
/// Reason a [`ControlPlaneConsensus::propose`] call was refused.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so a refusal can be
/// logged with `{}` or propagated with `?` into `Box<dyn std::error::Error>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProposeRefusal {
    /// The node that received the proposal is not the leader.
    NotLeader {
        /// Leader hint for the caller, or `None` when no leader is known.
        leader: Option<MemberId>,
    },
    /// The proposal was refused for lack of a quorum.
    ///
    /// NOTE(review): presumably returned by a leader that cannot reach a majority;
    /// confirm against the concrete implementation.
    NoQuorum,
}

impl std::fmt::Display for ProposeRefusal {
    /// Writes a short, single-line, human-readable description of the refusal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NotLeader { leader: Some(id) } => {
                write!(f, "not the leader (leader hint: {})", id)
            }
            Self::NotLeader { leader: None } => f.write_str("not the leader (no leader hint)"),
            Self::NoQuorum => f.write_str("no quorum"),
        }
    }
}

// No `source()` override: a refusal wraps no underlying error.
impl std::error::Error for ProposeRefusal {}
/// Interface to a node's consensus state: read-only status queries plus
/// [`propose`](Self::propose) for submitting new entries.
///
/// Only [`is_leader`](Self::is_leader) has a default body; everything else is
/// supplied by the implementation.
pub trait ControlPlaneConsensus {
    /// Role this node currently reports.
    fn role(&self) -> ControlPlaneRole;

    /// Term number this node currently reports.
    ///
    /// NOTE(review): what advances a term is decided by the implementation.
    fn term(&self) -> u64;

    /// Member this node reports as leader, or `None` if it reports none.
    fn leader(&self) -> Option<MemberId>;

    /// Log index this node reports as committed.
    fn committed_index(&self) -> ControlPlaneLogIndex;

    /// Returns `true` exactly when [`role`](Self::role) is [`ControlPlaneRole::Leader`].
    fn is_leader(&self) -> bool {
        matches!(self.role(), ControlPlaneRole::Leader)
    }

    /// Submits `entry` to the node.
    ///
    /// On success returns a [`ControlPlaneLogIndex`] (presumably the index assigned
    /// to the entry — confirm against the implementation).
    ///
    /// # Errors
    ///
    /// Returns a [`ProposeRefusal`] when the node declines the proposal.
    fn propose(
        &mut self,
        entry: ControlPlaneEntry,
    ) -> Result<ControlPlaneLogIndex, ProposeRefusal>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes carried by every entry built in these tests.
    const SAMPLE_BYTES: [u8; 3] = [1, 2, 3];

    /// Builds the small fixed payload shared by all tests.
    fn payload() -> ControlPlanePayload {
        ControlPlanePayload(SAMPLE_BYTES.to_vec())
    }

    #[test]
    fn entry_kind_maps_each_variant() {
        let membership = ControlPlaneEntry::MembershipChange(payload());
        let ownership = ControlPlaneEntry::OwnershipTransition(payload());

        assert_eq!(membership.kind(), ControlPlaneEntryKind::MembershipChange);
        assert_eq!(ownership.kind(), ControlPlaneEntryKind::OwnershipTransition);
    }

    #[test]
    fn entry_payload_is_accessible_for_both_kinds() {
        let entries = [
            ControlPlaneEntry::MembershipChange(payload()),
            ControlPlaneEntry::OwnershipTransition(payload()),
        ];

        for entry in &entries {
            assert_eq!(entry.payload().as_bytes(), &SAMPLE_BYTES[..]);
        }
    }

    #[test]
    fn entry_set_is_closed_over_control_plane_only() {
        // The match has no wildcard arm, so this test stops compiling as soon as
        // a new entry kind is added without being reviewed here.
        fn assert_known(kind: ControlPlaneEntryKind) {
            match kind {
                ControlPlaneEntryKind::MembershipChange => {}
                ControlPlaneEntryKind::OwnershipTransition => {}
            }
        }

        assert_known(ControlPlaneEntryKind::MembershipChange);
        assert_known(ControlPlaneEntryKind::OwnershipTransition);
    }

    /// In-memory stand-in for a consensus node, configured field by field.
    struct FakeConsensus {
        role: ControlPlaneRole,
        term: u64,
        leader: Option<MemberId>,
        committed: ControlPlaneLogIndex,
        next_index: ControlPlaneLogIndex,
        has_quorum: bool,
    }

    impl ControlPlaneConsensus for FakeConsensus {
        fn role(&self) -> ControlPlaneRole {
            self.role
        }

        fn term(&self) -> u64 {
            self.term
        }

        fn leader(&self) -> Option<MemberId> {
            self.leader.clone()
        }

        fn committed_index(&self) -> ControlPlaneLogIndex {
            self.committed
        }

        /// Refuses unless this fake is a leader with quorum; otherwise hands out
        /// `next_index` and advances it by one.
        fn propose(
            &mut self,
            _entry: ControlPlaneEntry,
        ) -> Result<ControlPlaneLogIndex, ProposeRefusal> {
            match (self.role, self.has_quorum) {
                (ControlPlaneRole::Follower, _) | (ControlPlaneRole::Candidate, _) => {
                    Err(ProposeRefusal::NotLeader {
                        leader: self.leader.clone(),
                    })
                }
                (ControlPlaneRole::Leader, false) => Err(ProposeRefusal::NoQuorum),
                (ControlPlaneRole::Leader, true) => {
                    let assigned = self.next_index;
                    self.next_index += 1;
                    Ok(assigned)
                }
            }
        }
    }

    /// Builds a fake leader in term 9 whose next log index is 11.
    fn fake_leader(has_quorum: bool) -> FakeConsensus {
        FakeConsensus {
            role: ControlPlaneRole::Leader,
            term: 9,
            leader: Some("self".to_string()),
            committed: 10,
            next_index: 11,
            has_quorum,
        }
    }

    #[test]
    fn follower_propose_is_refused_with_leader_hint() {
        let hint = Some("n1".to_string());
        let mut follower = FakeConsensus {
            role: ControlPlaneRole::Follower,
            term: 7,
            leader: hint.clone(),
            committed: 42,
            next_index: 43,
            has_quorum: true,
        };

        assert!(!follower.is_leader());
        assert_eq!(follower.term(), 7);
        assert_eq!(follower.committed_index(), 42);

        let outcome = follower.propose(ControlPlaneEntry::OwnershipTransition(payload()));
        assert_eq!(outcome, Err(ProposeRefusal::NotLeader { leader: hint }));
    }

    #[test]
    fn leader_propose_assigns_increasing_indexes() {
        let mut leader = fake_leader(true);
        assert!(leader.is_leader());

        let first = leader.propose(ControlPlaneEntry::OwnershipTransition(payload()));
        let second = leader.propose(ControlPlaneEntry::MembershipChange(payload()));

        assert_eq!(first, Ok(11));
        assert_eq!(second, Ok(12));
    }

    #[test]
    fn leader_without_quorum_self_fences() {
        let mut fenced = fake_leader(false);

        let outcome = fenced.propose(ControlPlaneEntry::MembershipChange(payload()));

        assert_eq!(outcome, Err(ProposeRefusal::NoQuorum));
    }
}