use std::collections::BTreeSet;
use std::sync::Arc;
use async_trait::async_trait;
use crate::consensus::{MeshShape, Reason, RoleAssignment};
use crate::membership::{MembershipView, NodeRole};
use crate::policy::{Policy, TargetTopology};
use crate::topology::{NodeId as TopologyNodeId, Role, TopologyReactor, Transition};
use crate::NodeId;
pub struct FormationPolicy {
reactor: Arc<TopologyReactor>,
}
impl FormationPolicy {
#[must_use]
pub fn new(reactor: Arc<TopologyReactor>) -> Self {
Self { reactor }
}
fn to_topology_id(id: &NodeId) -> TopologyNodeId {
TopologyNodeId::new(id.to_hex())
}
fn from_topology_id(tid: &TopologyNodeId) -> Option<NodeId> {
NodeId::from_hex(&tid.0).ok()
}
fn master_role_set() -> BTreeSet<NodeRole> {
let mut s = BTreeSet::new();
s.insert(NodeRole::ApiServer);
s.insert(NodeRole::Etcd);
s.insert(NodeRole::Scheduler);
s.insert(NodeRole::ControllerManager);
s
}
fn current_roles(consensus: &MeshShape, node: &NodeId) -> BTreeSet<NodeRole> {
consensus
.assignments
.get(node)
.cloned()
.unwrap_or_default()
}
}
#[async_trait]
impl Policy for FormationPolicy {
fn name(&self) -> &'static str {
"formation"
}
async fn evaluate(
&self,
membership: &MembershipView,
consensus: &MeshShape,
_target: &TargetTopology,
) -> Vec<RoleAssignment> {
let alive: BTreeSet<NodeId> = membership.members.iter().map(|m| m.node_id).collect();
let known_in_consensus: BTreeSet<NodeId> =
consensus.assignments.keys().copied().collect();
let eligible_topo: Vec<TopologyNodeId> =
alive.iter().map(Self::to_topology_id).collect();
let failed_topo: Vec<TopologyNodeId> = known_in_consensus
.iter()
.filter(|n| !alive.contains(n))
.map(Self::to_topology_id)
.collect();
let transitions = self.reactor.observe_membership(&eligible_topo, &failed_topo);
let mut proposals = Vec::new();
for t in transitions {
match t {
Transition::Admit(_) => {
}
Transition::Promote(tid, role) => {
let Some(node) = Self::from_topology_id(&tid) else { continue };
match role {
Role::Master | Role::Bootstrap => {
proposals.push(RoleAssignment::Promote {
node_id: node,
roles: Self::master_role_set(),
reason: Reason::ReplacingFailed,
});
}
Role::Worker | Role::Observer => {
}
}
}
Transition::Demote(tid) => {
let Some(node) = Self::from_topology_id(&tid) else { continue };
let current = Self::current_roles(consensus, &node);
if current.is_empty() {
continue;
}
proposals.push(RoleAssignment::Demote {
node_id: node,
roles_relinquished: current,
reason: Reason::Operator,
});
}
Transition::Reassign(tid, new_role) => {
let Some(node) = Self::from_topology_id(&tid) else { continue };
let current = Self::current_roles(consensus, &node);
match new_role {
Role::Master | Role::Bootstrap => {
proposals.push(RoleAssignment::Promote {
node_id: node,
roles: Self::master_role_set(),
reason: Reason::Rebalance,
});
}
Role::Worker | Role::Observer => {
if !current.is_empty() {
proposals.push(RoleAssignment::Demote {
node_id: node,
roles_relinquished: current,
reason: Reason::Rebalance,
});
}
}
}
}
Transition::Evict(tid) => {
let Some(node) = Self::from_topology_id(&tid) else { continue };
let current = Self::current_roles(consensus, &node);
if current.is_empty() {
continue;
}
proposals.push(RoleAssignment::Demote {
node_id: node,
roles_relinquished: current,
reason: Reason::ReplacingFailed,
});
}
}
}
proposals
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn id_translation_round_trips() {
let original = NodeId::from_hex("ab").unwrap();
let tid = FormationPolicy::to_topology_id(&original);
let back = FormationPolicy::from_topology_id(&tid).unwrap();
assert_eq!(original, back);
}
#[test]
fn master_role_set_contains_all_four_control_plane_roles() {
let s = FormationPolicy::master_role_set();
assert_eq!(s.len(), 4);
assert!(s.contains(&NodeRole::ApiServer));
assert!(s.contains(&NodeRole::Etcd));
assert!(s.contains(&NodeRole::Scheduler));
assert!(s.contains(&NodeRole::ControllerManager));
}
#[test]
fn policy_name_is_stable() {
let reactor = Arc::new(TopologyReactor::new(Box::new(crate::topology::Pair)));
let p = FormationPolicy::new(reactor);
assert_eq!(p.name(), "formation");
}
#[test]
fn current_roles_returns_empty_for_unknown_node() {
let consensus = MeshShape::default();
let node = NodeId::from_hex("ff").unwrap();
assert!(FormationPolicy::current_roles(&consensus, &node).is_empty());
}
}