use std::mem;
use crate::eraftpb::{
ConfChange, ConfChangeType, ConfState, Entry, EntryType, HardState, Message, MessageType,
Snapshot,
};
use prost::Message as ProstMsg;
use super::config::Config;
use super::errors::{Error, Result};
use super::read_only::ReadState;
use super::Status;
use super::Storage;
use super::{Raft, SoftState, INVALID_ID};
/// Describes one initial group member handed to `RawNode::new`.
#[derive(Debug, Default)]
pub struct Peer {
/// Node id; `RawNode::new` uses it as the node id of the bootstrap `AddNode`
/// conf change and registers it with the underlying Raft.
pub id: u64,
/// Optional opaque bytes; when present, `RawNode::new` moves them into the
/// context of the bootstrap conf change.
pub context: Option<Vec<u8>>,
}
/// Outcome of a snapshot transfer, as passed to `RawNode::report_snapshot`.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum SnapshotStatus {
/// Reported to Raft as a `MsgSnapStatus` with `reject = false`.
Finish,
/// Reported to Raft as a `MsgSnapStatus` with `reject = true`.
Failure,
}
/// Returns `true` when `t` is one of the message types that `RawNode::step`
/// refuses to accept (it answers them with `Error::StepLocalMsg`).
fn is_local_msg(t: MessageType) -> bool {
    // Table of the message types treated as local.
    const LOCAL_MSG_TYPES: [MessageType; 5] = [
        MessageType::MsgHup,
        MessageType::MsgBeat,
        MessageType::MsgUnreachable,
        MessageType::MsgSnapStatus,
        MessageType::MsgCheckQuorum,
    ];
    LOCAL_MSG_TYPES.contains(&t)
}
/// Returns `true` when `t` is one of the message types that `RawNode::step`
/// only accepts from a sender it can find in the progress set.
fn is_response_msg(t: MessageType) -> bool {
    // Table of the message types treated as responses.
    const RESPONSE_MSG_TYPES: [MessageType; 5] = [
        MessageType::MsgAppendResponse,
        MessageType::MsgRequestVoteResponse,
        MessageType::MsgHeartbeatResponse,
        MessageType::MsgUnreachable,
        MessageType::MsgRequestPreVoteResponse,
    ];
    RESPONSE_MSG_TYPES.contains(&t)
}
/// Reports whether `s` is an empty snapshot, i.e. its metadata index is 0.
pub fn is_empty_snap(s: &Snapshot) -> bool {
    let snapshot_index = s.get_metadata().get_index();
    snapshot_index == 0
}
/// A batch of pending state and output collected from a `Raft` by `Ready::new`,
/// later acknowledged through `RawNode::advance` / `RawNode::advance_append`.
#[derive(Default, Debug, PartialEq)]
pub struct Ready {
/// The current soft state; `Some` only when it differs from the previously
/// recorded soft state.
pub ss: Option<SoftState>,
/// The current hard state; `Some` only when it differs from the previously
/// recorded hard state.
pub hs: Option<HardState>,
/// A copy of the Raft's `read_states`; they are cleared from the Raft when a
/// `Ready` with a non-empty list is committed.
pub read_states: Vec<ReadState>,
/// A copy of the log's unstable entries; the last one is marked stable when
/// the `Ready` is committed.
pub entries: Vec<Entry>,
/// A copy of the unstable snapshot if there is one, otherwise the default
/// (empty) snapshot.
pub snapshot: Snapshot,
/// Entries returned by the log's `next_entries` / `next_entries_since`;
/// `Ready::new` always sets this to `Some`, using an empty vector when the
/// log returns nothing.
pub committed_entries: Option<Vec<Entry>>,
/// Outbound messages, moved out of the Raft's `msgs` buffer.
pub messages: Vec<Message>,
/// Set to `true` by `Ready::new` when the hard state's vote or term differs
/// from the previously recorded hard state.
pub must_sync: bool,
}
impl Ready {
    /// Collects everything `raft` currently has pending into a new `Ready`.
    ///
    /// * `prev_ss` / `prev_hs` are the previously recorded soft and hard states;
    ///   the current states are only included when they differ from these.
    /// * `since_idx` selects which committed entries are returned: `None` uses
    ///   the log's `next_entries()`, `Some(idx)` uses `next_entries_since(idx)`.
    ///
    /// Outbound messages are moved out of `raft.msgs`; everything else is copied.
    fn new<T: Storage>(
        raft: &mut Raft<T>,
        prev_ss: &SoftState,
        prev_hs: &HardState,
        since_idx: Option<u64>,
    ) -> Ready {
        let mut ready = Ready {
            entries: raft.raft_log.unstable_entries().unwrap_or(&[]).to_vec(),
            ..Default::default()
        };

        // Take ownership of the outbound messages instead of cloning them.
        if !raft.msgs.is_empty() {
            mem::swap(&mut raft.msgs, &mut ready.messages);
        }

        let committed = match since_idx {
            Some(idx) => raft.raft_log.next_entries_since(idx),
            None => raft.raft_log.next_entries(),
        };
        ready.committed_entries = Some(committed.unwrap_or_default());

        let soft_state = raft.soft_state();
        if soft_state != *prev_ss {
            ready.ss = Some(soft_state);
        }

        let hard_state = raft.hard_state();
        if hard_state != *prev_hs {
            // Only a vote or term change raises `must_sync`; any other
            // difference still reports the hard state but leaves the flag alone.
            let vote_changed = hard_state.get_vote() != prev_hs.get_vote();
            if vote_changed || hard_state.get_term() != prev_hs.get_term() {
                ready.must_sync = true;
            }
            ready.hs = Some(hard_state);
        }

        if let Some(ref pending_snapshot) = raft.raft_log.get_unstable().snapshot {
            ready.snapshot = pending_snapshot.clone();
        }

        if !raft.read_states.is_empty() {
            ready.read_states = raft.read_states.clone();
        }

        ready
    }
}
/// Wraps a `Raft` state machine together with the soft and hard state last
/// recorded by this node, which serve as baselines for detecting changes when
/// building a `Ready`.
pub struct RawNode<T: Storage> {
/// The underlying Raft state machine.
pub raft: Raft<T>,
/// Soft state baseline; updated when a `Ready` carrying a soft state is committed.
prev_ss: SoftState,
/// Hard state baseline; updated when a `Ready` carrying a non-default hard
/// state is committed.
prev_hs: HardState,
}
impl<T: Storage> RawNode<T> {
pub fn new(config: &Config, store: T, mut peers: Vec<Peer>) -> Result<RawNode<T>> {
assert_ne!(config.id, 0, "config.id must not be zero");
let r = Raft::new(config, store);
let mut rn = RawNode {
raft: r,
prev_hs: Default::default(),
prev_ss: Default::default(),
};
let last_index = rn.raft.get_store().last_index().expect("");
if last_index == 0 {
rn.raft.become_follower(1, INVALID_ID);
let mut ents = Vec::with_capacity(peers.len());
for (i, peer) in peers.iter_mut().enumerate() {
let mut cc = ConfChange::new_();
cc.set_change_type(ConfChangeType::AddNode);
cc.set_node_id(peer.id);
if let Some(ctx) = peer.context.take() {
cc.set_context(ctx);
}
let data =
protobuf::Message::write_to_bytes(&cc).expect("unexpected marshal error");
let mut e = Entry::new_();
e.set_entry_type(EntryType::EntryConfChange);
e.set_term(1);
e.set_index(i as u64 + 1);
e.set_data(data);
ents.push(e);
}
rn.raft.raft_log.append(&ents);
rn.raft.raft_log.committed = ents.len() as u64;
for peer in peers {
rn.raft.add_node(peer.id);
}
}
rn.prev_ss = rn.raft.soft_state();
if last_index == 0 {
rn.prev_hs = Default::default();
} else {
rn.prev_hs = rn.raft.hard_state();
}
Ok(rn)
}
/// Acknowledges `rd`: records its soft/hard state as the new baselines and
/// tells the log which entries and snapshot are now stable.
fn commit_ready(&mut self, rd: Ready) {
    if let Some(soft_state) = rd.ss {
        self.prev_ss = soft_state;
    }

    // A default (empty) hard state never replaces the recorded one.
    if let Some(hard_state) = rd.hs {
        if hard_state != HardState::new_() {
            self.prev_hs = hard_state;
        }
    }

    // Everything up to the last entry handed out is considered persisted.
    if let Some(last) = rd.entries.last() {
        self.raft.raft_log.stable_to(last.get_index(), last.get_term());
    }

    if rd.snapshot != Snapshot::new_() {
        let snapshot_index = rd.snapshot.get_metadata().get_index();
        self.raft.raft_log.stable_snap_to(snapshot_index);
    }

    if !rd.read_states.is_empty() {
        self.raft.read_states.clear();
    }
}
/// Forwards `applied` to the log's `applied_to`.
fn commit_apply(&mut self, applied: u64) {
self.raft.raft_log.applied_to(applied);
}
/// Ticks the underlying Raft once and returns whatever `Raft::tick` returns.
///
/// NOTE(review): the meaning of the returned `bool` is defined by `Raft::tick`,
/// which is not visible here — confirm against that method.
pub fn tick(&mut self) -> bool {
self.raft.tick()
}
/// Steps a `MsgHup` message into the underlying Raft and returns the result.
///
/// The message goes straight to `Raft::step`, not through `RawNode::step`,
/// which would reject `MsgHup` as a local message.
pub fn campaign(&mut self) -> Result<()> {
    let mut hup = Message::new_();
    hup.set_msg_type(MessageType::MsgHup);
    self.raft.step(hup)
}
/// Steps a `MsgPropose` carrying a single entry built from `data` and
/// `context`, with this node's id as the sender, and returns the result of
/// `Raft::step`.
pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
    let mut entry = Entry::new_();
    entry.set_context(context);
    entry.set_data(data);

    let mut proposal = Message::new_();
    proposal.set_msg_type(MessageType::MsgPropose);
    proposal.set_from(self.raft.id);
    proposal.set_entries(vec![entry]);
    self.raft.step(proposal)
}
/// Encodes `cc` and steps it into the underlying Raft as a `MsgPropose`
/// carrying a single `EntryConfChange` entry with the given `context`.
///
/// # Errors
///
/// Returns an error if encoding `cc` fails or if `Raft::step` rejects the
/// proposal.
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))]
pub fn propose_conf_change(&mut self, context: Vec<u8>, cc: ConfChange) -> Result<()> {
    let mut encoded = Vec::with_capacity(ProstMsg::encoded_len(&cc));
    cc.encode(&mut encoded)?;

    let mut entry = Entry::new_();
    entry.set_entry_type(EntryType::EntryConfChange);
    entry.set_context(context);
    entry.set_data(encoded);

    let mut proposal = Message::new_();
    proposal.set_msg_type(MessageType::MsgPropose);
    proposal.set_entries(vec![entry]);
    self.raft.step(proposal)
}
/// Applies `cc` to the underlying Raft and returns the resulting membership.
///
/// A conf change whose node id is `INVALID_ID` changes nothing; the current
/// membership is returned as-is. Otherwise the node is added, added as a
/// learner, or removed according to the change type. The returned `ConfState`
/// holds the voter and learner ids reported by the progress set afterwards.
pub fn apply_conf_change(&mut self, cc: &ConfChange) -> ConfState {
    let node_id = cc.get_node_id();
    if node_id != INVALID_ID {
        match cc.get_change_type() {
            ConfChangeType::AddNode => self.raft.add_node(node_id),
            ConfChangeType::AddLearnerNode => self.raft.add_learner(node_id),
            ConfChangeType::RemoveNode => self.raft.remove_node(node_id),
        }
    }

    let mut conf_state = ConfState::new_();
    conf_state.set_nodes(self.raft.prs().nodes());
    conf_state.set_learners(self.raft.prs().learner_nodes());
    conf_state
}
pub fn step(&mut self, m: Message) -> Result<()> {
if is_local_msg(m.get_msg_type()) {
return Err(Error::StepLocalMsg);
}
if self.raft.prs().get(m.get_from()).is_some() || !is_response_msg(m.get_msg_type()) {
return self.raft.step(m);
}
Err(Error::StepPeerNotFound)
}
/// Builds a `Ready` from the current Raft state, comparing against the recorded
/// soft and hard state. Committed entries are taken with
/// `next_entries_since(applied_idx)` rather than `next_entries()`.
pub fn ready_since(&mut self, applied_idx: u64) -> Ready {
Ready::new(
&mut self.raft,
&self.prev_ss,
&self.prev_hs,
Some(applied_idx),
)
}
/// Builds a `Ready` from the current Raft state, comparing against the recorded
/// soft and hard state. Committed entries are taken with the log's
/// `next_entries()`.
pub fn ready(&mut self) -> Ready {
Ready::new(&mut self.raft, &self.prev_ss, &self.prev_hs, None)
}
/// Reports whether there is anything pending that a `Ready` would carry.
///
/// Returns `true` if any of the following holds:
/// * there are outbound messages, unstable entries, or read states;
/// * there is a non-empty snapshot (see `is_empty_snap`);
/// * the log has next entries (`has_next_entries_since(idx)` when
///   `applied_idx` is `Some(idx)`, `has_next_entries()` otherwise);
/// * the soft state differs from the recorded one;
/// * the hard state is non-default and differs from the recorded one.
pub fn has_ready_since(&self, applied_idx: Option<u64>) -> bool {
    let raft = &self.raft;

    let has_pending_output = !raft.msgs.is_empty()
        || raft.raft_log.unstable_entries().is_some()
        || !raft.read_states.is_empty();
    if has_pending_output {
        return true;
    }

    let has_snapshot = match self.get_snap() {
        Some(snap) => !is_empty_snap(snap),
        None => false,
    };
    if has_snapshot {
        return true;
    }

    let has_unapplied_entries = match applied_idx {
        Some(idx) => raft.raft_log.has_next_entries_since(idx),
        None => raft.raft_log.has_next_entries(),
    };
    if has_unapplied_entries || raft.soft_state() != self.prev_ss {
        return true;
    }

    // A default hard state is never reported as a change.
    let hard_state = raft.hard_state();
    hard_state != HardState::new_() && hard_state != self.prev_hs
}
/// Shorthand for `has_ready_since(None)`.
pub fn has_ready(&self) -> bool {
self.has_ready_since(None)
}
/// Returns the snapshot reported by the underlying Raft's `get_snap`, if any.
#[inline]
pub fn get_snap(&self) -> Option<&Snapshot> {
self.raft.get_snap()
}
/// Acknowledges `rd` and then marks entries as applied up to the recorded
/// hard state's commit index.
///
/// The commit index is read from the recorded hard state after `rd` has been
/// committed, not from `rd` itself, so it is used even when `rd` carried no
/// hard state. Nothing is marked as applied while that commit index is 0.
pub fn advance(&mut self, rd: Ready) {
    self.advance_append(rd);
    match self.prev_hs.get_commit() {
        0 => {}
        commit_idx => self.advance_apply(commit_idx),
    }
}
/// Acknowledges `rd` via `commit_ready`: updates the recorded soft/hard state,
/// marks the ready's entries and snapshot as stable in the log, and clears the
/// Raft's read states if the ready carried any. Does not advance the applied
/// index.
pub fn advance_append(&mut self, rd: Ready) {
self.commit_ready(rd);
}
/// Forwards `applied` to the log's `applied_to` (via `commit_apply`).
pub fn advance_apply(&mut self, applied: u64) {
self.commit_apply(applied);
}
/// Builds a `Status` from the underlying Raft.
pub fn status(&self) -> Status {
Status::new(&self.raft)
}
/// Steps a `MsgUnreachable` message with `id` as the sender into the
/// underlying Raft. The result of the step is deliberately discarded.
pub fn report_unreachable(&mut self, id: u64) {
    let mut unreachable = Message::new_();
    unreachable.set_msg_type(MessageType::MsgUnreachable);
    unreachable.set_from(id);
    // Best effort: the outcome of the step is ignored.
    let _ = self.raft.step(unreachable);
}
/// Steps a `MsgSnapStatus` message with `id` as the sender into the
/// underlying Raft; the message is flagged as rejected when `status` is
/// `SnapshotStatus::Failure`. The result of the step is deliberately
/// discarded.
pub fn report_snapshot(&mut self, id: u64, status: SnapshotStatus) {
    let mut snap_status = Message::new_();
    snap_status.set_msg_type(MessageType::MsgSnapStatus);
    snap_status.set_from(id);
    snap_status.set_reject(status == SnapshotStatus::Failure);
    // Best effort: the outcome of the step is ignored.
    let _ = self.raft.step(snap_status);
}
/// Steps a `MsgTransferLeader` message into the underlying Raft, with
/// `transferee` placed in the message's `from` field. The result of the step
/// is deliberately discarded.
pub fn transfer_leader(&mut self, transferee: u64) {
    let mut transfer = Message::new_();
    transfer.set_msg_type(MessageType::MsgTransferLeader);
    transfer.set_from(transferee);
    // Best effort: the outcome of the step is ignored.
    let _ = self.raft.step(transfer);
}
/// Steps a `MsgReadIndex` message into the underlying Raft, carrying `rctx`
/// as the data of its single entry. The result of the step is deliberately
/// discarded.
pub fn read_index(&mut self, rctx: Vec<u8>) {
    let mut entry = Entry::new_();
    entry.set_data(rctx);

    let mut request = Message::new_();
    request.set_msg_type(MessageType::MsgReadIndex);
    request.set_entries(vec![entry]);
    // Best effort: the outcome of the step is ignored.
    let _ = self.raft.step(request);
}
/// Returns a shared reference to the store held by the underlying Raft.
#[inline]
pub fn get_store(&self) -> &T {
self.raft.get_store()
}
/// Returns a mutable reference to the store held by the underlying Raft.
#[inline]
pub fn mut_store(&mut self) -> &mut T {
self.raft.mut_store()
}
/// Forwards `skip` to the underlying Raft's `skip_bcast_commit`.
///
/// NOTE(review): presumably toggles whether commit-index updates are
/// broadcast — confirm against `Raft::skip_bcast_commit`.
#[inline]
pub fn skip_bcast_commit(&mut self, skip: bool) {
self.raft.skip_bcast_commit(skip)
}
}
#[cfg(test)]
mod test {
    use super::is_local_msg;
    use crate::eraftpb::MessageType;
    use crate::setup_for_test;

    /// Checks the classification `is_local_msg` gives to each listed message type.
    #[test]
    fn test_is_local_msg() {
        setup_for_test();

        // (message type, expected `is_local_msg` result)
        let cases = [
            (MessageType::MsgHup, true),
            (MessageType::MsgBeat, true),
            (MessageType::MsgUnreachable, true),
            (MessageType::MsgSnapStatus, true),
            (MessageType::MsgCheckQuorum, true),
            (MessageType::MsgPropose, false),
            (MessageType::MsgAppend, false),
            (MessageType::MsgAppendResponse, false),
            (MessageType::MsgRequestVote, false),
            (MessageType::MsgRequestVoteResponse, false),
            (MessageType::MsgSnapshot, false),
            (MessageType::MsgHeartbeat, false),
            (MessageType::MsgHeartbeatResponse, false),
            (MessageType::MsgTransferLeader, false),
            (MessageType::MsgTimeoutNow, false),
            (MessageType::MsgReadIndex, false),
            (MessageType::MsgReadIndexResp, false),
            (MessageType::MsgRequestPreVote, false),
            (MessageType::MsgRequestPreVoteResponse, false),
        ];

        for &(msg_type, expected) in cases.iter() {
            assert_eq!(is_local_msg(msg_type), expected);
        }
    }
}