d_engine_core/replication/mod.rs
//! Raft log replication implementation (Section 5.3)
//!
//! Handles core replication mechanics including:
//! - Leader log propagation
//! - Follower log consistency checks
//! - Conflict resolution algorithms
7mod batch_buffer;
8mod replication_handler;
9
10pub use batch_buffer::*;
11pub use replication_handler::*;
12
13#[cfg(test)]
14mod batch_buffer_test;
15
16#[cfg(test)]
17pub mod replication_handler_test;
18
// Client Request Extension Definition
// -----------------------------------------------------------------------------
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use d_engine_proto::client::ClientResponse;
25use d_engine_proto::common::Entry;
26use d_engine_proto::common::EntryPayload;
27use d_engine_proto::server::replication::AppendEntriesRequest;
28use d_engine_proto::server::replication::AppendEntriesResponse;
29use d_engine_proto::server::replication::ConflictResult;
30use d_engine_proto::server::replication::SuccessResult;
31use dashmap::DashMap;
32#[cfg(any(test, feature = "__test_support"))]
33use mockall::automock;
34use tonic::Status;
35use tonic::async_trait;
36
37use super::LeaderStateSnapshot;
38use super::StateSnapshot;
39use crate::AppendResults;
40use crate::MaybeCloneOneshotSender;
41use crate::Result;
42use crate::TypeConfig;
43use crate::alias::ROF;
44
45/// Request with response channel that can handle all Raft payload types
46#[derive(Debug)]
47pub struct RaftRequestWithSignal {
48 #[allow(unused)]
49 pub id: String,
50 pub payloads: Vec<EntryPayload>,
51 pub sender: MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
52}
53
54/// AppendEntries response with possible state changes
55#[derive(Debug)]
56pub struct AppendResponseWithUpdates {
57 pub response: AppendEntriesResponse,
58 pub commit_index_update: Option<u64>, // Commit_index to be updated
59}
60
61/// Core replication protocol operations
62#[cfg_attr(any(test, feature = "__test_support"), automock)]
63#[async_trait]
64pub trait ReplicationCore<T>: Send + Sync + 'static
65where
66 T: TypeConfig,
67{
68 /// As Leader, send replications to peers (combines regular heartbeats and client proposals).
69 ///
70 /// Performs peer synchronization checks:
71 /// 1. Verifies if any peer's next_id <= leader's commit_index
72 /// 2. For non-synced peers: retrieves unsynced logs and buffers them
73 /// 3. Prepends unsynced entries to the entries queue
74 ///
75 /// # Returns
76 /// - `Ok(AppendResults)` with aggregated replication outcomes
77 /// - `Err` for unrecoverable errors
78 ///
79 /// # Return Result Semantics
80 /// 1. **Insufficient Quorum**: Returns `Ok(AppendResults)` with `commit_quorum_achieved =
81 /// false` when:
82 /// - Responses received from all nodes but majority acceptance not achieved
83 /// - Partial timeouts reduce successful responses below majority
84 ///
85 /// 2. **Timeout Handling**:
86 /// - Partial timeouts: Returns `Ok` with `commit_quorum_achieved = false`
87 /// - Complete timeout: Returns `Ok` with `commit_quorum_achieved = false`
88 /// - Timeout peers are EXCLUDED from `peer_updates`
89 ///
90 /// 3. **Error Conditions**: Returns `Err` ONLY for:
91 /// - Empty voting members (`ReplicationError::NoPeerFound`)
92 /// - Log generation failures (`generate_new_entries` errors)
93 /// - Higher term detected in peer response (`ReplicationError::HigherTerm`)
94 /// - Critical response handling errors
95 ///
96 /// # Guarantees
97 /// - Only peers with successful responses appear in `peer_updates`
98 /// - Timeouts never cause top-level `Err` (handled as failed responses)
99 /// - Leader self-vote always counted in quorum calculation
100 ///
101 /// # Note
102 /// - Leader state should be updated by LeaderState only(follows SRP).
103 ///
104 /// # Quorum
105 /// - If there are no voters (not even the leader), quorum is not possible.
106 /// - If the leader is the only voter, quorum is always achieved.
107 /// - If all nodes are learners, quorum is not achieved.
108 async fn handle_raft_request_in_batch(
109 &self,
110 entry_payloads: Vec<EntryPayload>,
111 state_snapshot: StateSnapshot,
112 leader_state_snapshot: LeaderStateSnapshot,
113 cluster_metadata: &crate::raft_role::ClusterMetadata,
114 ctx: &crate::RaftContext<T>,
115 ) -> Result<AppendResults>;
116
117 /// Handles successful AppendEntries responses
118 ///
119 /// Updates peer match/next indices according to:
120 /// - Last matched log index
121 /// - Current leader term
122 fn handle_success_response(
123 &self,
124 peer_id: u32,
125 peer_term: u64,
126 success_result: SuccessResult,
127 leader_term: u64,
128 ) -> Result<crate::PeerUpdate>;
129
130 /// Resolves log conflicts from follower responses
131 ///
132 /// Implements conflict backtracking optimization (Section 5.3)
133 fn handle_conflict_response(
134 &self,
135 peer_id: u32,
136 conflict_result: ConflictResult,
137 raft_log: &Arc<ROF<T>>,
138 current_next_index: u64,
139 ) -> Result<crate::PeerUpdate>;
140
141 /// Determines follower commit index advancement
142 ///
143 /// Applies Leader's commit index according to:
144 /// - min(leader_commit, last_local_log_index)
145 fn if_update_commit_index_as_follower(
146 my_commit_index: u64,
147 last_raft_log_id: u64,
148 leader_commit_index: u64,
149 ) -> Option<u64>;
150
151 /// Gathers legacy logs for lagging peers
152 ///
153 /// Performs log segmentation based on:
154 /// - Peer's next_index
155 /// - Max allowed historical entries
156 fn retrieve_to_be_synced_logs_for_peers(
157 &self,
158 new_entries: Vec<Entry>,
159 leader_last_index_before_inserting_new_entries: u64,
160 max_legacy_entries_per_peer: u64, //Maximum number of entries
161 peer_next_indices: &HashMap<u32, u64>,
162 raft_log: &Arc<ROF<T>>,
163 ) -> DashMap<u32, Vec<Entry>>;
164
165 /// Handles an incoming AppendEntries RPC request (called by ALL ROLES)
166 ///
167 /// Core responsibilities:
168 /// 1. Term validation and comparison (RFC §5.1)
169 /// 2. Log consistency checking (prev_log_index/term)
170 /// 3. Entry appending with conflict resolution (RFC §5.3)
171 /// 4. Commit index advancement (RFC §5.3)
172 ///
173 /// # Critical Architecture Constraints
174 /// 1. ROLE AGNOSTIC - This method contains no role-specific logic. It simply:
175 /// - Validates the RPC against local state
176 /// - Returns required state updates
177 /// 2. STATE CHANGE ISOLATION - Never directly modifies:
178 /// - Current role (Leader/Follower/etc)
179 /// - Persistent state (term/votedFor)
180 /// - Volatile leader state (nextIndex/matchIndex)
181 /// 3. CALLER MUST:
182 /// - Check `response.term_update` for term conflicts
183 /// - If higher term exists, transition to Follower
184 /// - Apply other state updates via role_tx
185 async fn handle_append_entries(
186 &self,
187 request: AppendEntriesRequest,
188 state_snapshot: &StateSnapshot,
189 raft_log: &Arc<ROF<T>>,
190 ) -> Result<AppendResponseWithUpdates>;
191
192 /// Validates an incoming AppendEntries RPC from a Leader against Raft protocol rules.
193 ///
194 /// This function implements the **log consistency checks** defined in Raft paper Section 5.3.
195 /// It determines whether to accept the Leader's log entries by verifying:
196 /// 1. Term freshness
197 /// 2. Virtual log (prev_log_index=0) handling
198 /// 3. Previous log entry consistency
199 ///
200 /// # Raft Protocol Rules Enforced
201 /// 1. **Term Check** (Raft Paper 5.1):
202 /// - Reject requests with stale terms to prevent partitioned Leaders
203 /// 2. **Virtual Log Handling** (Implementation-Specific):
204 /// - Special case for empty logs (prev_log_index=0 && prev_log_term=0)
205 /// 3. **Log Matching** (Raft Paper 5.3):
206 /// - Ensure Leader's prev_log_index/term matches Follower's log
207 ///
208 /// # Parameters
209 /// - `my_term`: Current node's term
210 /// - `request`: Leader's AppendEntries RPC
211 /// - `raft_log`: Reference to node's log store
212 ///
213 /// # Return
214 /// - [`AppendEntriesResponse::success`] if validation passes
215 /// - [`AppendEntriesResponse::higher_term`] if Leader's term is stale
216 /// - [`AppendEntriesResponse::conflict`] with debugging info for log inconsistencies
217 fn check_append_entries_request_is_legal(
218 &self,
219 my_term: u64,
220 request: &AppendEntriesRequest,
221 raft_log: &Arc<ROF<T>>,
222 ) -> AppendEntriesResponse;
223}