1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
//! Consensus, or agreeing on some value, is a fundamental problem in distributed systems.
//! While algorithms like Paxos have existed for a long time, their complexity makes them
//! difficult to implement.
//!
//! So Raft was designed to solve the problem while keeping the algorithm understandable.
//!
//! Raft tackles the problem in two steps:
//! * Leader Election - elect a node as the leader on startup or when the existing one fails
//! * Log Replication - maintain log consistency across nodes
//!
//! **This crate handles Leader election provided a list of nodes.**
//!
//! For more on Raft, see [https://raft.github.io](https://raft.github.io).
//!
//! ## Usage
//! *almost-raft* runs as a closed loop; the only way to communicate with it is through an mpsc
//! channel and control messages.
//!
//! The first step is to implement `trait Node`.
//! For example, here is a simple node that uses an mpsc channel to communicate with others:
//! ```ignore
//! use tokio::sync::mpsc::Sender;
//! use almost_raft::{Message, Node};
//! #[derive(Debug, Clone)]
//! struct NodeMPSC {
//!     id: String,
//!     sender: Sender<Message<NodeMPSC>>,
//! }
//!
//! #[async_trait]
//! impl Node for NodeMPSC {
//!     type NodeType = NodeMPSC;
//!     async fn send_message(&self, msg: Message<Self::NodeType>) {
//!         self.sender.send(msg).await;
//!     }
//!
//!     fn node_id(&self) -> &String {
//!         &self.id
//!     }
//! }
//! ```
//! To initialize [RaftElectionState](crate::election::RaftElectionState):
//! ```ignore
//! use tokio::sync::mpsc;
//! use almost_raft::election::RaftElectionState;
//! let (heartbeat_interval, message_timeout, timeout, max_node, min_node) =
//!     (1000, 20, 5000, 5, 3);
//! let (tx, mut from_raft) = mpsc::channel(10);
//! let self_id = uuid::Uuid::new_v4().to_string();
//! let nodes = vec![]; // we'll add node later
//! let (state, tx_to_raft) = RaftElectionState::init(
//!     self_id,
//!     timeout,
//!     heartbeat_interval,
//!     message_timeout,
//!     nodes,
//!     tx.clone(),
//!     max_node,
//!     min_node,
//! );
//! ```
//!
//! Now we can start the election process using the `state`. This will not necessarily start an
//! election right away; it waits until there are enough nodes (`min_node`).
//!
//! ```ignore
//! use almost_raft::election::raft_election;
//! tokio::spawn(raft_election(state));
//! ```
//!
//! Let's add nodes
//! ```ignore
//! use tokio::sync::mpsc;
//! use almost_raft::Message;
//! let (tx,rx) = mpsc::channel(10);
//! tx_to_raft
//!     .send(Message::ControlAddNode(NodeMPSC {
//!         id: uuid::Uuid::new_v4().to_string(),
//!         sender: tx,
//!     }))
//!     .await;
//! ```
//!
//! Raft will notify through the mpsc channel whenever leadership changes. To receive the event:
//! ```ignore
//! // let (tx, mut from_raft) = mpsc::channel(10);
//! // tx was used to initialize RaftElectionState
//! from_raft.recv().await;
//! ```
//!

#![warn(missing_docs)]

/// handles election process
pub mod election;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};

/// Role a node currently plays in the election cycle.
///
/// This is a field-less enum, so it is cheap to copy and supports full equality
/// (`Eq`) in addition to `==` comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeState {
    /// Initial or the normal state of the node
    Follower,
    /// Node is holding an election and calling for votes
    Candidate,
    /// Node won the election with majority votes and became leader
    Leader,
    /// Node is terminating
    Terminating,
}

/// A member of the cluster that messages can be sent to.
#[async_trait]
pub trait Node {
    /// Concrete node type used as the payload type parameter of [`Message`].
    type NodeType;

    /// Deliver `msg` to this node.
    async fn send_message(&self, msg: Message<Self::NodeType>);

    /// Identifier that uniquely distinguishes this node.
    fn node_id(&self) -> &String;

    /// Identifier assigned by a service discovery provider (e.g. Kubernetes).
    ///
    /// Override this when that identifier differs from [`Self::node_id`]; the default
    /// implementation simply returns the node id.
    fn service_instance_id(&self) -> &String {
        Self::node_id(self)
    }
}

/// Messages exchanged with Raft.
///
/// `T` is the node type carried by the node-management control variants
/// ([`Message::ControlAddNode`] and [`Message::ControlRemoveNode`]).
#[derive(Debug, Serialize, Deserialize)]
pub enum Message<T> {
    /// Request for a vote from other nodes for the given term
    RequestVote {
        /// ID of the node sending the request
        node_id: String,
        /// Raft election term the vote is requested for
        term: usize,
    },
    /// Reply to a [`Message::RequestVote`]
    RequestVoteResponse {
        /// Raft election term for which the vote was requested
        term: usize,
        /// Whether the responder is voting for the `term`
        vote: bool,
    },
    /// Heartbeat message
    HeartBeat {
        /// Current leader, i.e. the message sender's node ID
        leader_node_id: String,
        /// Term of the leader
        term: usize,
    },
    /// Add a new node
    ControlAddNode(T),
    /// Remove an existing node; removing self will cause node termination
    ControlRemoveNode(T),
    /// A leader has been elected, or the existing leader has changed
    ControlLeaderChanged(String),
}

/// Logs the `Err` value of a `Result` expression via `error!`; an `Ok` value is ignored.
///
/// `error!` is referenced unqualified, so a macro of that name must be in scope wherever
/// `log_error!` is expanded. The error is handed to the `{}` formatter directly instead of
/// being converted with `to_string()` first, which avoids building a temporary `String`.
#[doc(hidden)]
#[macro_export]
macro_rules! log_error {
    ($result:expr) => {
        if let Err(e) = $result {
            error!("{}", e);
        }
    };
}

// Placeholder for crate-level unit tests; it is compiled only for test builds and is
// currently empty.
#[cfg(test)]
mod test {}