use crate::{
Term,
log::{LogEntries, LogIndex, LogPosition},
node::NodeId,
};
/// A message originating from a node (see [`MessageHeader::from`]).
///
/// The variant names follow the Raft RPC naming: `RequestVote` and `AppendEntries`, each with a
/// "call" form and a "reply" form. Every variant carries a [`MessageHeader`] plus its own
/// variant-specific payload.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Message {
/// The "call" half of the `RequestVote` exchange.
RequestVoteCall {
/// Header fields shared by all variants (sender, term, sequence number).
header: MessageHeader,
/// NOTE(review): presumably the position of the sender's last log entry — confirm
/// against the code that builds this message.
last_position: LogPosition,
},
/// The "reply" half of the `RequestVote` exchange.
RequestVoteReply {
/// Header fields shared by all variants (sender, term, sequence number).
header: MessageHeader,
/// Flag carried by the reply; by its name, whether the vote was granted.
vote_granted: bool,
},
/// The "call" half of the `AppendEntries` exchange.
AppendEntriesCall {
/// Header fields shared by all variants (sender, term, sequence number).
header: MessageHeader,
/// Commit index attached to the call (`Message::merge` binds it as `leader_commit`,
/// so it is presumably the sender's commit index — confirm).
commit_index: LogIndex,
/// Log entries carried by the call.
entries: LogEntries,
},
/// The "reply" half of the `AppendEntries` exchange.
AppendEntriesReply {
/// Header fields shared by all variants (sender, term, sequence number).
header: MessageHeader,
/// NOTE(review): presumably the position of the replying node's last log entry —
/// confirm against the code that builds this message.
last_position: LogPosition,
},
}
impl Message {
pub fn from(&self) -> NodeId {
match self {
Self::RequestVoteCall { header, .. } => header.from,
Self::RequestVoteReply { header, .. } => header.from,
Self::AppendEntriesCall { header, .. } => header.from,
Self::AppendEntriesReply { header, .. } => header.from,
}
}
pub fn term(&self) -> Term {
match self {
Self::RequestVoteCall { header, .. } => header.term,
Self::RequestVoteReply { header, .. } => header.term,
Self::AppendEntriesCall { header, .. } => header.term,
Self::AppendEntriesReply { header, .. } => header.term,
}
}
pub fn seqno(&self) -> MessageSeqNo {
match self {
Self::RequestVoteCall { header, .. } => header.seqno,
Self::RequestVoteReply { header, .. } => header.seqno,
Self::AppendEntriesCall { header, .. } => header.seqno,
Self::AppendEntriesReply { header, .. } => header.seqno,
}
}
pub(crate) fn request_vote_call(
term: Term,
from: NodeId,
seqno: MessageSeqNo,
last_position: LogPosition,
) -> Self {
Self::RequestVoteCall {
header: MessageHeader { term, from, seqno },
last_position,
}
}
pub(crate) fn request_vote_reply(
term: Term,
from: NodeId,
seqno: MessageSeqNo,
vote_granted: bool,
) -> Self {
Self::RequestVoteReply {
header: MessageHeader { from, term, seqno },
vote_granted,
}
}
pub(crate) fn append_entries_call(
term: Term,
from: NodeId,
commit_index: LogIndex,
seqno: MessageSeqNo,
entries: LogEntries,
) -> Self {
Self::AppendEntriesCall {
header: MessageHeader { from, term, seqno },
commit_index,
entries,
}
}
pub(crate) fn append_entries_reply(
term: Term,
from: NodeId,
seqno: MessageSeqNo,
last_position: LogPosition,
) -> Self {
Self::AppendEntriesReply {
header: MessageHeader { term, from, seqno },
last_position,
}
}
pub(crate) fn merge(&mut self, other: Self) {
debug_assert_eq!(self.from(), other.from());
if other.seqno() <= self.seqno() {
return;
}
debug_assert!(self.term() <= other.term());
let Self::AppendEntriesCall {
header: header0,
commit_index: leader_commit0,
entries: entries0,
} = self
else {
*self = other;
return;
};
let Self::AppendEntriesCall {
header: header1,
commit_index: leader_commit1,
entries: entries1,
} = other
else {
*self = other;
return;
};
*header0 = header1;
*leader_commit0 = leader_commit1;
if entries0.contains(entries1.prev_position()) {
entries0.append(&entries1);
} else {
*entries0 = entries1;
}
}
pub(crate) fn handle_snapshot_installed(&mut self, last_included_position: LogPosition) {
match self {
Message::RequestVoteCall {
header,
last_position,
} => {
header.term = header.term.max(last_included_position.term);
if last_position.index < last_included_position.index {
*last_position = last_included_position;
}
}
Message::RequestVoteReply { header, .. } => {
header.term = header.term.max(last_included_position.term);
}
Message::AppendEntriesCall {
header, entries, ..
} => {
header.term = header.term.max(last_included_position.term);
entries.handle_snapshot_installed(last_included_position);
}
Message::AppendEntriesReply {
header,
last_position,
} => {
header.term = header.term.max(last_included_position.term);
if last_position.index < last_included_position.index {
*last_position = last_included_position;
}
}
}
}
}
/// Header fields present in every [`Message`] variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MessageHeader {
/// ID of the node the message comes from.
pub from: NodeId,
/// Term attached to the message.
pub term: Term,
/// Sequence number of the message. `Message::merge` ignores an incoming message whose
/// sequence number is not greater than that of the message it is merged into.
pub seqno: MessageSeqNo,
}
/// Sequence number carried in a [`MessageHeader`].
///
/// A thin wrapper around `u64`; equality, ordering and hashing are derived and therefore follow
/// the wrapped integer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct MessageSeqNo(u64);

impl MessageSeqNo {
    /// The sequence number wrapping `0`.
    pub(crate) const ZERO: Self = Self::new(0);

    /// Wraps a raw `u64` as a sequence number.
    pub const fn new(seqno: u64) -> Self {
        MessageSeqNo(seqno)
    }

    /// Returns the wrapped `u64`.
    pub const fn get(self) -> u64 {
        let MessageSeqNo(raw) = self;
        raw
    }
}