// d_engine/core/replication/mod.rs

//! Raft log replication implementation (Raft paper, Section 5.3)
//!
//! Handles core replication mechanics including:
//! - Leader log propagation
//! - Follower log consistency checks
//! - Conflict resolution algorithms
mod batch_buffer;
mod replication_handler;

pub use batch_buffer::*;
pub use replication_handler::*;

#[cfg(test)]
mod batch_buffer_test;
#[cfg(test)]
mod replication_handler_test;

// Client Request Extension Definition
// -----------------------------------------------------------------------------
use std::collections::HashMap;
use std::sync::Arc;

use dashmap::DashMap;
#[cfg(test)]
use mockall::automock;
use tonic::async_trait;
use tonic::Status;

use super::LeaderStateSnapshot;
use super::StateSnapshot;
use crate::alias::ROF;
use crate::proto::client::ClientResponse;
use crate::proto::common::Entry;
use crate::proto::common::EntryPayload;
use crate::proto::common::LogId;
use crate::proto::replication::append_entries_response;
use crate::proto::replication::AppendEntriesRequest;
use crate::proto::replication::AppendEntriesResponse;
use crate::proto::replication::ConflictResult;
use crate::proto::replication::SuccessResult;
use crate::AppendResults;
use crate::MaybeCloneOneshotSender;
use crate::Result;
use crate::TypeConfig;
45
46/// Request with response channel that can handle all Raft payload types
47#[derive(Debug)]
48pub struct RaftRequestWithSignal {
49    #[allow(unused)]
50    pub id: String,
51    pub payloads: Vec<EntryPayload>,
52    pub sender: MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
53}
54
55/// AppendEntries response with possible state changes
56#[derive(Debug)]
57pub struct AppendResponseWithUpdates {
58    pub response: AppendEntriesResponse,
59    pub commit_index_update: Option<u64>, // Commit_index to be updated
60}
61
62/// Core replication protocol operations
63#[cfg_attr(test, automock)]
64#[async_trait]
65pub trait ReplicationCore<T>: Send + Sync + 'static
66where
67    T: TypeConfig,
68{
69    /// As Leader, send replications to peers (combines regular heartbeats and client proposals).
70    ///
71    /// Performs peer synchronization checks:
72    /// 1. Verifies if any peer's next_id <= leader's commit_index
73    /// 2. For non-synced peers: retrieves unsynced logs and buffers them
74    /// 3. Prepends unsynced entries to the entries queue
75    ///
76    /// # Returns
77    /// - `Ok(AppendResults)` with aggregated replication outcomes
78    /// - `Err` for unrecoverable errors
79    ///
80    /// # Return Result Semantics
81    /// 1. **Insufficient Quorum**:   Returns `Ok(AppendResults)` with `commit_quorum_achieved =
82    ///    false` when:
83    ///    - Responses received from all nodes but majority acceptance not achieved
84    ///    - Partial timeouts reduce successful responses below majority
85    ///
86    /// 2. **Timeout Handling**:
87    ///    - Partial timeouts: Returns `Ok` with `commit_quorum_achieved = false`
88    ///    - Complete timeout: Returns `Ok` with `commit_quorum_achieved = false`
89    ///    - Timeout peers are EXCLUDED from `peer_updates`
90    ///
91    /// 3. **Error Conditions**:   Returns `Err` ONLY for:
92    ///    - Empty voting members (`ReplicationError::NoPeerFound`)
93    ///    - Log generation failures (`generate_new_entries` errors)
94    ///    - Higher term detected in peer response (`ReplicationError::HigherTerm`)
95    ///    - Critical response handling errors
96    ///
97    /// # Guarantees
98    /// - Only peers with successful responses appear in `peer_updates`
99    /// - Timeouts never cause top-level `Err` (handled as failed responses)
100    /// - Leader self-vote always counted in quorum calculation
101    ///
102    /// # Note
103    /// - Leader state should be updated by LeaderState only(follows SRP).
104    ///
105    /// # Quorum
106    /// - If there are no voters (not even the leader), quorum is not possible.
107    /// - If the leader is the only voter, quorum is always achieved.
108    /// - If all nodes are learners, quorum is not achieved.
109    async fn handle_raft_request_in_batch(
110        &self,
111        entry_payloads: Vec<EntryPayload>,
112        state_snapshot: StateSnapshot,
113        leader_state_snapshot: LeaderStateSnapshot,
114        ctx: &crate::RaftContext<T>,
115    ) -> Result<AppendResults>;
116
117    /// Handles successful AppendEntries responses
118    ///
119    /// Updates peer match/next indices according to:
120    /// - Last matched log index
121    /// - Current leader term
122    fn handle_success_response(
123        &self,
124        peer_id: u32,
125        peer_term: u64,
126        success_result: SuccessResult,
127        leader_term: u64,
128    ) -> Result<crate::PeerUpdate>;
129
130    /// Resolves log conflicts from follower responses
131    ///
132    /// Implements conflict backtracking optimization (Section 5.3)
133    fn handle_conflict_response(
134        &self,
135        peer_id: u32,
136        conflict_result: ConflictResult,
137        raft_log: &Arc<ROF<T>>,
138        current_next_index: u64,
139    ) -> Result<crate::PeerUpdate>;
140
141    /// Determines follower commit index advancement
142    ///
143    /// Applies Leader's commit index according to:
144    /// - min(leader_commit, last_local_log_index)
145    fn if_update_commit_index_as_follower(
146        my_commit_index: u64,
147        last_raft_log_id: u64,
148        leader_commit_index: u64,
149    ) -> Option<u64>;
150
151    /// Gathers legacy logs for lagging peers
152    ///
153    /// Performs log segmentation based on:
154    /// - Peer's next_index
155    /// - Max allowed historical entries
156    fn retrieve_to_be_synced_logs_for_peers(
157        &self,
158        new_entries: Vec<Entry>,
159        leader_last_index_before_inserting_new_entries: u64,
160        max_legacy_entries_per_peer: u64, //Maximum number of entries
161        peer_next_indices: &HashMap<u32, u64>,
162        raft_log: &Arc<ROF<T>>,
163    ) -> DashMap<u32, Vec<Entry>>;
164
165    /// Handles an incoming AppendEntries RPC request (called by ALL ROLES)
166    ///
167    /// Core responsibilities:
168    /// 1. Term validation and comparison (RFC §5.1)
169    /// 2. Log consistency checking (prev_log_index/term)
170    /// 3. Entry appending with conflict resolution (RFC §5.3)
171    /// 4. Commit index advancement (RFC §5.3)
172    ///
173    /// # Critical Architecture Constraints
174    /// 1. ROLE AGNOSTIC - This method contains no role-specific logic. It simply:
175    ///    - Validates the RPC against local state
176    ///    - Returns required state updates
177    /// 2. STATE CHANGE ISOLATION - Never directly modifies:
178    ///    - Current role (Leader/Follower/etc)
179    ///    - Persistent state (term/votedFor)
180    ///    - Volatile leader state (nextIndex/matchIndex)
181    /// 3. CALLER MUST:
182    ///    - Check `response.term_update` for term conflicts
183    ///    - If higher term exists, transition to Follower
184    ///    - Apply other state updates via role_tx
185    async fn handle_append_entries(
186        &self,
187        request: AppendEntriesRequest,
188        state_snapshot: &StateSnapshot,
189        raft_log: &Arc<ROF<T>>,
190    ) -> Result<AppendResponseWithUpdates>;
191
192    /// Validates an incoming AppendEntries RPC from a Leader against Raft protocol rules.
193    ///
194    /// This function implements the **log consistency checks** defined in Raft paper Section 5.3.
195    /// It determines whether to accept the Leader's log entries by verifying:
196    /// 1. Term freshness
197    /// 2. Virtual log (prev_log_index=0) handling
198    /// 3. Previous log entry consistency
199    ///
200    /// # Raft Protocol Rules Enforced
201    /// 1. **Term Check** (Raft Paper 5.1):
202    ///    - Reject requests with stale terms to prevent partitioned Leaders
203    /// 2. **Virtual Log Handling** (Implementation-Specific):
204    ///    - Special case for empty logs (prev_log_index=0 && prev_log_term=0)
205    /// 3. **Log Matching** (Raft Paper 5.3):
206    ///    - Ensure Leader's prev_log_index/term matches Follower's log
207    ///
208    /// # Parameters
209    /// - `my_term`: Current node's term
210    /// - `request`: Leader's AppendEntries RPC
211    /// - `raft_log`: Reference to node's log store
212    ///
213    /// # Return
214    /// - [`AppendEntriesResponse::success`] if validation passes
215    /// - [`AppendEntriesResponse::higher_term`] if Leader's term is stale
216    /// - [`AppendEntriesResponse::conflict`] with debugging info for log inconsistencies
217    fn check_append_entries_request_is_legal(
218        &self,
219        my_term: u64,
220        request: &AppendEntriesRequest,
221        raft_log: &Arc<ROF<T>>,
222    ) -> AppendEntriesResponse;
223}
224
225impl AppendEntriesResponse {
226    /// Generate a successful response (full success)
227    pub fn success(
228        node_id: u32,
229        term: u64,
230        last_match: Option<LogId>,
231    ) -> Self {
232        Self {
233            node_id,
234            term,
235            result: Some(append_entries_response::Result::Success(SuccessResult {
236                last_match,
237            })),
238        }
239    }
240
241    /// Generate conflict response (with conflict details)
242    pub fn conflict(
243        node_id: u32,
244        term: u64,
245        conflict_term: Option<u64>,
246        conflict_index: Option<u64>,
247    ) -> Self {
248        Self {
249            node_id,
250            term,
251            result: Some(append_entries_response::Result::Conflict(ConflictResult {
252                conflict_term,
253                conflict_index,
254            })),
255        }
256    }
257
258    /// Generate a conflict response (Higher term found)
259    pub fn higher_term(
260        node_id: u32,
261        term: u64,
262    ) -> Self {
263        Self {
264            node_id,
265            term,
266            result: Some(append_entries_response::Result::HigherTerm(term)),
267        }
268    }
269
270    /// Check if it is a success response
271    pub fn is_success(&self) -> bool {
272        matches!(
273            &self.result,
274            Some(append_entries_response::Result::Success(_))
275        )
276    }
277
278    /// Check if it is a conflict response
279    pub fn is_conflict(&self) -> bool {
280        matches!(
281            &self.result,
282            Some(append_entries_response::Result::Conflict(_conflict))
283        )
284    }
285
286    /// Check if it is a response of a higher Term
287    pub fn is_higher_term(&self) -> bool {
288        matches!(
289            &self.result,
290            Some(append_entries_response::Result::HigherTerm(_))
291        )
292    }
293}