#![allow(dead_code)]
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::config::{ClusterConfig, ClusterMember, ClusterMode};
use crate::resources::ChangeLog;
/// Numeric identifier of a cluster node, as used throughout the `Consensus` trait and its
/// supporting types in this module.
pub type NodeId = u64;
/// Result payload of a consensus operation; this is the success type returned by
/// `Consensus::propose`.
///
/// The derived `Default` value is `success: false` with no message.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ConsensusResponse {
/// `true` for responses built with `ok` / `ok_with_message`, `false` for `error`.
pub success: bool,
/// Optional free-form text; `None` for `ok()`, `Some(..)` for `ok_with_message` and `error`.
pub message: Option<String>,
}
impl ConsensusResponse {
    /// Creates a successful response that carries no message.
    pub fn ok() -> Self {
        let message = None;
        Self {
            success: true,
            message,
        }
    }

    /// Creates a successful response carrying `msg` as its message.
    pub fn ok_with_message(msg: impl Into<String>) -> Self {
        let message = Some(msg.into());
        Self {
            success: true,
            message,
        }
    }

    /// Creates a failed response (`success == false`) carrying `msg` as its message.
    pub fn error(msg: impl Into<String>) -> Self {
        let message = Some(msg.into());
        Self {
            success: false,
            message,
        }
    }
}
/// State reported for a node (see `ConsensusMetrics::state`).
///
/// Serde (de)serializes each variant under its lowercase name, which matches the text produced
/// by the `Display` impl. `Follower` is the `Default` value.
///
/// NOTE(review): which variants a given `ClusterMode` actually uses is not visible in this
/// file — confirm against the concrete `Consensus` implementations.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeState {
    /// Serialized / displayed as `"leader"`.
    Leader,
    /// Serialized / displayed as `"follower"`; the default state.
    #[default]
    Follower,
    /// Serialized / displayed as `"candidate"`.
    Candidate,
    /// Serialized / displayed as `"learner"`.
    Learner,
    /// Serialized / displayed as `"shutdown"`.
    Shutdown,
    /// Serialized / displayed as `"active"`.
    Active,
}
impl std::fmt::Display for NodeState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NodeState::Leader => write!(f, "leader"),
NodeState::Follower => write!(f, "follower"),
NodeState::Candidate => write!(f, "candidate"),
NodeState::Learner => write!(f, "learner"),
NodeState::Shutdown => write!(f, "shutdown"),
NodeState::Active => write!(f, "active"),
}
}
}
/// Status/metrics snapshot returned by `Consensus::metrics`.
///
/// Only `Serialize` is derived, so serde can write this type but not read it back.
///
/// NOTE(review): the per-field meanings below are inferred from field names and types;
/// confirm them against the concrete `Consensus` implementations.
#[derive(Debug, Clone, Serialize)]
pub struct ConsensusMetrics {
/// Node identifier — presumably the node reporting these metrics.
pub id: NodeId,
/// Cluster mode reported alongside the metrics.
pub mode: ClusterMode,
/// Write-permission flag — presumably mirrors `Consensus::can_write`; confirm.
pub can_write: bool,
/// Id of the leader, if one is reported.
pub leader_id: Option<NodeId>,
/// Reported state of the node.
pub state: NodeState,
/// Term number, when available — presumably only meaningful for term-based protocols.
pub current_term: Option<u64>,
/// Last-applied index, when available — TODO confirm exact semantics.
pub last_applied: Option<u64>,
/// Committed index, when available — TODO confirm exact semantics.
pub committed: Option<u64>,
/// Cluster members included in the snapshot.
pub members: Vec<ClusterMember>,
/// Arbitrary additional JSON data; omitted from the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<serde_json::Value>,
}
/// Interface implemented by consensus backends.
///
/// Implementors must be `Send + Sync`; `ConsensusFactory` hands them out as
/// `Arc<dyn Consensus>`. Async methods are provided through `#[async_trait]`.
///
/// NOTE(review): this file only declares signatures. The behavioural notes on each method are
/// inferred from names and types and must be confirmed against the implementations.
#[async_trait]
pub trait Consensus: Send + Sync {
/// Returns the `NodeId` of this node.
fn node_id(&self) -> NodeId;
/// Returns the `ClusterMode` associated with this backend.
fn mode(&self) -> ClusterMode;
/// Performs backend start-up; what that involves is implementation-defined.
async fn initialize(&self) -> anyhow::Result<()>;
/// Reports whether writes are currently allowed on this node — presumably tied to
/// leadership in leader-based modes; confirm per implementation.
async fn can_write(&self) -> bool;
/// Returns the id of the current leader, or `None` when the implementation reports none.
async fn leader_id(&self) -> Option<NodeId>;
/// Submits `changelog` to the consensus layer and returns the resulting `ConsensusResponse`.
async fn propose(&self, changelog: ChangeLog) -> anyhow::Result<ConsensusResponse>;
/// Returns a `ConsensusMetrics` snapshot.
async fn metrics(&self) -> ConsensusMetrics;
/// Requests that the node `node_id` at `addr` be added.
/// NOTE(review): the expected format of `addr` is not visible here.
async fn add_node(&self, node_id: NodeId, addr: String) -> anyhow::Result<()>;
/// Requests that the node `node_id` be removed.
async fn remove_node(&self, node_id: NodeId) -> anyhow::Result<()>;
/// Returns the cluster members known to this backend.
async fn members(&self) -> Vec<ClusterMember>;
/// Shuts the backend down; what that involves is implementation-defined.
async fn shutdown(&self) -> anyhow::Result<()>;
}
/// Shareable (`Arc`, `Send + Sync`) factory closure that takes a `ClusterConfig` and a shared
/// `ProxyStore` and returns a `Consensus` trait object, or an error if construction fails.
pub type ConsensusFactory = Arc<
dyn Fn(ClusterConfig, Arc<crate::storage::ProxyStore>) -> anyhow::Result<Arc<dyn Consensus>>
+ Send
+ Sync,
>;
/// Typed errors for the consensus layer; `Display` text comes from the `#[error]` attributes.
///
/// The `Consensus` trait methods declared in this file return `anyhow::Result`, so these
/// values are not part of their signatures directly.
#[derive(Debug, thiserror::Error)]
pub enum ConsensusError {
/// This node is not the leader; the payload is the current leader's id, if any.
#[error("Not leader, current leader is: {0:?}")]
NotLeader(Option<NodeId>),
/// Consensus-protocol failure, described by the contained message.
#[error("Consensus error: {0}")]
Protocol(String),
/// Network failure, described by the contained message.
#[error("Network error: {0}")]
Network(String),
/// Storage failure, described by the contained message.
#[error("Storage error: {0}")]
Storage(String),
/// Configuration problem, described by the contained message.
#[error("Configuration error: {0}")]
Config(String),
/// The node with the contained id was not found.
#[error("Node not found: {0}")]
NodeNotFound(NodeId),
/// A timeout occurred, described by the contained message.
#[error("Timeout: {0}")]
Timeout(String),
}