d_engine/core/replication/mod.rs
1//! Raft log replication implementation (Section 5.3)
2//!
3//! Handles core replication mechanics including:
4//! - Leader log propagation
5//! - Follower log consistency checks
6//! - Conflict resolution algorithms
7mod batch_buffer;
8mod replication_handler;
9
10pub use batch_buffer::*;
11pub use replication_handler::*;
12
13#[cfg(test)]
14mod batch_buffer_test;
15#[cfg(test)]
16mod replication_handler_test;
17
18// Client Request Extension Definition
19// -----------------------------------------------------------------------------
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use dashmap::DashMap;
24#[cfg(test)]
25use mockall::automock;
26use tonic::async_trait;
27use tonic::Status;
28
29use super::LeaderStateSnapshot;
30use super::StateSnapshot;
31use crate::alias::ROF;
32use crate::alias::TROF;
33use crate::proto::append_entries_response;
34use crate::proto::AppendEntriesRequest;
35use crate::proto::AppendEntriesResponse;
36use crate::proto::ClientCommand;
37use crate::proto::ClientResponse;
38use crate::proto::ConflictResult;
39use crate::proto::Entry;
40use crate::proto::LogId;
41use crate::proto::SuccessResult;
42use crate::AppendResults;
43use crate::ChannelWithAddressAndRole;
44use crate::MaybeCloneOneshotSender;
45use crate::RaftConfig;
46use crate::Result;
47use crate::RetryPolicies;
48use crate::TypeConfig;
49
50/// Client request with response channel
51#[derive(Debug)]
52pub struct ClientRequestWithSignal {
53 pub id: String,
54 pub commands: Vec<ClientCommand>,
55 pub sender: MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
56}
57
/// Replication state changes, as seen from the Leader's side.
#[derive(Debug)]
pub struct LeaderStateUpdate {
    /// Per-peer index updates, keyed by peer id: `peer_id -> (next_index, match_index)`.
    pub peer_index_updates: HashMap<u32, (u64, u64)>,
    /// The new commit index, if there is one.
    pub new_commit_index: Option<u64>,
}
66
67/// AppendEntries response with possible state changes
68#[derive(Debug)]
69pub struct AppendResponseWithUpdates {
70 pub response: AppendEntriesResponse,
71 pub commit_index_update: Option<u64>, // Commit_index to be updated
72}
73
74/// Core replication protocol operations
75#[cfg_attr(test, automock)]
76#[async_trait]
77pub trait ReplicationCore<T>: Send + Sync + 'static
78where T: TypeConfig
79{
80 /// As Leader, send replications to peers.
81 /// (combined regular heartbeat and client proposals)
82 ///
83 /// Each time handle_client_proposal_in_batch is called, perform peer
84 /// synchronization check
85 /// 1. Verify if any peer's next_id <= leader's commit_index
86 /// 2. For non-synced peers meeting this condition: a. Retrieve all unsynced log entries b.
87 /// Buffer these entries before processing real entries
88 /// 3. Ensure unsynced entries are prepended to the entries queue before actual entries get
89 /// pushed
90 ///
91 /// Leader state will be updated by LeaderState only(follows SRP).
92 async fn handle_client_proposal_in_batch(
93 &self,
94 commands: Vec<ClientCommand>,
95 state_snapshot: StateSnapshot,
96 leader_state_snapshot: LeaderStateSnapshot,
97 replication_members: &Vec<ChannelWithAddressAndRole>,
98 raft_log: &Arc<ROF<T>>,
99 transport: &Arc<TROF<T>>,
100 raft: &RaftConfig,
101 retry: &RetryPolicies,
102 ) -> Result<AppendResults>;
103
104 /// Handles successful AppendEntries responses
105 ///
106 /// Updates peer match/next indices according to:
107 /// - Last matched log index
108 /// - Current leader term
109 fn handle_success_response(
110 &self,
111 peer_id: u32,
112 peer_term: u64,
113 success_result: SuccessResult,
114 leader_term: u64,
115 ) -> Result<crate::PeerUpdate>;
116
117 /// Resolves log conflicts from follower responses
118 ///
119 /// Implements conflict backtracking optimization (Section 5.3)
120 fn handle_conflict_response(
121 &self,
122 peer_id: u32,
123 conflict_result: ConflictResult,
124 raft_log: &Arc<ROF<T>>,
125 ) -> Result<crate::PeerUpdate>;
126
127 /// Determines follower commit index advancement
128 ///
129 /// Applies Leader's commit index according to:
130 /// - min(leader_commit, last_local_log_index)
131 fn if_update_commit_index_as_follower(
132 my_commit_index: u64,
133 last_raft_log_id: u64,
134 leader_commit_index: u64,
135 ) -> Option<u64>;
136
137 /// Gathers legacy logs for lagging peers
138 ///
139 /// Performs log segmentation based on:
140 /// - Peer's next_index
141 /// - Max allowed historical entries
142 fn retrieve_to_be_synced_logs_for_peers(
143 &self,
144 new_entries: Vec<Entry>,
145 leader_last_index_before_inserting_new_entries: u64,
146 max_legacy_entries_per_peer: u64, //Maximum number of entries
147 peer_next_indices: &HashMap<u32, u64>,
148 raft_log: &Arc<ROF<T>>,
149 ) -> DashMap<u32, Vec<Entry>>;
150
151 /// Handles an incoming AppendEntries RPC request (called by ALL ROLES)
152 ///
153 /// Core responsibilities:
154 /// 1. Term validation and comparison (RFC §5.1)
155 /// 2. Log consistency checking (prev_log_index/term)
156 /// 3. Entry appending with conflict resolution (RFC §5.3)
157 /// 4. Commit index advancement (RFC §5.3)
158 ///
159 /// # Critical Architecture Constraints
160 /// 1. ROLE AGNOSTIC - This method contains no role-specific logic. It simply:
161 /// - Validates the RPC against local state
162 /// - Returns required state updates
163 /// 2. STATE CHANGE ISOLATION - Never directly modifies:
164 /// - Current role (Leader/Follower/etc)
165 /// - Persistent state (term/votedFor)
166 /// - Volatile leader state (nextIndex/matchIndex)
167 /// 3. CALLER MUST:
168 /// - Check `response.term_update` for term conflicts
169 /// - If higher term exists, transition to Follower
170 /// - Apply other state updates via role_tx
171 async fn handle_append_entries(
172 &self,
173 request: AppendEntriesRequest,
174 state_snapshot: &StateSnapshot,
175 raft_log: &Arc<ROF<T>>,
176 ) -> Result<AppendResponseWithUpdates>;
177
178 /// Validates an incoming AppendEntries RPC from a Leader against Raft protocol rules.
179 ///
180 /// This function implements the **log consistency checks** defined in Raft paper Section 5.3.
181 /// It determines whether to accept the Leader's log entries by verifying:
182 /// 1. Term freshness
183 /// 2. Virtual log (prev_log_index=0) handling
184 /// 3. Previous log entry consistency
185 ///
186 /// # Raft Protocol Rules Enforced
187 /// 1. **Term Check** (Raft Paper 5.1):
188 /// - Reject requests with stale terms to prevent partitioned Leaders
189 /// 2. **Virtual Log Handling** (Implementation-Specific):
190 /// - Special case for empty logs (prev_log_index=0 && prev_log_term=0)
191 /// 3. **Log Matching** (Raft Paper 5.3):
192 /// - Ensure Leader's prev_log_index/term matches Follower's log
193 ///
194 /// # Parameters
195 /// - `my_term`: Current node's term
196 /// - `request`: Leader's AppendEntries RPC
197 /// - `raft_log`: Reference to node's log store
198 ///
199 /// # Return
200 /// - [`AppendEntriesResponse::success`] if validation passes
201 /// - [`AppendEntriesResponse::higher_term`] if Leader's term is stale
202 /// - [`AppendEntriesResponse::conflict`] with debugging info for log inconsistencies
203 fn check_append_entries_request_is_legal(
204 &self,
205 my_term: u64,
206 request: &AppendEntriesRequest,
207 raft_log: &Arc<ROF<T>>,
208 ) -> AppendEntriesResponse;
209}
210
211impl AppendEntriesResponse {
212 /// Generate a successful response (full success)
213 pub fn success(
214 node_id: u32,
215 term: u64,
216 last_match: Option<LogId>,
217 ) -> Self {
218 Self {
219 node_id,
220 term,
221 result: Some(append_entries_response::Result::Success(SuccessResult { last_match })),
222 }
223 }
224
225 /// Generate conflict response (with conflict details)
226 pub fn conflict(
227 node_id: u32,
228 term: u64,
229 conflict_term: Option<u64>,
230 conflict_index: Option<u64>,
231 ) -> Self {
232 Self {
233 node_id,
234 term,
235 result: Some(append_entries_response::Result::Conflict(ConflictResult {
236 conflict_term,
237 conflict_index,
238 })),
239 }
240 }
241
242 /// Generate a conflict response (Higher term found)
243 pub fn higher_term(
244 node_id: u32,
245 term: u64,
246 ) -> Self {
247 Self {
248 node_id,
249 term,
250 result: Some(append_entries_response::Result::HigherTerm(term)),
251 }
252 }
253
254 /// Check if it is a success response
255 pub fn is_success(&self) -> bool {
256 match &self.result {
257 Some(append_entries_response::Result::Success(_)) => true,
258 _ => false,
259 }
260 }
261
262 /// Check if it is a conflict response
263 pub fn is_conflict(&self) -> bool {
264 match &self.result {
265 Some(append_entries_response::Result::Conflict(_conflict)) => true,
266 _ => false,
267 }
268 }
269
270 /// Check if it is a response of a higher Term
271 pub fn is_higher_term(&self) -> bool {
272 matches!(&self.result, Some(append_entries_response::Result::HigherTerm(_)))
273 }
274}