#![forbid(unsafe_code)]
#[macro_use]
extern crate log;
#[macro_use]
mod util;
mod core;
mod error;
mod event;
mod member;
mod metrics;
mod msg;
mod options;
mod rpc;
mod storage;
mod task;
pub use crate::core::State;
pub use crate::error::{Error, Result};
pub use crate::event::{Event, EventHandler};
pub use crate::metrics::{Metrics, MetricsWatcher};
pub use crate::options::{Options, OptionsBuilder, Quorum};
pub use crate::rpc::{HeartbeatRequest, HeartbeatResponse, MoveLeaderRequest, Rpc, VoteRequest, VoteResponse};
pub use crate::storage::{HardState, Storage};
pub use crate::task::{TaskSpawner, Thread};
use crate::core::ElectionCore;
use crate::metrics::metrics_channel;
use crate::msg::Message;
use crossbeam_channel::Sender;
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::sync::Arc;
/// A node identity that can report which group it belongs to.
///
/// Implementors pick the concrete group identifier type through
/// [`NodeId::GroupId`].
pub trait NodeId {
    /// Identifier of the group a node belongs to.
    ///
    /// It must be comparable for equality and printable with `{}`.
    type GroupId: PartialEq + Display;

    /// Returns the identifier of the group this node belongs to.
    fn group_id(&self) -> Self::GroupId;
}
/// Outcome of a vote decision.
///
/// The enum is laid out as a single `u8`, with the explicit discriminants
/// listed on each variant.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VoteResult {
    /// The vote was refused (discriminant `0`).
    NotGranted = 0,
    /// The vote was given (discriminant `1`).
    Granted = 1,
}
impl VoteResult {
    /// Returns `true` only for [`VoteResult::Granted`].
    #[inline]
    pub fn is_granted(self) -> bool {
        matches!(self, VoteResult::Granted)
    }
}
/// A value that can be weighed against another value of the same type to
/// produce a [`VoteResult`].
///
/// The trait is parameterised by the [`ElectionType`] it is used with; the
/// comparison policy itself is left entirely to the implementor.
pub trait VoteFactor<T>
where
    T: ElectionType,
{
    /// Evaluates `self` against `other` and returns the resulting
    /// [`VoteResult`].
    fn vote(&self, other: &Self) -> VoteResult;
}
/// Type-level configuration of an election.
///
/// An implementor names every pluggable component used by [`Election`]:
/// the node identifier, the vote factor, the thread and task-spawner
/// abstractions, the storage backend and the RPC layer. The implementing
/// type itself must be `'static`, `Sized`, `Clone` and `Debug`.
pub trait ElectionType: Clone + Debug + Sized + 'static {
    /// Node identifier: printable, debuggable, hashable, comparable,
    /// clonable and sendable to another thread.
    type NodeId: NodeId + Display + Debug + Clone + Eq + Hash + Send;

    /// Vote factor used with this election type.
    type VoteFactor: VoteFactor<Self> + Clone + Debug + Send;

    /// Thread abstraction; [`Election`] keeps one and joins it on shutdown.
    type Thread: Thread;

    /// Task spawner; handed to [`Election::start`] inside an `Arc`.
    type TaskSpawner: TaskSpawner + Send + Sync;

    /// Storage backend for this election type.
    type Storage: Storage<Self> + Send;

    /// RPC layer; handed to [`Election::start`] inside an `Arc`.
    type Rpc: Rpc<Self> + Send + Sync;
}
/// Mode passed to [`Election::initialize`] together with the member list.
///
/// The enum is laid out as a single `u8`, with the explicit discriminants
/// listed on each variant.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InitialMode {
    /// Discriminant `0`.
    Normal = 0,
    /// Discriminant `1`.
    AsLeader = 1,
    /// Discriminant `2`.
    AsObserver = 2,
    /// Discriminant `3`.
    AsCandidate = 3,
}
/// Handle to a running election core.
///
/// Created by [`Election::start`]. Every public operation is forwarded to the
/// core as a [`Message`] over `msg_tx`. Dropping the handle runs `shutdown`.
pub struct Election<T>
where
    T: ElectionType,
{
    /// Thread returned by `ElectionCore::spawn`; `shutdown` takes it out
    /// (leaving `None`) and joins it.
    main_thread: Option<T::Thread>,
    /// Sending half of the message channel shared with the election core.
    msg_tx: Sender<Message<T>>,
    /// Watcher half of the metrics channel; cloned out by `metrics_watcher()`.
    metrics_watcher: MetricsWatcher<T>,
}
impl<T> Drop for Election<T>
where
    T: ElectionType,
{
    /// Shuts the election down when the handle goes out of scope.
    ///
    /// The result of `shutdown` is deliberately discarded: `drop` has no way
    /// to report a failure to the caller.
    #[inline]
    fn drop(&mut self) {
        let _ignored = self.shutdown();
    }
}
impl<T: ElectionType> Election<T> {
#[inline]
pub fn start(
options: Options,
node_id: T::NodeId,
task_spawner: Arc<T::TaskSpawner>,
storage: T::Storage,
rpc: Arc<T::Rpc>,
event_handler: Arc<dyn EventHandler<T>>,
) -> Result<Self> {
let (msg_tx, msg_rx) = crossbeam_channel::bounded(1024);
let (metrics_reporter, metrics_watcher) = metrics_channel();
let election_core = ElectionCore::new(
options,
node_id,
task_spawner,
storage,
rpc,
msg_tx.clone(),
msg_rx,
event_handler,
metrics_reporter,
);
let election_thread = election_core.spawn()?;
Ok(Election {
msg_tx,
main_thread: Some(election_thread),
metrics_watcher,
})
}
#[inline]
fn shutdown(&mut self) -> Result<()> {
let _ = self.msg_tx.send(Message::Shutdown);
if let Some(thread) = self.main_thread.take() {
thread.join();
}
Ok(())
}
#[inline]
pub fn metrics_watcher(&self) -> MetricsWatcher<T> {
self.metrics_watcher.clone()
}
#[inline]
pub fn initialize(&self, members: Vec<T::NodeId>, initial_mode: InitialMode) -> Result<()> {
let (tx, rx) = crossbeam_channel::bounded(1);
self.msg_tx
.send(Message::Initialize {
members,
initial_mode,
tx,
})
.map_err(|e| try_format_error!(ChannelError, "failed to send initialize to message channel: {}", e))?;
rx.recv()
.map_err(|e| try_format_error!(ChannelError, "failed to receive initialize result from channel: {}", e))
.and_then(|res| res)?;
Ok(())
}
#[inline]
pub fn submit_heartbeat(&self, req: HeartbeatRequest<T>) -> Result<HeartbeatResponse<T>> {
let (tx, rx) = crossbeam_channel::bounded(1);
self.msg_tx.send(Message::HeartbeatRequest { req, tx }).map_err(|e| {
try_format_error!(
ChannelError,
"failed to send heartbeat request to message channel: {}",
e
)
})?;
let resp = rx
.recv()
.map_err(|e| try_format_error!(ChannelError, "failed to receive heartbeat response from channel: {}", e))
.and_then(|res| res)?;
Ok(resp)
}
#[inline]
pub fn submit_vote(&self, req: VoteRequest<T>) -> Result<VoteResponse<T>> {
let (tx, rx) = crossbeam_channel::bounded(1);
self.msg_tx
.send(Message::VoteRequest { req, tx })
.map_err(|e| try_format_error!(ChannelError, "failed to send vote request to message channel: {}", e))?;
let resp = rx
.recv()
.map_err(|e| try_format_error!(ChannelError, "failed to receive vote response from channel: {}", e))
.and_then(|res| res)?;
Ok(resp)
}
#[inline]
pub fn update_options(&self, options: Options) -> Result<()> {
let (tx, rx) = crossbeam_channel::bounded(1);
self.msg_tx
.send(Message::UpdateOptions { options, tx })
.map_err(|e| try_format_error!(ChannelError, "failed to send update options to message channel: {}", e))?;
rx.recv()
.map_err(|e| {
try_format_error!(
ChannelError,
"failed to receive update options result from channel: {}",
e
)
})
.and_then(|res| res)?;
Ok(())
}
#[inline]
pub fn move_leader(&self, target_node: T::NodeId) -> Result<()> {
let (tx, rx) = crossbeam_channel::bounded(1);
self.msg_tx
.send(Message::MoveLeader { target_node, tx })
.map_err(|e| try_format_error!(ChannelError, "failed to send move_leader to message channel: {}", e))?;
rx.recv()
.map_err(|e| try_format_error!(ChannelError, "failed to receive move_leader result from channel: {}", e))
.and_then(|res| res)?;
Ok(())
}
#[inline]
pub fn submit_move_leader_request(&self, req: MoveLeaderRequest<T>) -> Result<()> {
let (tx, rx) = crossbeam_channel::bounded(1);
self.msg_tx.send(Message::MoveLeaderRequest { req, tx }).map_err(|e| {
try_format_error!(
ChannelError,
"failed to send move_leader request to message channel: {}",
e
)
})?;
rx.recv()
.map_err(|e| try_format_error!(ChannelError, "failed to receive move_leader result from channel: {}", e))
.and_then(|res| res)?;
Ok(())
}
#[inline]
pub fn step_up_to_leader(&self, increase_term: bool) -> Result<()> {
let (tx, rx) = crossbeam_channel::bounded(1);
self.msg_tx
.send(Message::StepUpToLeader { increase_term, tx })
.map_err(|e| try_format_error!(ChannelError, "failed to send StepUpToLeader to message channel: {}", e))?;
rx.recv()
.map_err(|e| {
try_format_error!(
ChannelError,
"failed to receive StepUpToLeader result from channel: {}",
e
)
})
.and_then(|res| res)?;
Ok(())
}
#[inline]
pub fn step_down_to_follower(&self) -> Result<()> {
let (tx, rx) = crossbeam_channel::bounded(1);
self.msg_tx.send(Message::StepDownToFollower { tx }).map_err(|e| {
try_format_error!(
ChannelError,
"failed to send StepDownToFollower to message channel: {}",
e
)
})?;
rx.recv()
.map_err(|e| {
try_format_error!(
ChannelError,
"failed to receive StepDownToFollower result from channel: {}",
e
)
})
.and_then(|res| res)?;
Ok(())
}
}