// Leader-side engine handler: appending log entries, sending heartbeats and linearizable-read support.
use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::handler::replication_handler::SendNone;
use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::RaftPayload;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::LogIdOf;
use crate::AsyncRuntime;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;

#[cfg(test)]
mod append_entries_test;
#[cfg(test)]
mod send_heartbeat_test;

/// Handle leader operations.
///
/// - Append new logs;
/// - Change membership;
/// - etc
///
/// The handler owns nothing: every field is a mutable borrow, valid for `'x`, of state that
/// lives elsewhere. The same four borrows are handed on to a `ReplicationHandler` by
/// `replication_handler()`.
pub(crate) struct LeaderHandler<'x, C>
where C: RaftTypeConfig
{
    /// Engine configuration; not read directly by this handler, only passed on to the
    /// replication handler.
    pub(crate) config: &'x mut EngineConfig<C::NodeId>,
    /// Leader state: assigns log ids to new entries and provides the leader's vote and its
    /// noop log id.
    pub(crate) leader: &'x mut Leader<C, LeaderQuorumSet<C::NodeId>>,
    /// Raft state: its log ids are extended when entries are appended, and its committed log
    /// id is read when computing the read log id.
    pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
    /// Engine output to which commands (e.g. `Command::AppendInputEntries`) are pushed.
    pub(crate) output: &'x mut EngineOutput<C>,
}

impl<'x, C> LeaderHandler<'x, C>
where C: RaftTypeConfig
{
    /// Accept a batch of new entries proposed by this leader.
    ///
    /// In order, this method:
    /// - lets the leader assign log ids to `entries`;
    /// - extends the local log-id state with those entries;
    /// - queues a `Command::AppendInputEntries` carrying the leader's vote and the entries;
    /// - if the batch contains a membership config, passes it to the replication handler so
    ///   the effective membership is updated;
    /// - initiates replication with `SendNone::False`.
    ///
    /// An empty batch is a no-op: nothing is assigned, queued or replicated.
    ///
    /// If there is a membership config log entry, the caller has to guarantee the previous one is
    /// committed. At most one membership entry is expected per batch; this is only checked by
    /// a debug assertion, and without it the last membership entry in the batch is the one used.
    ///
    /// TODO(xp): if vote indicates this node is not the leader, refuse append
    #[tracing::instrument(level = "debug", skip(self, entries))]
    pub(crate) fn leader_append_entries(&mut self, mut entries: Vec<C::Entry>) {
        // Nothing to append: leave every piece of state untouched.
        if entries.is_empty() {
            return;
        }

        // Log ids must be assigned before the local log-id state is extended with them.
        self.leader.assign_log_ids(&mut entries);
        self.state.extend_log_ids_from_same_leader(&entries);

        // Remember the (log id, membership) pair of the membership entry, if the batch has one.
        let mut new_membership = None;
        for ent in &entries {
            match ent.get_membership() {
                None => continue,
                Some(membership) => {
                    debug_assert!(
                        new_membership.is_none(),
                        "only one membership entry is allowed in a batch"
                    );
                    new_membership = Some((*ent.get_log_id(), membership.clone()));
                }
            }
        }

        // TODO: In future implementations with asynchronous IO,
        //       ensure logs are not written until the vote is committed
        //       to maintain consistency.
        //       ---
        //       Currently, IO requests to `RaftLogStorage` are executed
        //       within the `RaftCore` task. This means an `AppendLog` request
        //       won't be submitted to `RaftLogStorage` until `save_vote()` completes,
        //       which ensures consistency.
        //       ---
        //       However, when `RaftLogStorage` is moved to a separate task,
        //       `RaftCore` will communicate with `RaftLogStorage` via a channel.
        //       This change could result in `AppendLog` requests being submitted
        //       before the previous `save_vote()` request is finished.
        //       ---
        //       This scenario creates a risk where a log entry becomes visible and
        //       is replicated by `ReplicationCore` to other nodes before the vote
        //       is flushed to disk. If the vote isn't flushed and the server restarts,
        //       the vote could revert to a previous state. This could allow a new leader
        //       to be elected with a smaller vote (term), breaking consistency.
        let append = Command::AppendInputEntries {
            // A leader should always use the leader's vote.
            // It is allowed to be different from local vote.
            vote: self.leader.vote,
            entries,
        };
        self.output.push_command(append);

        let mut replication = self.replication_handler();

        // Since this entry, the condition to commit has been changed.
        // But we only need to commit in the new membership config.
        // Because any quorum in the new one intersect with one in the previous membership config.
        match new_membership {
            Some((log_id, membership)) => replication.append_membership(&log_id, &membership),
            None => {}
        }

        replication.initiate_replication(SendNone::False);
    }

    /// Send a heartbeat by initiating replication with `SendNone::True`.
    ///
    /// This differs from `leader_append_entries`, which initiates replication with
    /// `SendNone::False` after queuing new entries.
    ///
    /// NOTE(review): presumably `SendNone::True` makes the replication handler emit a request
    /// even when there are no new entries to send — confirm against
    /// `ReplicationHandler::initiate_replication`.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn send_heartbeat(&mut self) {
        self.replication_handler().initiate_replication(SendNone::True);
    }

    /// Return the log id to use for a linearizable read.
    ///
    /// The result is the greater of this leader's noop log id and the committed log id.
    /// Because `None` orders below any `Some`, it is `None` only when both are `None`.
    ///
    /// See: [Read Operation](crate::docs::protocol::read)
    pub(crate) fn get_read_log_id(&self) -> Option<LogIdOf<C>> {
        // noop log id is the first log this leader proposed.
        let noop = self.leader.noop_log_id;
        std::cmp::max(noop, self.state.committed().copied())
    }

    /// Build a `ReplicationHandler` over this handler's `config`, `leader`, `state` and
    /// `output`.
    ///
    /// The four references are reborrowed from `self`, so the returned handler keeps `self`
    /// mutably borrowed until it is dropped. The `'_` in the return type makes that borrow
    /// visible in the signature.
    pub(crate) fn replication_handler(&mut self) -> ReplicationHandler<'_, C> {
        ReplicationHandler {
            config: self.config,
            leader: self.leader,
            state: self.state,
            output: self.output,
        }
    }
}