use std::mem;
use protobuf::Message as PbMessage;
use crate::config::Config;
use crate::default_logger;
use crate::eraftpb::{
ConfChange, ConfChangeType, ConfState, Entry, EntryType, HardState, Message, MessageType,
Snapshot,
};
use crate::errors::{Error, Result};
use crate::read_only::ReadState;
use crate::{Raft, SoftState, Status, StatusRef, Storage, INVALID_ID};
use slog::Logger;
/// A peer entry: a numeric id plus an optional blob of raw bytes.
///
/// `Default` yields `id == 0` and `context == None`.
#[derive(Debug, Default)]
pub struct Peer {
/// Numeric identifier of the peer.
pub id: u64,
/// Optional raw bytes attached to the peer.
/// NOTE(review): nothing in this file reads this field; presumably it is
/// caller-defined metadata — confirm against the users of `Peer`.
pub context: Option<Vec<u8>>,
}
/// Outcome of a snapshot transfer, as passed to `RawNode::report_snapshot`.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum SnapshotStatus {
/// The snapshot was handled successfully; `report_snapshot` forwards it
/// with `reject = false`.
Finish,
/// The snapshot failed; `report_snapshot` forwards it with `reject = true`.
Failure,
}
/// Returns `true` when `t` is one of the message types treated as local-only.
///
/// `RawNode::step` refuses every such message with `Error::StepLocalMsg`, so
/// they can only be injected through the node's own entry points.
fn is_local_msg(t: MessageType) -> bool {
    match t {
        // Built by `RawNode` itself in this file (`campaign`,
        // `report_unreachable`, `report_snapshot`).
        MessageType::MsgHup | MessageType::MsgUnreachable | MessageType::MsgSnapStatus => true,
        // Not constructed in this file; presumably raised inside `Raft`
        // (NOTE(review): confirm against the Raft implementation).
        MessageType::MsgBeat | MessageType::MsgCheckQuorum => true,
        _ => false,
    }
}
/// Returns `true` when `t` is one of the message types treated as a response.
///
/// `RawNode::step` only accepts such a message when its sender is a known
/// peer; otherwise it fails with `Error::StepPeerNotFound`.
fn is_response_msg(t: MessageType) -> bool {
    match t {
        // Replies to log replication and heartbeats.
        MessageType::MsgAppendResponse | MessageType::MsgHeartbeatResponse => true,
        // Replies to vote and pre-vote requests.
        MessageType::MsgRequestVoteResponse | MessageType::MsgRequestPreVoteResponse => true,
        // Also classified as local by `is_local_msg`, so `RawNode::step`
        // rejects it before this check is ever consulted.
        MessageType::MsgUnreachable => true,
        _ => false,
    }
}
/// Reports whether `s` is an empty snapshot, i.e. its metadata index is `0`.
pub fn is_empty_snap(s: &Snapshot) -> bool {
    let metadata = s.get_metadata();
    metadata.index == 0
}
/// A point-in-time bundle of Raft output: state changes, log entries, a
/// snapshot, read states and outbound messages collected by `Ready::new`.
///
/// Private fields are exposed read-only through the accessor methods on
/// `Ready`; `committed_entries` and `messages` are public.
#[derive(Default, Debug, PartialEq)]
pub struct Ready {
// Current soft state; `Ready::new` sets it only when it differs from the
// previously recorded soft state.
ss: Option<SoftState>,
// Current hard state; `Ready::new` sets it only when it differs from the
// previously recorded hard state.
hs: Option<HardState>,
// Copy of the Raft's pending read states (empty when there are none).
read_states: Vec<ReadState>,
// Copy of the log's unstable entries (empty when there are none).
entries: Vec<Entry>,
// Clone of the log's unstable snapshot, or the default snapshot when the
// log holds none.
snapshot: Snapshot,
/// Entries returned by the log's `next_entries` / `next_entries_since`.
/// `Ready::new` always stores `Some`, using an empty vector when the log
/// reports nothing; a `Default`-constructed `Ready` has `None`.
pub committed_entries: Option<Vec<Entry>>,
/// Outbound messages moved out of the Raft's `msgs` queue.
pub messages: Vec<Message>,
// Set by `Ready::new` when the hard state's `vote` or `term` differs from
// the previously recorded hard state.
must_sync: bool,
}
impl Ready {
    /// Collects everything the given `raft` currently has to hand out.
    ///
    /// * `prev_ss` / `prev_hs` are the soft and hard state the caller last
    ///   recorded; the new `Ready` carries a soft/hard state only when the
    ///   current one differs from them.
    /// * `since_idx` selects the source of `committed_entries`: `None` uses
    ///   the log's `next_entries()`, `Some(idx)` uses `next_entries_since(idx)`.
    ///
    /// Pending outbound messages are moved out of `raft.msgs`; unstable
    /// entries, the unstable snapshot and the read states are copied, not
    /// removed.
    fn new<T: Storage>(
        raft: &mut Raft<T>,
        prev_ss: &SoftState,
        prev_hs: &HardState,
        since_idx: Option<u64>,
    ) -> Ready {
        let mut rd = Ready {
            entries: raft.raft_log.unstable_entries().unwrap_or(&[]).to_vec(),
            ..Default::default()
        };

        // Take ownership of the queued messages; `raft.msgs` is left holding
        // the (empty) vector that `rd.messages` started with.
        if !raft.msgs.is_empty() {
            mem::swap(&mut raft.msgs, &mut rd.messages);
        }

        let committed = match since_idx {
            None => raft.raft_log.next_entries(),
            Some(idx) => raft.raft_log.next_entries_since(idx),
        };
        // Always `Some`, with an empty vector standing in for "nothing".
        rd.committed_entries = Some(committed.unwrap_or_default());

        let ss = raft.soft_state();
        if &ss != prev_ss {
            rd.ss = Some(ss);
        }

        let hs = raft.hard_state();
        if &hs != prev_hs {
            // NOTE(review): only a vote/term change requests a sync here; newly
            // appended entries alone do not — confirm this matches what callers
            // of `must_sync()` expect.
            rd.must_sync = hs.vote != prev_hs.vote || hs.term != prev_hs.term;
            rd.hs = Some(hs);
        }

        if let Some(snapshot) = raft.raft_log.unstable.snapshot.as_ref() {
            rd.snapshot = snapshot.clone();
        }

        if !raft.read_states.is_empty() {
            rd.read_states = raft.read_states.clone();
        }

        rd
    }

    /// The soft state, present only if it changed since the previous one.
    #[inline]
    pub fn ss(&self) -> Option<&SoftState> {
        self.ss.as_ref()
    }

    /// The hard state, present only if it changed since the previous one.
    #[inline]
    pub fn hs(&self) -> Option<&HardState> {
        self.hs.as_ref()
    }

    /// The read states copied from the Raft when this `Ready` was built.
    #[inline]
    pub fn read_states(&self) -> &[ReadState] {
        &self.read_states
    }

    /// The unstable log entries copied from the Raft when this `Ready` was built.
    #[inline]
    pub fn entries(&self) -> &[Entry] {
        &self.entries
    }

    /// The unstable snapshot, or a default snapshot when there was none.
    #[inline]
    pub fn snapshot(&self) -> &Snapshot {
        &self.snapshot
    }

    /// `true` when the hard state's vote or term changed relative to the
    /// previously recorded hard state.
    #[inline]
    pub fn must_sync(&self) -> bool {
        self.must_sync
    }
}
/// A wrapper around a `Raft` instance that tracks the last soft and hard
/// state handed back by the caller, so that `Ready` values only report changes.
pub struct RawNode<T: Storage> {
/// The wrapped Raft state machine.
pub raft: Raft<T>,
// Soft state recorded by the most recent `commit_ready` that carried one
// (initially the Raft's soft state at construction).
prev_ss: SoftState,
// Hard state recorded by the most recent `commit_ready` that carried a
// non-default one (initially the Raft's hard state at construction).
prev_hs: HardState,
// Logger used by this node; see `RawNode::with_logger`.
logger: Logger,
}
impl<T: Storage> RawNode<T> {
    /// Builds a `RawNode` around a new `Raft` created from `config` and `store`.
    ///
    /// The recorded "previous" hard and soft states are seeded from the new
    /// Raft, so a later `Ready` carries them only once they change.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Raft::new`.
    ///
    /// # Panics
    ///
    /// Panics if `config.id` is zero.
    // The constructor returns `Result<RawNode<T>>` rather than `Self`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(config: &Config, store: T) -> Result<RawNode<T>> {
        let logger = default_logger().new(o!());
        assert_ne!(config.id, 0, "config.id must not be zero");
        let raft = Raft::new(config, store)?;
        let prev_hs = raft.hard_state();
        let prev_ss = raft.soft_state();
        let rn = RawNode {
            raft,
            prev_ss,
            prev_hs,
            logger,
        };
        info!(rn.logger, "RawNode created with id {id}.", id = rn.raft.id);
        Ok(rn)
    }

    /// Replaces this node's logger with a child of `logger` and passes the
    /// same `logger` on to the wrapped Raft.
    #[inline(always)]
    pub fn with_logger(mut self, logger: &Logger) -> Self {
        self.logger = logger.new(o!());
        self.raft = self.raft.with_logger(logger);
        self
    }

    /// Records that the caller has handled `rd`.
    ///
    /// Updates the remembered soft/hard state, marks the last entry and the
    /// snapshot of `rd` as stable in the log, and clears the Raft's read
    /// states if `rd` carried any.
    fn commit_ready(&mut self, rd: Ready) {
        if let Some(ss) = rd.ss {
            self.prev_ss = ss;
        }
        if let Some(hs) = rd.hs {
            // A default (empty) hard state never replaces the remembered one.
            if hs != HardState::default() {
                self.prev_hs = hs;
            }
        }
        if let Some(last) = rd.entries.last() {
            self.raft.raft_log.stable_to(last.index, last.term);
        }
        if rd.snapshot != Snapshot::default() {
            self.raft
                .raft_log
                .stable_snap_to(rd.snapshot.get_metadata().index);
        }
        if !rd.read_states.is_empty() {
            self.raft.read_states.clear();
        }
    }

    /// Forwards `applied` to `Raft::commit_apply`.
    fn commit_apply(&mut self, applied: u64) {
        self.raft.commit_apply(applied);
    }

    /// Forwards to `Raft::tick` and returns its result.
    ///
    /// NOTE(review): the meaning of the returned flag is defined by
    /// `Raft::tick`; presumably it signals that a `Ready` may be pending.
    pub fn tick(&mut self) -> bool {
        self.raft.tick()
    }

    /// Steps a `MsgHup` message into the Raft.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Raft::step` reports.
    pub fn campaign(&mut self) -> Result<()> {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgHup);
        self.raft.step(m)
    }

    /// Steps a `MsgPropose` message carrying one entry with the given `data`
    /// and `context`; the message's sender is this node's own id.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Raft::step` reports.
    pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgPropose);
        m.from = self.raft.id;
        let mut e = Entry::default();
        e.data = data;
        e.context = context;
        m.set_entries(vec![e].into());
        self.raft.step(m)
    }

    /// Forwards to `Raft::ping`.
    pub fn ping(&mut self) {
        self.raft.ping()
    }

    /// Steps a `MsgPropose` message carrying one `EntryConfChange` entry whose
    /// data is the serialized `cc` and whose context is `context`.
    ///
    /// # Errors
    ///
    /// Fails if `cc` cannot be serialized, or with whatever error
    /// `Raft::step` reports.
    #[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))]
    pub fn propose_conf_change(&mut self, context: Vec<u8>, cc: ConfChange) -> Result<()> {
        let data = cc.write_to_bytes()?;
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgPropose);
        let mut e = Entry::default();
        e.set_entry_type(EntryType::EntryConfChange);
        e.data = data;
        e.context = context;
        m.set_entries(vec![e].into());
        self.raft.step(m)
    }

    /// Applies the configuration change `cc` and returns the resulting
    /// `ConfState`.
    ///
    /// When `cc.node_id` is `INVALID_ID` and the change is not a
    /// `BeginMembershipChange`, nothing is modified and the current voter and
    /// learner ids are returned.
    ///
    /// # Errors
    ///
    /// Propagates the error of the underlying add/remove/membership-change
    /// operation.
    pub fn apply_conf_change(&mut self, cc: &ConfChange) -> Result<ConfState> {
        if cc.node_id == INVALID_ID && cc.get_change_type() != ConfChangeType::BeginMembershipChange
        {
            let mut cs = ConfState::default();
            cs.nodes = self.raft.prs().voter_ids().iter().cloned().collect();
            cs.learners = self.raft.prs().learner_ids().iter().cloned().collect();
            return Ok(cs);
        }
        let nid = cc.node_id;
        match cc.get_change_type() {
            ConfChangeType::AddNode => self.raft.add_node(nid)?,
            ConfChangeType::AddLearnerNode => self.raft.add_learner(nid)?,
            ConfChangeType::RemoveNode => self.raft.remove_node(nid)?,
            ConfChangeType::BeginMembershipChange => self.raft.begin_membership_change(cc)?,
            ConfChangeType::FinalizeMembershipChange => {
                self.raft.mut_prs().finalize_membership_change()?
            }
        };
        Ok(self.raft.prs().configuration().clone().into())
    }

    /// Steps the message `m` into the Raft.
    ///
    /// # Errors
    ///
    /// * `Error::StepLocalMsg` if `m` has a local-only message type.
    /// * `Error::StepPeerNotFound` if `m` is a response from a sender that is
    ///   not a known peer.
    /// * Otherwise whatever error `Raft::step` reports.
    pub fn step(&mut self, m: Message) -> Result<()> {
        // Local message types must never be accepted from outside.
        if is_local_msg(m.get_msg_type()) {
            return Err(Error::StepLocalMsg);
        }
        if self.raft.prs().get(m.from).is_some() || !is_response_msg(m.get_msg_type()) {
            return self.raft.step(m);
        }
        Err(Error::StepPeerNotFound)
    }

    /// Like [`RawNode::ready`], but takes the committed entries from the
    /// log's `next_entries_since(applied_idx)`.
    pub fn ready_since(&mut self, applied_idx: u64) -> Ready {
        Ready::new(
            &mut self.raft,
            &self.prev_ss,
            &self.prev_hs,
            Some(applied_idx),
        )
    }

    /// Builds a `Ready` from the current Raft state, measured against the
    /// remembered soft and hard state.
    pub fn ready(&mut self) -> Ready {
        Ready::new(&mut self.raft, &self.prev_ss, &self.prev_hs, None)
    }

    /// Reports whether a `Ready` built now would contain anything.
    ///
    /// `applied_idx` selects how pending committed entries are detected:
    /// `None` uses the log's `has_next_entries()`, `Some(idx)` uses
    /// `has_next_entries_since(idx)`.
    pub fn has_ready_since(&self, applied_idx: Option<u64>) -> bool {
        let raft = &self.raft;
        if !raft.msgs.is_empty() || raft.raft_log.unstable_entries().is_some() {
            return true;
        }
        if !raft.read_states.is_empty() {
            return true;
        }
        if self.get_snap().map_or(false, |s| !is_empty_snap(s)) {
            return true;
        }
        let has_unapplied_entries = match applied_idx {
            None => raft.raft_log.has_next_entries(),
            Some(idx) => raft.raft_log.has_next_entries_since(idx),
        };
        if has_unapplied_entries {
            return true;
        }
        if raft.soft_state() != self.prev_ss {
            return true;
        }
        // A default hard state is never reported as a change.
        let hs = raft.hard_state();
        if hs != HardState::default() && hs != self.prev_hs {
            return true;
        }
        false
    }

    /// Shorthand for `has_ready_since(None)`.
    #[inline]
    pub fn has_ready(&self) -> bool {
        self.has_ready_since(None)
    }

    /// Forwards to `Raft::get_snap`.
    #[inline]
    pub fn get_snap(&self) -> Option<&Snapshot> {
        self.raft.get_snap()
    }

    /// Acknowledges `rd` and then marks everything up to the remembered
    /// commit index as applied.
    pub fn advance(&mut self, rd: Ready) {
        self.advance_append(rd);
        // Uses the remembered hard state rather than `rd`'s, so the commit
        // index is honoured even when `rd` carried no hard state.
        let commit_idx = self.prev_hs.commit;
        if commit_idx != 0 {
            self.advance_apply(commit_idx);
        }
    }

    /// Acknowledges `rd` without advancing the applied index.
    #[inline]
    pub fn advance_append(&mut self, rd: Ready) {
        self.commit_ready(rd);
    }

    /// Reports `applied` to the Raft as the applied index.
    #[inline]
    pub fn advance_apply(&mut self, applied: u64) {
        self.commit_apply(applied);
    }

    /// Builds a `Status` from the wrapped Raft.
    #[inline]
    pub fn status(&self) -> Status {
        Status::new(&self.raft)
    }

    /// Builds a `StatusRef` from the wrapped Raft.
    pub fn status_ref(&self) -> StatusRef {
        StatusRef::new(&self.raft)
    }

    /// Steps a `MsgUnreachable` message whose sender is `id`.
    pub fn report_unreachable(&mut self, id: u64) {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgUnreachable);
        m.from = id;
        // Best effort: an error from `step` is deliberately ignored.
        let _ = self.raft.step(m);
    }

    /// Steps a `MsgSnapStatus` message whose sender is `id`; the message is
    /// marked rejected when `status` is `SnapshotStatus::Failure`.
    pub fn report_snapshot(&mut self, id: u64, status: SnapshotStatus) {
        let rej = status == SnapshotStatus::Failure;
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgSnapStatus);
        m.from = id;
        m.reject = rej;
        // Best effort: an error from `step` is deliberately ignored.
        let _ = self.raft.step(m);
    }

    /// Forwards to `Raft::request_snapshot`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Raft::request_snapshot` reports.
    pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
        self.raft.request_snapshot(request_index)
    }

    /// Steps a `MsgTransferLeader` message whose sender field is `transferee`.
    pub fn transfer_leader(&mut self, transferee: u64) {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgTransferLeader);
        m.from = transferee;
        // Best effort: an error from `step` is deliberately ignored.
        let _ = self.raft.step(m);
    }

    /// Steps a `MsgReadIndex` message carrying one entry whose data is `rctx`.
    pub fn read_index(&mut self, rctx: Vec<u8>) {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgReadIndex);
        let mut e = Entry::default();
        e.data = rctx;
        m.set_entries(vec![e].into());
        // Best effort: an error from `step` is deliberately ignored.
        let _ = self.raft.step(m);
    }

    /// Shared access to the storage held by the wrapped Raft.
    #[inline]
    pub fn get_store(&self) -> &T {
        self.raft.get_store()
    }

    /// Exclusive access to the storage held by the wrapped Raft.
    #[inline]
    pub fn mut_store(&mut self) -> &mut T {
        self.raft.mut_store()
    }

    /// Forwards `skip` to `Raft::skip_bcast_commit`.
    #[inline]
    pub fn skip_bcast_commit(&mut self, skip: bool) {
        self.raft.skip_bcast_commit(skip)
    }

    /// Forwards `batch_append` to `Raft::set_batch_append`.
    #[inline]
    pub fn set_batch_append(&mut self, batch_append: bool) {
        self.raft.set_batch_append(batch_append)
    }
}
#[cfg(test)]
mod test {
    use super::{is_local_msg, is_response_msg};
    use crate::eraftpb::MessageType;

    /// Checks the local/non-local classification of each listed message type.
    #[test]
    fn test_is_local_msg() {
        let tests = vec![
            (MessageType::MsgHup, true),
            (MessageType::MsgBeat, true),
            (MessageType::MsgUnreachable, true),
            (MessageType::MsgSnapStatus, true),
            (MessageType::MsgCheckQuorum, true),
            (MessageType::MsgPropose, false),
            (MessageType::MsgAppend, false),
            (MessageType::MsgAppendResponse, false),
            (MessageType::MsgRequestVote, false),
            (MessageType::MsgRequestVoteResponse, false),
            (MessageType::MsgSnapshot, false),
            (MessageType::MsgHeartbeat, false),
            (MessageType::MsgHeartbeatResponse, false),
            (MessageType::MsgTransferLeader, false),
            (MessageType::MsgTimeoutNow, false),
            (MessageType::MsgReadIndex, false),
            (MessageType::MsgReadIndexResp, false),
            (MessageType::MsgRequestPreVote, false),
            (MessageType::MsgRequestPreVoteResponse, false),
        ];
        // The case index in the message pinpoints which row failed.
        for (i, (msg_type, expected)) in tests.into_iter().enumerate() {
            assert_eq!(is_local_msg(msg_type), expected, "#{}: is_local_msg", i);
        }
    }

    /// Checks the response/non-response classification of each listed
    /// message type.
    #[test]
    fn test_is_response_msg() {
        let tests = vec![
            (MessageType::MsgAppendResponse, true),
            (MessageType::MsgRequestVoteResponse, true),
            (MessageType::MsgHeartbeatResponse, true),
            (MessageType::MsgUnreachable, true),
            (MessageType::MsgRequestPreVoteResponse, true),
            (MessageType::MsgHup, false),
            (MessageType::MsgBeat, false),
            (MessageType::MsgPropose, false),
            (MessageType::MsgAppend, false),
            (MessageType::MsgRequestVote, false),
            (MessageType::MsgSnapshot, false),
            (MessageType::MsgHeartbeat, false),
            (MessageType::MsgSnapStatus, false),
            (MessageType::MsgCheckQuorum, false),
            (MessageType::MsgTransferLeader, false),
            (MessageType::MsgTimeoutNow, false),
            (MessageType::MsgReadIndex, false),
            (MessageType::MsgReadIndexResp, false),
            (MessageType::MsgRequestPreVote, false),
        ];
        for (i, (msg_type, expected)) in tests.into_iter().enumerate() {
            assert_eq!(is_response_msg(msg_type), expected, "#{}: is_response_msg", i);
        }
    }
}