use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;
use openraft::impls::OneshotResponder;
use openraft::{BasicNode, Raft, RaftMetrics, RaftTypeConfig};
use tracing::info;
use crate::config::ConsensusConfig;
use crate::error::{ConsensusError, Result};
use crate::types::NodeId;
/// Handle to a local Raft node: an openraft [`Raft`] instance bundled with the
/// identifier and address this node was created with.
///
/// The type configuration `C` is constrained to use the crate's [`NodeId`],
/// [`BasicNode`] node metadata and the [`OneshotResponder`] responder.
pub struct ConsensusNode<C>
where
C: RaftTypeConfig<NodeId = NodeId, Node = BasicNode, Responder = OneshotResponder<C>>,
{
/// Underlying openraft handle that the node's operations are forwarded to.
raft: Raft<C>,
/// Identifier of this node.
node_id: NodeId,
/// Address stored for this node; used as the `BasicNode::addr` when bootstrapping.
address: String,
}
impl<C> ConsensusNode<C>
where
    C: RaftTypeConfig<NodeId = NodeId, Node = BasicNode, Responder = OneshotResponder<C>>,
{
    /// Wraps an already-constructed [`Raft`] handle together with this node's
    /// identifier and address.
    #[must_use]
    pub fn from_raft(raft: Raft<C>, node_id: NodeId, address: String) -> Self {
        Self {
            raft,
            node_id,
            address,
        }
    }

    /// Returns this node's identifier.
    #[must_use]
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Returns the address this node was created with.
    #[must_use]
    pub fn address(&self) -> &str {
        &self.address
    }

    /// Borrows the underlying [`Raft`] handle.
    #[must_use]
    pub fn raft(&self) -> &Raft<C> {
        &self.raft
    }

    /// Returns a clone of the underlying [`Raft`] handle.
    #[must_use]
    pub fn raft_clone(&self) -> Raft<C> {
        self.raft.clone()
    }

    /// Submits `request` through `Raft::client_write` and returns the `data`
    /// field of the write response.
    ///
    /// # Errors
    ///
    /// Returns [`ConsensusError::Write`] carrying the stringified openraft error
    /// when the write fails.
    pub async fn propose(&self, request: C::D) -> Result<C::R> {
        let response = self
            .raft
            .client_write(request)
            .await
            .map_err(|e| ConsensusError::Write(e.to_string()))?;
        Ok(response.data)
    }

    /// Runs `Raft::ensure_linearizable` and discards its success value.
    ///
    /// # Errors
    ///
    /// Returns [`ConsensusError::Write`] with a `linearizable check failed: ...`
    /// message when the check fails.
    pub async fn ensure_linearizable(&self) -> Result<()> {
        self.raft
            .ensure_linearizable()
            .await
            .map_err(|e| ConsensusError::Write(format!("linearizable check failed: {e}")))?;
        Ok(())
    }

    /// Returns `true` when the current metrics report this node as the leader.
    #[must_use]
    pub fn is_leader(&self) -> bool {
        // Reuse `leader_id` so only the leader field is read, instead of
        // cloning the whole metrics snapshot for a single comparison.
        self.leader_id() == Some(self.node_id)
    }

    /// Returns the leader recorded in the current metrics, if any.
    #[must_use]
    pub fn leader_id(&self) -> Option<NodeId> {
        self.raft.metrics().borrow().current_leader
    }

    /// Returns the voter ids of the membership recorded in the current metrics.
    #[must_use]
    pub fn voter_ids(&self) -> BTreeSet<NodeId> {
        // Collect straight from the borrowed metrics; the borrow ends with
        // this expression and no full `RaftMetrics` clone is needed.
        self.raft
            .metrics()
            .borrow()
            .membership_config
            .membership()
            .voter_ids()
            .collect()
    }

    /// Returns the number of distinct voter ids (see [`Self::voter_ids`]).
    #[must_use]
    pub fn voter_count(&self) -> usize {
        self.voter_ids().len()
    }

    /// Returns the learner ids of the membership recorded in the current metrics.
    #[must_use]
    pub fn learner_ids(&self) -> BTreeSet<NodeId> {
        self.raft
            .metrics()
            .borrow()
            .membership_config
            .membership()
            .learner_ids()
            .collect()
    }

    /// Returns the union of voter and learner ids, taken from a single read of
    /// the current metrics.
    #[must_use]
    pub fn all_member_ids(&self) -> BTreeSet<NodeId> {
        let metrics_rx = self.raft.metrics();
        let metrics = metrics_rx.borrow();
        let membership = metrics.membership_config.membership();

        let mut ids: BTreeSet<NodeId> = membership.voter_ids().collect();
        ids.extend(membership.learner_ids());
        ids
    }

    /// Initializes the cluster with a membership containing only this node,
    /// registered under its own address.
    ///
    /// # Errors
    ///
    /// Returns [`ConsensusError::Init`] with a `bootstrap failed: ...` message
    /// when `Raft::initialize` fails.
    pub async fn bootstrap(&self) -> Result<()> {
        let members = BTreeMap::from([(
            self.node_id,
            BasicNode {
                addr: self.address.clone(),
            },
        )]);
        self.raft
            .initialize(members)
            .await
            .map_err(|e| ConsensusError::Init(format!("bootstrap failed: {e}")))?;
        info!(node_id = self.node_id, "Bootstrapped single-node cluster");
        Ok(())
    }

    /// Registers `node_id` at `address` as a learner via `Raft::add_learner`.
    ///
    /// `blocking` is forwarded unchanged; per the openraft documentation,
    /// `true` makes the call wait until the learner has caught up with the log.
    ///
    /// # Errors
    ///
    /// Returns [`ConsensusError::Membership`] when openraft rejects or fails
    /// the request.
    pub async fn add_learner(
        &self,
        node_id: NodeId,
        address: String,
        blocking: bool,
    ) -> Result<()> {
        let node = BasicNode { addr: address };
        self.raft
            .add_learner(node_id, node, blocking)
            .await
            .map_err(|e| {
                ConsensusError::Membership(format!("add_learner({node_id}) failed: {e}"))
            })?;
        info!(node_id, "Added learner");
        Ok(())
    }

    /// Replaces the voter set with `voter_ids` via `Raft::change_membership`.
    ///
    /// `retain` is forwarded unchanged; per the openraft documentation, `true`
    /// keeps voters that were removed from the set as learners.
    ///
    /// # Errors
    ///
    /// Returns [`ConsensusError::Membership`] with a
    /// `change_membership failed: ...` message when the change fails.
    pub async fn change_membership(&self, voter_ids: BTreeSet<NodeId>, retain: bool) -> Result<()> {
        self.raft
            .change_membership(voter_ids, retain)
            .await
            .map_err(|e| ConsensusError::Membership(format!("change_membership failed: {e}")))?;
        info!("Membership change committed");
        Ok(())
    }

    /// Adds `node_id` as a blocking learner and then promotes it to voter.
    ///
    /// The new voter set is the last entry of the current joint config (empty
    /// if there is none) plus this node and `node_id`.
    ///
    /// # Errors
    ///
    /// Returns [`ConsensusError::Membership`] if either the learner addition or
    /// the membership change fails.
    pub async fn add_voter(&self, node_id: NodeId, address: String) -> Result<()> {
        self.add_learner(node_id, address, true).await?;

        // Copy only the voter set that is needed. The watch borrow is a
        // temporary of this statement, so it is released before the next
        // `.await`.
        let mut voter_ids: BTreeSet<NodeId> = self
            .raft
            .metrics()
            .borrow()
            .membership_config
            .membership()
            .get_joint_config()
            .last()
            .cloned()
            .unwrap_or_default();
        voter_ids.insert(self.node_id);
        voter_ids.insert(node_id);

        self.change_membership(voter_ids, false).await?;
        info!(node_id, "Promoted learner to voter");
        Ok(())
    }

    /// Returns a clone of the current metrics snapshot.
    #[must_use]
    pub fn metrics(&self) -> RaftMetrics<NodeId, BasicNode> {
        self.raft.metrics().borrow().clone()
    }

    /// Shuts the Raft node down via `Raft::shutdown`.
    ///
    /// # Errors
    ///
    /// Returns [`ConsensusError::Fatal`] with a `shutdown failed: ...` message
    /// when the shutdown call fails.
    pub async fn shutdown(&self) -> Result<()> {
        self.raft
            .shutdown()
            .await
            .map_err(|e| ConsensusError::Fatal(format!("shutdown failed: {e}")))?;
        info!(node_id = self.node_id, "Raft node shut down");
        Ok(())
    }
}
/// Builder that collects the identity and configuration needed to create a
/// [`ConsensusNode`].
pub struct ConsensusNodeBuilder {
/// Identifier the built node will use.
node_id: NodeId,
/// Address the built node will carry.
address: String,
/// Consensus settings; `ConsensusConfig::default()` unless replaced through `with_config`.
config: ConsensusConfig,
}
impl ConsensusNodeBuilder {
#[must_use]
pub fn new(node_id: NodeId, address: String) -> Self {
Self {
node_id,
address,
config: ConsensusConfig::default(),
}
}
#[must_use]
pub fn with_config(mut self, config: ConsensusConfig) -> Self {
self.config = config;
self
}
pub async fn build_with<C, LS, SM, N>(
self,
log_store: LS,
state_machine: SM,
network: N,
) -> Result<ConsensusNode<C>>
where
C: RaftTypeConfig<NodeId = NodeId, Node = BasicNode, Responder = OneshotResponder<C>>,
LS: openraft::storage::RaftLogStorage<C>,
SM: openraft::storage::RaftStateMachine<C>,
N: openraft::network::RaftNetworkFactory<C>,
{
let raft_config = Arc::new(self.config.to_openraft_config()?);
let raft = Raft::new(self.node_id, raft_config, network, log_store, state_machine)
.await
.map_err(|e| ConsensusError::Fatal(format!("Failed to create Raft: {e}")))?;
info!(
node_id = self.node_id,
address = %self.address,
"Created ConsensusNode"
);
Ok(ConsensusNode {
raft,
node_id: self.node_id,
address: self.address,
})
}
}