use std::fmt;
use crate::display_ext::DisplaySliceExt;
use crate::progress::entry::ProgressEntry;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::quorum::QuorumSet;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::Instant;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::RaftLogId;
use crate::RaftTypeConfig;
use crate::Vote;
/// Leader-side state: the vote the leader runs under, the log ids it has handed out,
/// and per-node progress bookkeeping.
///
/// `QS` is the quorum set type used by both progress trackers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Leader<C, QS: QuorumSet<C::NodeId>>
where C: RaftTypeConfig
{
    /// The vote this leader operates under.
    ///
    /// `Leader::new` debug-asserts that it is committed.
    pub(crate) vote: Vote<C::NodeId>,

    /// Initialised from the last element of the log ids given to `Leader::new`,
    /// then replaced by the last id handed out in `assign_log_ids`.
    last_log_id: Option<LogId<C::NodeId>>,

    /// Either the first log id passed to `Leader::new` (when it carries the same
    /// leader id as `vote`), or a fresh log id under `vote` at the index following
    /// the last known log id.
    ///
    /// NOTE(review): presumably the id of the blank entry a leader proposes on
    /// establishment — confirm against the caller.
    pub(crate) noop_log_id: Option<LogId<C::NodeId>>,

    /// Per-node `ProgressEntry` values tracked over the quorum set `QS`.
    ///
    /// Every node starts with `ProgressEntry::empty(next_index)`.
    pub(crate) progress: VecProgress<C::NodeId, ProgressEntry<C::NodeId>, Option<LogId<C::NodeId>>, QS>,

    /// Per-node timestamps tracked over the quorum set `QS`.
    ///
    /// Every node starts with `None`; `last_quorum_acked_time` feeds in the
    /// leader's own local time.
    pub(crate) clock_progress: VecProgress<C::NodeId, Option<InstantOf<C>>, Option<InstantOf<C>>, QS>,
}
impl<C, QS> Leader<C, QS>
where
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId> + Clone + fmt::Debug + 'static,
{
/// Build a `Leader` for a committed `vote`.
///
/// - `quorum_set`: quorum definition shared by `progress` and `clock_progress`.
/// - `learner_ids`: node ids passed to both progress trackers as learners.
/// - `last_leader_log_id`: only its first and last elements are read. The last one
///   becomes `last_log_id`; the first one is reused as `noop_log_id` when it
///   carries the same leader id as `vote`.
///
/// In debug builds this asserts that `vote` is committed and that its leader id is
/// not smaller than the leader id of the first and the last given log id.
///
/// # Panics
///
/// Panics if `vote.committed_leader_id()` returns `None`.
pub(crate) fn new(
    vote: Vote<C::NodeId>,
    quorum_set: QS,
    learner_ids: impl IntoIterator<Item = C::NodeId>,
    last_leader_log_id: &[LogIdOf<C>],
) -> Self {
    debug_assert!(vote.is_committed());
    debug_assert!(
        Some(vote.committed_leader_id().unwrap())
            >= last_leader_log_id.last().map(|x| x.committed_leader_id().clone()),
        "vote {} must GE last_leader_log_id.last() {}",
        vote,
        last_leader_log_id.display()
    );
    debug_assert!(
        Some(vote.committed_leader_id().unwrap())
            >= last_leader_log_id.first().map(|x| x.committed_leader_id().clone()),
        "vote {} must GE last_leader_log_id.first() {}",
        vote,
        last_leader_log_id.display()
    );

    // Collected once because the ids are handed to two progress trackers.
    let learners: Vec<_> = learner_ids.into_iter().collect();

    let vote_leader_id = vote.committed_leader_id().unwrap();

    // Index right after the last known log id.
    let next_index = last_leader_log_id.last().next_index();

    let noop_log_id = match last_leader_log_id.first() {
        // A log id with this leader id already exists: reuse the first such id.
        Some(first) if first.committed_leader_id().clone() == vote_leader_id => Some(first.clone()),
        // Otherwise use a new log id under this vote, placed after the last known one.
        _ => Some(LogId::new(vote_leader_id, next_index)),
    };

    Self {
        vote,
        last_log_id: last_leader_log_id.last().cloned(),
        noop_log_id,
        progress: VecProgress::new(
            quorum_set.clone(),
            learners.iter().cloned(),
            ProgressEntry::empty(next_index),
        ),
        clock_progress: VecProgress::new(quorum_set, learners, None),
    }
}
pub(crate) fn noop_log_id(&self) -> Option<&LogIdOf<C>> {
self.noop_log_id.as_ref()
}
pub(crate) fn last_log_id(&self) -> Option<&LogId<C::NodeId>> {
self.last_log_id.as_ref()
}
pub(crate) fn vote_ref(&self) -> &Vote<C::NodeId> {
&self.vote
}
/// Assign consecutive log ids to `entries`, continuing after `last_log_id`.
///
/// Each entry receives a log id built from this leader's committed leader id and an
/// index that starts at `last_log_id().next_index()` and grows by one per entry.
/// When at least one entry was processed, `last_log_id` is set to the id given to
/// the final entry; with no entries it is left untouched.
///
/// # Panics
///
/// Panics if `self.vote.committed_leader_id()` returns `None`.
pub(crate) fn assign_log_ids<'a, LID: RaftLogId<C::NodeId> + 'a>(
    &mut self,
    entries: impl IntoIterator<Item = &'a mut LID>,
) {
    debug_assert!(self.vote.is_committed());

    let leader_id = self.vote.committed_leader_id().unwrap();
    let start_index = self.last_log_id().next_index();

    // `next` always holds the id the upcoming entry will receive.
    let mut next = LogId::new(leader_id, start_index);

    for entry in entries {
        entry.set_log_id(&next);
        tracing::debug!("assign log id: {}", next);
        next.index += 1;
    }

    // `next` moved past the start only if some entry was assigned an id.
    if next.index > start_index {
        // Step back from "one past the end" to the last id actually handed out.
        next.index -= 1;
        self.last_log_id = Some(next);
    }
}
/// Record the leader's own current time in `clock_progress` and return the value
/// that `clock_progress.increase_to` reports afterwards.
///
/// The node id used is the one this leader's vote was cast for. Both the `Ok` and
/// the `Err` result of `increase_to` carry a value, and either one is returned as is.
///
/// NOTE(review): presumably `Err` is produced when the leader's node id is not
/// tracked by `clock_progress` — confirm against `VecProgress::increase_to`.
///
/// # Panics
///
/// Panics if `self.vote.leader_id().voted_for()` returns `None`.
pub(crate) fn last_quorum_acked_time(&mut self) -> Option<InstantOf<C>> {
    let leader_node_id = self.vote.leader_id().voted_for().unwrap();
    let local_now = Instant::now();

    tracing::debug!(
        leader_id = display(&leader_node_id),
        now = debug(local_now),
        "{}: update with leader's local time, before retrieving quorum acked clock",
        func_name!()
    );

    self.clock_progress
        .increase_to(&leader_node_id, Some(local_now))
        .map_or_else(|unchanged| *unchanged, |acked| *acked)
}
}
#[cfg(test)]
mod tests {
use crate::engine::testing::UTConfig;
use crate::entry::RaftEntry;
use crate::progress::Progress;
use crate::proposer::Leader;
use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::Entry;
use crate::RaftLogId;
use crate::Vote;
/// `Leader::new` picks `noop_log_id` depending on whether the given log ids already
/// carry the vote's leader id; `last_log_id` is always the last given log id.
#[test]
fn test_leader_new_with_proposed_log_id() {
    tracing::info!("--- vote greater than last log id, create new noop_log_id");
    {
        let ld = Leader::<UTConfig, _>::new(
            Vote::new_committed(2, 2),
            vec![1, 2, 3],
            vec![],
            &[log_id(1, 2, 1), log_id(1, 2, 3)],
        );

        assert_eq!(Some(&log_id(2, 2, 4)), ld.noop_log_id());
        assert_eq!(Some(&log_id(1, 2, 3)), ld.last_log_id());
    }

    tracing::info!("--- vote equals last log id, reuse noop_log_id");
    {
        let ld = Leader::<UTConfig, _>::new(
            Vote::new_committed(1, 2),
            vec![1, 2, 3],
            vec![],
            &[log_id(1, 2, 1), log_id(1, 2, 3)],
        );

        assert_eq!(Some(&log_id(1, 2, 1)), ld.noop_log_id());
        assert_eq!(Some(&log_id(1, 2, 3)), ld.last_log_id());
    }

    tracing::info!("--- vote equals last log id, reuse noop_log_id, last_leader_log_id.len()==1");
    {
        let ld = Leader::<UTConfig, _>::new(Vote::new_committed(1, 2), vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]);

        assert_eq!(Some(&log_id(1, 2, 3)), ld.noop_log_id());
        assert_eq!(Some(&log_id(1, 2, 3)), ld.last_log_id());
    }

    tracing::info!("--- no last log ids, create new noop_log_id, last_leader_log_id.len()==0");
    {
        let ld = Leader::<UTConfig, _>::new(Vote::new_committed(1, 2), vec![1, 2, 3], vec![], &[]);

        assert_eq!(Some(&log_id(1, 2, 0)), ld.noop_log_id());
        assert_eq!(None, ld.last_log_id());
    }
}
/// A single entry gets the leader's id and the index right after the last log id,
/// regardless of the log id it carried before.
#[test]
fn test_leader_established() {
    let mut leader =
        Leader::<UTConfig, _>::new(Vote::new_committed(2, 2), vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]);

    // The pre-existing log id (5, 5, 2) must be overwritten.
    let mut entries = vec![Entry::<UTConfig>::new_blank(log_id(5, 5, 2))];
    leader.assign_log_ids(&mut entries);

    let expected = log_id(2, 2, 4);
    assert_eq!(
        entries[0].get_log_id(),
        &expected,
        "entry log id assigned following last-log-id"
    );
    assert_eq!(Some(expected), leader.last_log_id);
}
/// With no prior log id, the first assigned index is 0.
#[test]
fn test_1_entry_none_last_log_id() {
    let mut leading = Leader::<UTConfig, _>::new(Vote::new_committed(0, 0), vec![1, 2, 3], vec![], &[]);

    let mut entries: Vec<Entry<UTConfig>> = vec![blank_ent(1, 1, 1)];
    leading.assign_log_ids(&mut entries);

    let expected = log_id(0, 0, 0);
    assert_eq!(entries[0].get_log_id(), &expected);
    assert_eq!(Some(expected), leading.last_log_id);
}
/// Assigning ids to an empty batch leaves `last_log_id` unchanged.
#[test]
fn test_no_entries_provided() {
    let mut leading =
        Leader::<UTConfig, _>::new(Vote::new_committed(2, 2), vec![1, 2, 3], vec![], &[log_id(1, 1, 8)]);

    let mut no_entries = Vec::<Entry<UTConfig>>::new();
    leading.assign_log_ids(&mut no_entries);

    assert_eq!(Some(log_id(1, 1, 8)), leading.last_log_id);
}
/// Several entries receive consecutive indexes starting right after the last log id,
/// and `last_log_id` ends up at the final one.
#[test]
fn test_multiple_entries() {
    let mut leading = Leader::<UTConfig, _>::new(Vote::new_committed(2, 2), vec![1, 2, 3], [], &[log_id(1, 1, 8)]);

    let mut entries: Vec<Entry<UTConfig>> = (0..3).map(|_| blank_ent(1, 1, 1)).collect();
    leading.assign_log_ids(&mut entries);

    // Last log id was at index 8, so the three entries land on 9, 10 and 11.
    assert_eq!(entries.len(), 3);
    for (entry, index) in entries.iter().zip(9..=11) {
        assert_eq!(entry.get_log_id(), &log_id(2, 2, index));
    }

    assert_eq!(Some(log_id(2, 2, 11)), leading.last_log_id);
}
/// Leader n1 is one of the voters {1,2,3}: once n2 has acked `now1`, the leader's own
/// (later) local time plus n2's ack form a quorum, so `now1` is returned.
#[test]
fn test_leading_last_quorum_acked_time_leader_is_voter() {
    let vote = Vote::new_committed(2, 1);
    let mut leading = Leader::<UTConfig, Vec<u64>>::new(vote, vec![1, 2, 3], [4], &[]);

    let now1 = UTConfig::now();
    let _t2 = leading.clock_progress.increase_to(&2, Some(now1));

    let acked = leading.last_quorum_acked_time();
    assert_eq!(Some(now1), acked, "n1(leader) and n2 acked, t1 > t2");
}
/// Leader n4 is only a learner (voters are {1,2,3}): its own clock must not count
/// towards the quorum, so two voters have to ack before a time is returned.
#[test]
fn test_leading_last_quorum_acked_time_leader_is_learner() {
    let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 4), vec![1, 2, 3], [4], &[]);

    // Only voter n2 has acked: together with the learner-leader this is no quorum.
    let t2 = UTConfig::now();
    let _ = leading.clock_progress.increase_to(&2, Some(t2));
    let t = leading.last_quorum_acked_time();
    // The leader in this test is n4, not n1.
    assert!(t.is_none(), "n4(leader+learner) does not count in quorum");

    // Voter n3 acks a later time: n2 and n3 form a quorum, the older time t2 wins.
    let t3 = UTConfig::now();
    let _ = leading.clock_progress.increase_to(&3, Some(t3));
    let t = leading.last_quorum_acked_time();
    assert_eq!(Some(t2), t, "n2 and n3 acked");
}
/// Leader n5 is neither a voter ({1,2,3}) nor a learner ({4}): its own clock must
/// not count towards the quorum, so two voters have to ack before a time is returned.
#[test]
fn test_leading_last_quorum_acked_time_leader_is_not_member() {
    let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 5), vec![1, 2, 3], [4], &[]);

    // Only voter n2 has acked: the non-member leader adds nothing, so no quorum yet.
    let t2 = UTConfig::now();
    let _ = leading.clock_progress.increase_to(&2, Some(t2));
    let t = leading.last_quorum_acked_time();
    // The leader in this test is n5 and it is not a member at all.
    assert!(t.is_none(), "n5(leader, not a member) does not count in quorum");

    // Voter n3 acks a later time: n2 and n3 form a quorum, the older time t2 wins.
    let t3 = UTConfig::now();
    let _ = leading.clock_progress.increase_to(&3, Some(t3));
    let t = leading.last_quorum_acked_time();
    assert_eq!(Some(t2), t, "n2 and n3 acked");
}
}