use std::fmt;
use crate::LogIdOptionExt;
use crate::RaftTypeConfig;
use crate::base::shared_id_generator::SharedIdGenerator;
use crate::display_ext::DisplayInstantExt;
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::progress::entry::ProgressEntry;
use crate::progress::stream_id::StreamId;
use crate::quorum::QuorumSet;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::type_config::alias::CommittedVoteOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::vote::raft_vote::RaftVoteExt;
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct Leader<C, QS: QuorumSet<C::NodeId>>
where C: RaftTypeConfig
{
pub(crate) transfer_to: Option<C::NodeId>,
pub(crate) committed_vote: CommittedVoteOf<C>,
pub(crate) next_heartbeat: InstantOf<C>,
last_log_id: Option<LogIdOf<C>>,
pub(crate) noop_log_id: LogIdOf<C>,
pub(crate) progress: VecProgress<C::NodeId, ProgressEntry<C>, Option<LogIdOf<C>>, QS>,
pub(crate) clock_progress: VecProgress<C::NodeId, Option<InstantOf<C>>, Option<InstantOf<C>>, QS>,
}
impl<C, QS> Leader<C, QS>
where
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId> + Clone + fmt::Debug + 'static,
{
#[allow(clippy::clone_on_copy)]
pub(crate) fn new(
vote: CommittedVoteOf<C>,
quorum_set: QS,
learner_ids: impl IntoIterator<Item = C::NodeId>,
last_leader_log_ids: Option<LeaderLogIds<CommittedLeaderIdOf<C>>>,
id_gen: SharedIdGenerator,
) -> Self {
let cl_id = vote.committed_leader_id();
if let Some(ref log_ids) = last_leader_log_ids {
debug_assert!(
Some(&cl_id) >= Some(log_ids.last_ref().committed_leader_id()),
"vote {} must GE last_leader_log_ids.last_log_id() {:?}",
vote,
last_leader_log_ids
);
debug_assert!(
Some(&cl_id) >= Some(log_ids.first_ref().committed_leader_id()),
"vote {} must GE last_leader_log_ids.first_log_id() {:?}",
vote,
last_leader_log_ids
);
}
let learner_ids = learner_ids.into_iter().collect::<Vec<_>>();
let first_ref = last_leader_log_ids.as_ref().map(|x| x.first_ref());
let last_ref = last_leader_log_ids.as_ref().map(|x| x.last_ref());
let noop_log_id = if first_ref.as_ref().map(|x| x.committed_leader_id()) == Some(&cl_id) {
first_ref.unwrap().into_log_id()
} else {
LogIdOf::<C>::new(cl_id, last_ref.next_index())
};
let last_log_id = last_ref.map(|r| r.into_log_id());
Self {
transfer_to: None,
committed_vote: vote,
next_heartbeat: C::now(),
last_log_id: last_log_id.clone(),
noop_log_id,
progress: VecProgress::new(quorum_set.clone(), learner_ids.iter().cloned(), || {
let stream_id = StreamId::new(id_gen.next_id());
ProgressEntry::empty(stream_id, last_log_id.next_index())
}),
clock_progress: VecProgress::new(quorum_set, learner_ids, || None),
}
}
pub(crate) fn noop_log_id(&self) -> &LogIdOf<C> {
&self.noop_log_id
}
pub(crate) fn last_log_id(&self) -> Option<&LogIdOf<C>> {
self.last_log_id.as_ref()
}
pub(crate) fn committed_vote_ref(&self) -> &CommittedVoteOf<C> {
&self.committed_vote
}
pub(crate) fn mark_transfer(&mut self, to: C::NodeId) {
self.transfer_to = Some(to);
}
pub(crate) fn get_transfer_to(&self) -> Option<&C::NodeId> {
self.transfer_to.as_ref()
}
pub(crate) fn assign_log_ids(&mut self, count: usize) -> Option<LeaderLogIds<CommittedLeaderIdOf<C>>> {
debug_assert!(self.transfer_to.is_none(), "leader is disabled to propose new log");
if count == 0 {
return None;
}
let committed_leader_id = self.committed_vote.committed_leader_id();
let first = self.last_log_id().next_index();
let last = first + count as u64 - 1;
self.last_log_id = Some(LogIdOf::<C>::new(committed_leader_id.clone(), last));
Some(LeaderLogIds::new(committed_leader_id, first, last))
}
pub(crate) fn last_quorum_acked_time(&mut self) -> Option<InstantOf<C>> {
let node_id = self.committed_vote.to_leader_node_id();
let now = C::now();
tracing::debug!(
"{}: update with leader's local time, before retrieving quorum acked clock: leader_id: {}, now: {}",
func_name!(),
node_id,
now.display()
);
let granted = self.clock_progress.increase_to(&node_id, Some(now));
match granted {
Ok(x) => *x,
Err(x) => *x,
}
}
pub(crate) fn is_replication_stream_valid(&self, target: &C::NodeId, stream_id: StreamId) -> bool {
if let Some(prog_ent) = self.progress.try_get(target)
&& prog_ent.stream_id == stream_id
{
return true;
}
tracing::warn!(
"{}: target node {} stream_id:{} not found in progress tracker. It may be from a delayed message, ignore",
func_name!(),
target,
stream_id,
);
false
}
}
#[cfg(test)]
mod tests {
use crate::Vote;
use crate::base::shared_id_generator::SharedIdGenerator;
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::engine::testing::UTConfig;
use crate::engine::testing::log_id;
use crate::progress::Progress;
use crate::proposer::Leader;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_vote::RaftVoteExt;
#[test]
fn test_leader_new_with_proposed_log_id() {
tracing::info!("--- vote greater than last log id, create new noop_log_id");
{
let vote = Vote::new(2, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
vec![],
Some(LeaderLogIds::new(*log_id(1, 2, 0).committed_leader_id(), 1, 3)),
SharedIdGenerator::new(),
);
assert_eq!(leader.noop_log_id(), &log_id(2, 2, 4));
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3)));
}
tracing::info!("--- vote equals last log id, reuse noop_log_id");
{
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
vec![],
Some(LeaderLogIds::new(*log_id(1, 2, 0).committed_leader_id(), 1, 3)),
SharedIdGenerator::new(),
);
assert_eq!(leader.noop_log_id(), &log_id(1, 2, 1));
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3)));
}
tracing::info!("--- vote equals last log id, reuse noop_log_id, last_leader_log_id.len()==1");
{
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
vec![],
Some(LeaderLogIds::new_single(log_id(1, 2, 3))),
SharedIdGenerator::new(),
);
assert_eq!(leader.noop_log_id(), &log_id(1, 2, 3));
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3)));
}
tracing::info!("--- no last log ids, create new noop_log_id, last_leader_log_id.len()==0");
{
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], None, SharedIdGenerator::new());
assert_eq!(leader.noop_log_id(), &log_id(1, 2, 0));
assert_eq!(leader.last_log_id(), None);
}
}
#[test]
fn test_leader_established() {
let vote = Vote::new(2, 2).into_committed();
let mut leader = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
vec![],
Some(LeaderLogIds::new_single(log_id(1, 2, 3))),
SharedIdGenerator::new(),
);
let log_ids: Vec<_> = leader.assign_log_ids(1).unwrap().into_iter().collect();
assert_eq!(
log_ids,
vec![log_id(2, 2, 4)],
"entry log id assigned following last-log-id"
);
assert_eq!(Some(log_id(2, 2, 4)), leader.last_log_id);
}
#[test]
fn test_1_entry_none_last_log_id() {
let vote = Vote::new(0, 0).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], None, SharedIdGenerator::new());
let log_ids: Vec<_> = leading.assign_log_ids(1).unwrap().into_iter().collect();
assert_eq!(log_ids, vec![log_id(0, 0, 0)]);
assert_eq!(Some(log_id(0, 0, 0)), leading.last_log_id);
}
#[test]
fn test_no_entries_provided() {
let vote = Vote::new(2, 2).into_committed();
let mut leading = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
vec![],
Some(LeaderLogIds::new_single(log_id(1, 1, 8))),
SharedIdGenerator::new(),
);
let log_ids = leading.assign_log_ids(0);
assert_eq!(log_ids, None);
assert_eq!(Some(log_id(1, 1, 8)), leading.last_log_id);
}
#[test]
fn test_multiple_entries() {
let vote = Vote::new(2, 2).into_committed();
let mut leading = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
[],
Some(LeaderLogIds::new_single(log_id(1, 1, 8))),
SharedIdGenerator::new(),
);
let log_ids: Vec<_> = leading.assign_log_ids(3).unwrap().into_iter().collect();
assert_eq!(log_ids, vec![log_id(2, 2, 9), log_id(2, 2, 10), log_id(2, 2, 11)]);
assert_eq!(Some(log_id(2, 2, 11)), leading.last_log_id);
}
#[test]
fn test_leading_last_quorum_acked_time_leader_is_voter() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(
Vote::new(2, 1).into_committed(),
vec![1, 2, 3],
[4],
None,
SharedIdGenerator::new(),
);
let now1 = UTConfig::<()>::now();
let _t2 = leading.clock_progress.increase_to(&2, Some(now1));
let t1 = leading.last_quorum_acked_time();
assert_eq!(Some(now1), t1, "n1(leader) and n2 acked, t1 > t2");
}
#[test]
fn test_leading_last_quorum_acked_time_leader_is_learner() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(
Vote::new(2, 4).into_committed(),
vec![1, 2, 3],
[4],
None,
SharedIdGenerator::new(),
);
let t2 = UTConfig::<()>::now();
leading.clock_progress.increase_to(&2, Some(t2)).ok();
let t = leading.last_quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");
let t3 = UTConfig::<()>::now();
leading.clock_progress.increase_to(&3, Some(t3)).ok();
let t = leading.last_quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}
#[test]
fn test_leading_last_quorum_acked_time_leader_is_not_member() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(
Vote::new(2, 5).into_committed(),
vec![1, 2, 3],
[4],
None,
SharedIdGenerator::new(),
);
let t2 = UTConfig::<()>::now();
leading.clock_progress.increase_to(&2, Some(t2)).ok();
let t = leading.last_quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");
let t3 = UTConfig::<()>::now();
leading.clock_progress.increase_to(&3, Some(t3)).ok();
let t = leading.last_quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}
}