use std::fmt::Display;
use std::time::Duration;
use std::error::Error;
use std::fmt;
use rxqlite_common::MessageResponse;
use serde::{Serialize, Deserialize};
use std::collections::{
btree_map::BTreeMap,
btree_set::BTreeSet,
};
pub type NodeId = u64;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default)]
pub struct Node {
pub rpc_addr: String,
pub api_addr: String,
}
impl Display for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Node {{ rpc_addr: {}, api_addr: {} }}",
self.rpc_addr, self.api_addr
)
}
}
pub trait TryAsRef<T> {
fn try_as_ref(&self) -> Option<&T>;
}
#[derive(Debug, Clone,Copy, PartialEq, Eq)]
#[derive(serde::Deserialize, serde::Serialize)]
pub enum RPCTypes {
Vote,
AppendEntries,
InstallSnapshot,
}
impl fmt::Display for RPCTypes {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[error("infallible")]
pub enum Infallible {}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(bound = "")]
#[error("timeout after {timeout:?} when {action} {id}->{target}")]
pub struct Timeout {
pub action: RPCTypes,
pub id: NodeId,
pub target: NodeId,
pub timeout: Duration,
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[error("error occur on remote peer {target}: {source}")]
pub struct RemoteError<T: Error> {
#[serde(bound = "")]
pub target: NodeId,
#[serde(bound = "")]
pub target_node: Option<Node>,
pub source: T,
}
impl<T: Error> RemoteError<T> {
pub fn new(target: NodeId, e: T) -> Self {
Self {
target,
target_node: None,
source: e,
}
}
pub fn new_with_node(target: NodeId, node: Node, e: T) -> Self {
Self {
target,
target_node: Some(node),
source: e,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(bound(serialize = "E: serde::Serialize"))]
#[serde(bound(deserialize = "E: for <'d> serde::Deserialize<'d>"))]
pub enum RPCError<E: Error> {
Timeout(#[from] Timeout),
Unreachable,
PayloadTooLarge,
Network,
#[error(transparent)]
RemoteError(#[from] RemoteError<E>),
}
impl<E: Error> Display for RPCError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Timeout(to)=>write!(f, "Timeout: {}",to),
Self::Unreachable=>write!(f, "Unreachable"),
Self::PayloadTooLarge=>write!(f, "PayloadTooLarge"),
Self::Network=>write!(f, "Network"),
Self::RemoteError(err)=>write!(f, "RemoteError: {}",err),
}
}
}
impl<E> RPCError<RaftError<E>>
where
E: Error,
{
pub fn forward_to_leader(&self) -> Option<&ForwardToLeader>
where E: TryAsRef<ForwardToLeader> {
match self {
RPCError::Timeout(_) => None,
RPCError::Unreachable => None,
RPCError::PayloadTooLarge => None,
RPCError::Network => None,
RPCError::RemoteError(remote_err) => remote_err.source.forward_to_leader(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
pub enum RaftError<E :Error = Infallible>
{
#[error(transparent)]
APIError(E),
#[serde(bound = "")]
#[error(transparent)]
Fatal(Fatal),
}
impl<E :Error> RaftError<E> {
pub fn forward_to_leader(&self) -> Option<&ForwardToLeader>
where E: TryAsRef<ForwardToLeader>,
{
match self {
RaftError::APIError(api_err) => api_err.try_as_ref(),
RaftError::Fatal(_) => None,
}
}
pub fn into_forward_to_leader(self) -> Option<ForwardToLeader>
where E: TryInto<ForwardToLeader>,
{
match self {
RaftError::APIError(api_err) => api_err.try_into().ok(),
RaftError::Fatal(_) => None,
}
}
}
#[derive(serde::Deserialize, serde::Serialize)]
pub struct ClientWriteResponse<Resp = MessageResponse>
{
pub log_id: LogId,
pub data: Option<Resp>,
pub membership: Option<Membership>,
}
#[derive(Debug, Clone,Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq , PartialOrd, Ord)]
pub struct LeaderId {
pub term: u64,
pub node_id: NodeId,
}
pub type CommittedLeaderId = LeaderId;
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct LogId {
pub leader_id: CommittedLeaderId,
pub index: u64,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct Vote {
pub leader_id: LeaderId,
pub committed: bool,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum ServerState {
Learner,
Follower,
Candidate,
Leader,
Shutdown,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct Membership
{
pub configs: Vec<BTreeSet<NodeId>>,
pub nodes: BTreeMap<NodeId, Node>,
}
impl Membership {
pub fn is_voter(&self, node_id: &NodeId) -> bool {
for c in self.configs.iter() {
if c.contains(node_id) {
return true;
}
}
false
}
pub fn voter_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
self.nodes.keys().filter(|x| self.is_voter(x)).copied()
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct StoredMembership
{
pub log_id: Option<LogId>,
pub membership: Membership,
}
impl StoredMembership {
pub fn voter_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
self.membership.voter_ids()
}
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(bound = "")]
pub enum Fatal
{
#[error("storage error")]
StorageError,
#[error("panicked")]
Panicked,
#[error("raft stopped")]
Stopped,
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(bound = "")]
#[error("has to forward request to: {leader_id:?}, {leader_node:?}")]
pub struct ForwardToLeader {
pub leader_id: Option<NodeId>,
pub leader_node: Option<Node>,
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(bound = "")]
#[error("the cluster is already undergoing a configuration change at log {membership_log_id:?}, last committed membership log id: {committed:?}")]
pub struct InProgress {
pub committed: Option<LogId>,
pub membership_log_id: Option<LogId>,
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[error("new membership can not be empty")]
pub struct EmptyMembership {}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(bound = "")]
#[error("Learner {node_id} not found: add it as learner before adding it as a voter")]
pub struct LearnerNotFound{
pub node_id: NodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(bound = "")]
pub enum ChangeMembershipError {
#[error(transparent)]
InProgress(#[from] InProgress),
#[error(transparent)]
EmptyMembership(#[from] EmptyMembership),
#[error(transparent)]
LearnerNotFound(#[from] LearnerNotFound),
}
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
#[derive(PartialEq, Eq)]
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(bound = "")]
pub enum ClientWriteError
{
#[error(transparent)]
ForwardToLeader(#[from] ForwardToLeader),
#[error(transparent)]
ChangeMembershipError(#[from] ChangeMembershipError),
}
impl TryAsRef<ForwardToLeader> for ClientWriteError
{
fn try_as_ref(&self) -> Option<&ForwardToLeader> {
match self {
Self::ForwardToLeader(f) => Some(f),
_ => None,
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RaftMetrics {
pub running_state: Result<(), Fatal>,
pub id: NodeId,
pub current_term: u64,
pub vote: Vote,
pub last_log_index: Option<u64>,
pub last_applied: Option<LogId>,
pub snapshot: Option<LogId>,
pub purged: Option<LogId>,
pub state: ServerState,
pub current_leader: Option<NodeId>,
pub millis_since_quorum_ack: Option<u64>,
pub membership_config: StoredMembership,
}
#[derive(Serialize, Deserialize)]
pub enum NotificationRequest {
Register,
Unregister,
}
#[derive(Serialize, Deserialize)]
pub enum NotificationEvent {
Notification(rxqlite_notification::Notification),
}
pub type RXQLiteError = anyhow::Error;