d_engine/core/replication/
mod.rs

1//! Raft log replication implementation (Section 5.3)
2//!
3//! Handles core replication mechanics including:
4//! - Leader log propagation
5//! - Follower log consistency checks
6//! - Conflict resolution algorithms
7mod batch_buffer;
8mod replication_handler;
9
10pub use batch_buffer::*;
11pub use replication_handler::*;
12
13#[cfg(test)]
14mod batch_buffer_test;
15#[cfg(test)]
16mod replication_handler_test;
17
18// Client Request Extension Definition
19// -----------------------------------------------------------------------------
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use dashmap::DashMap;
24#[cfg(test)]
25use mockall::automock;
26use tonic::async_trait;
27use tonic::Status;
28
29use super::LeaderStateSnapshot;
30use super::StateSnapshot;
31use crate::alias::ROF;
32use crate::alias::TROF;
33use crate::proto::append_entries_response;
34use crate::proto::AppendEntriesRequest;
35use crate::proto::AppendEntriesResponse;
36use crate::proto::ClientCommand;
37use crate::proto::ClientResponse;
38use crate::proto::ConflictResult;
39use crate::proto::Entry;
40use crate::proto::LogId;
41use crate::proto::SuccessResult;
42use crate::AppendResults;
43use crate::ChannelWithAddressAndRole;
44use crate::MaybeCloneOneshotSender;
45use crate::RaftConfig;
46use crate::Result;
47use crate::RetryPolicies;
48use crate::TypeConfig;
49
/// Client request bundled with the channel used to answer it.
#[derive(Debug)]
pub struct ClientRequestWithSignal {
    /// Identifier of this request.
    // NOTE(review): format and uniqueness guarantees of `id` are not visible here — confirm.
    pub id: String,
    /// Commands carried by this request.
    pub commands: Vec<ClientCommand>,
    /// Sender carrying either a `ClientResponse` or a tonic `Status` error.
    pub sender: MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
}
57
/// Replication state updates from the Leader's perspective.
#[derive(Debug)]
pub struct LeaderStateUpdate {
    /// Per-peer index updates: `peer_id -> (next_index, match_index)`.
    pub peer_index_updates: HashMap<u32, (u64, u64)>,
    /// New commit index, if there is one; `None` otherwise.
    pub new_commit_index: Option<u64>,
}
66
/// AppendEntries response together with possible state changes.
#[derive(Debug)]
pub struct AppendResponseWithUpdates {
    /// The AppendEntries response itself.
    pub response: AppendEntriesResponse,
    /// Commit index to be updated, if any.
    pub commit_index_update: Option<u64>,
}
73
/// Core replication protocol operations.
///
/// The trait is generic over a [`TypeConfig`]; the raft log and transport handles passed to its
/// methods use the `ROF<T>` / `TROF<T>` aliases from `crate::alias`. Under `cfg(test)` a mock
/// implementation is generated through `mockall::automock`.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait ReplicationCore<T>: Send + Sync + 'static
where T: TypeConfig
{
    /// As Leader, send replications to peers
    /// (combined regular heartbeat and client proposals).
    ///
    /// Each time `handle_client_proposal_in_batch` is called, a peer synchronization check is
    /// performed:
    /// 1. Verify if any peer's next_index <= leader's commit_index.
    /// 2. For non-synced peers meeting this condition:
    ///    a. Retrieve all unsynced log entries.
    ///    b. Buffer these entries before processing real entries.
    /// 3. Ensure unsynced entries are prepended to the entries queue before the actual entries
    ///    get pushed.
    ///
    /// Leader state will be updated by LeaderState only (follows SRP); this method reports its
    /// outcome through the returned [`AppendResults`].
    async fn handle_client_proposal_in_batch(
        &self,
        commands: Vec<ClientCommand>,
        state_snapshot: StateSnapshot,
        leader_state_snapshot: LeaderStateSnapshot,
        replication_members: &Vec<ChannelWithAddressAndRole>,
        raft_log: &Arc<ROF<T>>,
        transport: &Arc<TROF<T>>,
        raft: &RaftConfig,
        retry: &RetryPolicies,
    ) -> Result<AppendResults>;

    /// Handles successful AppendEntries responses.
    ///
    /// Produces a [`crate::PeerUpdate`] for the peer (match/next indices) according to:
    /// - Last matched log index
    /// - Current leader term
    fn handle_success_response(
        &self,
        peer_id: u32,
        peer_term: u64,
        success_result: SuccessResult,
        leader_term: u64,
    ) -> Result<crate::PeerUpdate>;

    /// Resolves log conflicts from follower responses.
    ///
    /// Implements conflict backtracking optimization (Raft paper Section 5.3) and returns the
    /// resulting [`crate::PeerUpdate`] for the peer.
    fn handle_conflict_response(
        &self,
        peer_id: u32,
        conflict_result: ConflictResult,
        raft_log: &Arc<ROF<T>>,
    ) -> Result<crate::PeerUpdate>;

    /// Determines follower commit index advancement.
    ///
    /// Applies Leader's commit index according to:
    /// - min(leader_commit, last_local_log_index)
    ///
    /// This is an associated function (no `self` receiver).
    // NOTE(review): presumably returns `Some(new_commit_index)` only when the follower's commit
    // index should change and `None` otherwise — confirm against the implementation.
    fn if_update_commit_index_as_follower(
        my_commit_index: u64,
        last_raft_log_id: u64,
        leader_commit_index: u64,
    ) -> Option<u64>;

    /// Gathers legacy logs for lagging peers.
    ///
    /// Performs log segmentation based on:
    /// - Peer's next_index
    /// - Max allowed historical entries
    // NOTE(review): the returned map is keyed by `u32`, presumably the same peer ids used as keys
    // of `peer_next_indices` — confirm against the implementation.
    fn retrieve_to_be_synced_logs_for_peers(
        &self,
        new_entries: Vec<Entry>,
        leader_last_index_before_inserting_new_entries: u64,
        max_legacy_entries_per_peer: u64, // Maximum number of legacy entries per peer
        peer_next_indices: &HashMap<u32, u64>,
        raft_log: &Arc<ROF<T>>,
    ) -> DashMap<u32, Vec<Entry>>;

    /// Handles an incoming AppendEntries RPC request (called by ALL ROLES).
    ///
    /// Core responsibilities:
    /// 1. Term validation and comparison (Raft paper §5.1)
    /// 2. Log consistency checking (prev_log_index/term)
    /// 3. Entry appending with conflict resolution (Raft paper §5.3)
    /// 4. Commit index advancement (Raft paper §5.3)
    ///
    /// # Critical Architecture Constraints
    /// 1. ROLE AGNOSTIC - This method contains no role-specific logic. It simply:
    ///    - Validates the RPC against local state
    ///    - Returns required state updates
    /// 2. STATE CHANGE ISOLATION - Never directly modifies:
    ///    - Current role (Leader/Follower/etc)
    ///    - Persistent state (term/votedFor)
    ///    - Volatile leader state (nextIndex/matchIndex)
    /// 3. CALLER MUST:
    ///    - Check the returned response for term conflicts
    ///    - If higher term exists, transition to Follower
    ///    - Apply other state updates via role_tx
    ///
    /// NOTE(review): earlier wording pointed callers at `response.term_update`, but
    /// [`AppendResponseWithUpdates`] only declares `response` and `commit_index_update`.
    /// Confirm how term changes are meant to be surfaced to the caller.
    async fn handle_append_entries(
        &self,
        request: AppendEntriesRequest,
        state_snapshot: &StateSnapshot,
        raft_log: &Arc<ROF<T>>,
    ) -> Result<AppendResponseWithUpdates>;

    /// Validates an incoming AppendEntries RPC from a Leader against Raft protocol rules.
    ///
    /// This function implements the **log consistency checks** defined in Raft paper Section 5.3.
    /// It determines whether to accept the Leader's log entries by verifying:
    /// 1. Term freshness
    /// 2. Virtual log (prev_log_index=0) handling
    /// 3. Previous log entry consistency
    ///
    /// # Raft Protocol Rules Enforced
    /// 1. **Term Check** (Raft Paper 5.1):
    ///    - Reject requests with stale terms to prevent partitioned Leaders
    /// 2. **Virtual Log Handling** (Implementation-Specific):
    ///    - Special case for empty logs (prev_log_index=0 && prev_log_term=0)
    /// 3. **Log Matching** (Raft Paper 5.3):
    ///    - Ensure Leader's prev_log_index/term matches Follower's log
    ///
    /// # Parameters
    /// - `my_term`: Current node's term
    /// - `request`: Leader's AppendEntries RPC
    /// - `raft_log`: Reference to node's log store
    ///
    /// # Return
    /// - [`AppendEntriesResponse::success`] if validation passes
    /// - [`AppendEntriesResponse::higher_term`] if Leader's term is stale
    /// - [`AppendEntriesResponse::conflict`] with debugging info for log inconsistencies
    fn check_append_entries_request_is_legal(
        &self,
        my_term: u64,
        request: &AppendEntriesRequest,
        raft_log: &Arc<ROF<T>>,
    ) -> AppendEntriesResponse;
}
210
211impl AppendEntriesResponse {
212    /// Generate a successful response (full success)
213    pub fn success(
214        node_id: u32,
215        term: u64,
216        last_match: Option<LogId>,
217    ) -> Self {
218        Self {
219            node_id,
220            term,
221            result: Some(append_entries_response::Result::Success(SuccessResult { last_match })),
222        }
223    }
224
225    /// Generate conflict response (with conflict details)
226    pub fn conflict(
227        node_id: u32,
228        term: u64,
229        conflict_term: Option<u64>,
230        conflict_index: Option<u64>,
231    ) -> Self {
232        Self {
233            node_id,
234            term,
235            result: Some(append_entries_response::Result::Conflict(ConflictResult {
236                conflict_term,
237                conflict_index,
238            })),
239        }
240    }
241
242    /// Generate a conflict response (Higher term found)
243    pub fn higher_term(
244        node_id: u32,
245        term: u64,
246    ) -> Self {
247        Self {
248            node_id,
249            term,
250            result: Some(append_entries_response::Result::HigherTerm(term)),
251        }
252    }
253
254    /// Check if it is a success response
255    pub fn is_success(&self) -> bool {
256        match &self.result {
257            Some(append_entries_response::Result::Success(_)) => true,
258            _ => false,
259        }
260    }
261
262    /// Check if it is a conflict response
263    pub fn is_conflict(&self) -> bool {
264        match &self.result {
265            Some(append_entries_response::Result::Conflict(_conflict)) => true,
266            _ => false,
267        }
268    }
269
270    /// Check if it is a response of a higher Term
271    pub fn is_higher_term(&self) -> bool {
272        matches!(&self.result, Some(append_entries_response::Result::HigherTerm(_)))
273    }
274}