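//! Handling of the AppendEntries RPC (§5.3).
//!
//! This module implements the follower-side handling of the leader's AppendEntries RPC, which
//! is used both for log replication and as the leader's heartbeat (§5.2). It also implements
//! the conflicting-term optimization used to reduce the number of rejected AppendEntries RPCs
//! for followers which are lagging behind.
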
use std::sync::Arc;

use actix::prelude::*;

use crate::{
    AppData, AppDataResponse, AppError,
    common::{ApplyLogsTask, DependencyAddr, UpdateCurrentLeader},
    network::RaftNetwork,
    messages::{AppendEntriesRequest, AppendEntriesResponse, ConflictOpt, Entry, EntryPayload},
    raft::{RaftState, Raft, SnapshotState},
    storage::{GetLogEntries, RaftStorage, ReplicateToLog},
};

impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Handler<AppendEntriesRequest<D>> for Raft<D, R, E, N, S> {
    type Result = ResponseActFuture<Self, AppendEntriesResponse, ()>;

    /// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
    ///
    /// This method implements the append entries algorithm and upholds all of the safety checks
    /// detailed in §5.3.
    ///
    /// Implementation overview from the spec:
    ///
    /// 1. Reply `false` if `term` is less than node's current `term` (§5.1).
    /// 2. Reply `false` if log doesn’t contain an entry at `prev_log_index` whose term
    ///    matches `prev_log_term` (§5.3).
    /// 3. If an existing entry conflicts with a new one (same index but different terms), delete the
    ///    existing entry and all that follow it (§5.3).
    /// 4. Append any new entries not already in the log.
    /// 5. If `leader_commit` is greater than node's commit index, set node's commit index to
    ///    `min(leader_commit, index of last new entry)`.
    ///
    /// The essential goal of this algorithm is that the receiver (the node on which this method
    /// is being executed) must find the exact entry in its log specified by the RPC's prev index
    /// and prev term fields, and then begin writing the new entries thereafter.
    ///
    /// When the receiver cannot find the entry specified in the RPC's prev index & prev term
    /// fields, it will respond with a failure to the leader. **This implementation of Raft
    /// includes the _conflicting term_ optimization**, detailed in §5.3, which is intended to
    /// reduce the number of rejected append entries RPCs from followers which are lagging
    /// behind. In such cases, if the Raft cluster is configured with a snapshot policy other
    /// than `Disabled`, the leader will make a determination if an `InstallSnapshot` RPC should
    /// be sent to this node.
    ///
    /// In Raft, the leader handles inconsistencies by forcing the followers’ logs to duplicate
    /// its own. This means that conflicting entries in follower logs will be overwritten with
    /// entries from the leader’s log. §5.4 details the safety of this protocol. It is important
    /// to note that logs which are _committed_ will not be overwritten. This is a critical
    /// feature of Raft.
    ///
    /// Raft also guarantees that only logs which have been committed may be applied to the state
    /// machine, which ensures that there will never be a case where a log needs to be reverted
    /// after being applied to the state machine.
    ///
    /// #### inconsistency example
    /// Followers may receive valid append entries requests from leaders, append them, and
    /// respond; before the leader is able to replicate the entries to a majority of nodes, the
    /// leader may die, and a new leader may be elected which does not have the same entries, as
    /// they were not replicated to a majority of followers. The new leader will then proceed to
    /// overwrite the inconsistent entries.
    fn handle(&mut self, msg: AppendEntriesRequest<D>, ctx: &mut Self::Context) -> Self::Result {
        // Only handle requests if actor has finished initialization.
        if let &RaftState::Initializing = &self.state {
            return Box::new(fut::err(()));
        }

        // If message's term is less than most recent term, then we do not honor the request.
        if &msg.term < &self.current_term {
            return Box::new(fut::ok(AppendEntriesResponse{term: self.current_term, success: false, conflict_opt: None}));
        }

        // Update election timeout.
        self.update_election_timeout_stamp();

        // Update current term if needed.
        if self.current_term != msg.term {
            self.update_current_term(msg.term, None);
            self.save_hard_state(ctx);
        }

        // Update current leader if needed.
        if self.current_leader != Some(msg.leader_id) {
            self.update_current_leader(ctx, UpdateCurrentLeader::OtherNode(msg.leader_id));
        }

        // Transition to follower state if needed.
        match &mut self.state {
            // Ensure we are not in a snapshotting state.
            RaftState::Follower(inner) => match inner.snapshot_state {
                SnapshotState::Idle => (),
                _ => inner.snapshot_state = SnapshotState::Idle,
            }
            // NonVoters stay in this state until a config change is received which changes its state.
            RaftState::NonVoter => (),
            // Any other state needs to transition to follower.
            _ => self.become_follower(ctx),
        }

        // Kick off process of applying logs to state machine based on `msg.leader_commit`.
        // The value for `self.commit_index` is only updated here when not the leader.
        self.commit_index = msg.leader_commit;
        if &self.commit_index > &self.last_applied {
            let _ = self.apply_logs_pipeline.unbounded_send(ApplyLogsTask::Outstanding);
        }

        // If this is just a heartbeat, then respond.
        if msg.entries.len() == 0 {
            return Box::new(fut::ok(AppendEntriesResponse{term: self.current_term, success: true, conflict_opt: None}));
        }

        // If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local
        // log info, then replication is g2g.
        let (term, msg_prev_index, msg_prev_term) = (self.current_term, msg.prev_log_index, msg.prev_log_term);
        let has_prev_log_match = &msg.prev_log_index == &u64::min_value()
            || (&msg_prev_index == &self.last_log_index && &msg_prev_term == &self.last_log_term);
        if has_prev_log_match {
            return Box::new(self.append_log_entries(ctx, Arc::new(msg.entries))
                .map(move |_, _, _| {
                    AppendEntriesResponse{term, success: true, conflict_opt: None}
                }));
        }

        // Previous log info doesn't immediately line up, so perform log consistency check and
        // proceed based on its result.
        Box::new(self.log_consistency_check(ctx, msg_prev_index, msg_prev_term)
            .and_then(move |res, act, ctx| match res {
                Some(conflict_opt) => {
                    fut::Either::A(fut::ok(
                        AppendEntriesResponse{term, success: false, conflict_opt: Some(conflict_opt)}
                    ))
                }
                None => {
                    fut::Either::B(act.append_log_entries(ctx, Arc::new(msg.entries))
                        .map(move |_, _, _| {
                            AppendEntriesResponse{term, success: true, conflict_opt: None}
                        }))
                }
            }))
    }
}

impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Raft<D, R, E, N, S> {
    /// Append the given entries to the log.
    ///
    /// This routine also encapsulates all logic which must be performed related to appending log
    /// entries.
    ///
    /// One important piece of logic to note here is the handling of config change entries. Per
    /// the Raft spec in §6:
    ///
    /// > Once a given server adds the new configuration entry to its log, it uses that
    /// > configuration for all future decisions (a server always uses the latest configuration in
    /// > its log, regardless of whether the entry is committed).
    ///
    /// This routine will extract the most recent (the latter-most) config change entry in the
    /// given payload of entries and will update the node's member state based on that entry.
    fn append_log_entries(
        &mut self, ctx: &mut Context<Self>, entries: Arc<Vec<Entry<D>>>,
    ) -> impl ActorFuture<Actor=Self, Item=(), Error=()> {
        // If we are already appending entries, then abort this operation.
        if self.is_appending_logs {
            return fut::Either::A(fut::err(()));
        }

        // Check the given entries for any config changes and take the most recent.
        let last_conf_change = entries.iter().filter_map(|ent| match &ent.payload {
            EntryPayload::ConfigChange(conf) => Some(conf),
            _ => None,
        }).last();
        let f = match last_conf_change {
            Some(conf) => {
                // Update membership info & apply hard state.
                fut::Either::A(self.update_membership(ctx, conf.membership.clone()))
            }
            None => fut::Either::B(fut::ok(())),
        };

        fut::Either::B(f.and_then(move |_, act, _| {
            act.is_appending_logs = true;
            fut::wrap_future(act.storage.send::<ReplicateToLog<D, E>>(ReplicateToLog::new(entries.clone())))
                .map_err(|err, act: &mut Self, ctx| act.map_fatal_actix_messaging_error(ctx, err, DependencyAddr::RaftStorage))
                .and_then(|res, act, ctx| act.map_fatal_storage_result(ctx, res))
                .map(move |_, act, _| {
                    if let Some((idx, term)) = entries.last().map(|elem| (elem.index, elem.term)) {
                        act.last_log_index = idx;
                        act.last_log_term = term;
                    }
                })
                .then(|res, act, _| {
                    act.is_appending_logs = false;
                    fut::result(res)
                })
        }))
    }

    /// Perform the AppendEntries RPC consistency check.
    ///
    /// If the log entry at the specified index does not exist, the most recent entry in the log
    /// will be used to build and return a `ConflictOpt` struct to be sent back to the leader.
    ///
    /// If the log entry at the specified index does exist, but the terms do not match up, this
    /// implementation will fetch the last 50 entries up to the given index, and will use the
    /// earliest entry from the log which is still in the given term to build a `ConflictOpt`
    /// struct to be sent back to the leader.
    ///
    /// If everything checks out, a `None` value will be returned and log replication may continue.
    fn log_consistency_check(
        &mut self, _: &mut Context<Self>, index: u64, term: u64,
    ) -> impl ActorFuture<Actor=Self, Item=Option<ConflictOpt>, Error=()> {
        let storage = self.storage.clone();
        fut::wrap_future(self.storage.send::<GetLogEntries<D, E>>(GetLogEntries::new(index, index)))
            .map_err(|err, act: &mut Self, ctx| act.map_fatal_actix_messaging_error(ctx, err, DependencyAddr::RaftStorage))
            .and_then(|res, act, ctx| act.map_fatal_storage_result(ctx, res))
            .and_then(move |res, act, _| {
                match res.last() {
                    // The target entry was not found. This can only mean that we don't have the
                    // specified index yet. Use the last known index & term.
                    None => {
                        fut::Either::A(fut::ok(Some(ConflictOpt{term: act.last_log_term, index: act.last_log_index})))
                    }
                    // The target entry was found. Compare its term with target term to ensure
                    // everything is consistent.
                    Some(target_entry) => {
                        let (target_entry_index, target_entry_term) = (target_entry.index, target_entry.term);
                        if &target_entry_index == &index && &target_entry_term == &term {
                            // Everything checks out. We're g2g.
                            fut::Either::A(fut::ok(None))
                        } else {
                            // Logs are inconsistent. Fetch the last 50 logs, and use the first
                            // entry of that payload which is still in the target term for
                            // conflict optimization.
                            let start = if index >= 50 { index - 50 } else { 0 };
                            fut::Either::B(fut::wrap_future(storage.send::<GetLogEntries<D, E>>(GetLogEntries::new(start, index)))
                                .map_err(|err, act: &mut Self, ctx| act.map_fatal_actix_messaging_error(ctx, err, DependencyAddr::RaftStorage))
                                .and_then(|res, act, ctx| act.map_fatal_storage_result(ctx, res))
                                .and_then(move |res, act, _| {
                                    match res.iter().find(|entry| entry.term == term) {
                                        Some(entry) => {
                                            fut::ok(Some(ConflictOpt{
                                                term: entry.term,
                                                index: entry.index,
                                            }))
                                        }
                                        None => {
                                            fut::ok(Some(ConflictOpt{
                                                term: act.last_log_term,
                                                index: act.last_log_index,
                                            }))
                                        }
                                    }
                                }))
                        }
                    }
                }
            })
    }
}
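
// The sketch below is illustrative only and is not part of this crate's API. It assumes the
// leader tracks a per-follower `next_index`, and shows how the `ConflictOpt` data carried by a
// rejected AppendEntries response lets the leader rewind replication for a lagging follower in
// a single step rather than decrementing `next_index` one entry at a time (§5.3).
// `leaders_last_index_in_term` is a hypothetical value supplied by the caller: the index of the
// leader's own last entry in the conflicting term, if it has any entries in that term.
#[allow(dead_code)]
fn next_index_after_conflict(conflict: &ConflictOpt, leaders_last_index_in_term: Option<u64>) -> u64 {
    match leaders_last_index_in_term {
        // The leader has entries in the conflicting term: resume replication just past its own
        // last entry in that term.
        Some(last_index_in_term) => last_index_in_term + 1,
        // The leader has no entries in the conflicting term: fall back to the follower's
        // reported conflict index.
        None => conflict.index,
    }
}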