// d_engine_core/replication/mod.rs
1//! Raft log replication implementation (Section 5.3)
2//!
3//! Handles core replication mechanics including:
4//! - Leader log propagation
5//! - Follower log consistency checks
6//! - Conflict resolution algorithms
7mod replication_handler;
8pub use replication_handler::*;
9
10#[cfg(test)]
11pub mod replication_handler_test;
12
// Client Request Extension Definition
// -----------------------------------------------------------------------------
use std::collections::HashMap;
use std::sync::Arc;

use d_engine_proto::client::ClientResponse;
use d_engine_proto::common::Entry;
use d_engine_proto::common::EntryPayload;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::replication::ConflictResult;
use d_engine_proto::server::replication::SuccessResult;
use dashmap::DashMap;
#[cfg(any(test, feature = "__test_support"))]
use mockall::automock;
use tonic::Status;
use tonic::async_trait;

use super::LeaderStateSnapshot;
use super::StateSnapshot;
use crate::AppendResults;
use crate::MaybeCloneOneshotSender;
use crate::Result;
use crate::TypeConfig;
use crate::alias::ROF;
38
39/// Request with response channel that can handle all Raft payload types
40#[derive(Debug)]
41pub struct RaftRequestWithSignal {
42 #[allow(unused)]
43 pub id: String,
44 pub payloads: Vec<EntryPayload>,
45 /// Multiple senders for merged requests (1 sender per payload, matched by index)
46 /// Invariant: senders.len() == payloads.len()
47 pub senders: Vec<MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>>,
48
49 /// Does this request need to wait for StateMachine's ApplyCompleted event?
50 ///
51 /// - `true`: Command payload → must wait for state machine apply
52 /// - `false`: Noop/Config payload → respond immediately after commit
53 pub wait_for_apply_event: bool,
54}
55
56/// AppendEntries response with possible state changes
57#[derive(Debug)]
58pub struct AppendResponseWithUpdates {
59 pub response: AppendEntriesResponse,
60 pub commit_index_update: Option<u64>, // Commit_index to be updated
61}
62
63/// Core replication protocol operations
64#[cfg_attr(any(test, feature = "__test_support"), automock)]
65#[async_trait]
66pub trait ReplicationCore<T>: Send + Sync + 'static
67where
68 T: TypeConfig,
69{
70 /// As Leader, send replications to peers (combines regular heartbeats and client proposals).
71 ///
72 /// Performs peer synchronization checks:
73 /// 1. Verifies if any peer's next_id <= leader's commit_index
74 /// 2. For non-synced peers: retrieves unsynced logs and buffers them
75 /// 3. Prepends unsynced entries to the entries queue
76 ///
77 /// # Returns
78 /// - `Ok(AppendResults)` with aggregated replication outcomes
79 /// - `Err` for unrecoverable errors
80 ///
81 /// # Return Result Semantics
82 /// 1. **Insufficient Quorum**: Returns `Ok(AppendResults)` with `commit_quorum_achieved =
83 /// false` when:
84 /// - Responses received from all nodes but majority acceptance not achieved
85 /// - Partial timeouts reduce successful responses below majority
86 ///
87 /// 2. **Timeout Handling**:
88 /// - Partial timeouts: Returns `Ok` with `commit_quorum_achieved = false`
89 /// - Complete timeout: Returns `Ok` with `commit_quorum_achieved = false`
90 /// - Timeout peers are EXCLUDED from `peer_updates`
91 ///
92 /// 3. **Error Conditions**: Returns `Err` ONLY for:
93 /// - Empty voting members (`ReplicationError::NoPeerFound`)
94 /// - Log generation failures (`generate_new_entries` errors)
95 /// - Higher term detected in peer response (`ReplicationError::HigherTerm`)
96 /// - Critical response handling errors
97 ///
98 /// # Guarantees
99 /// - Only peers with successful responses appear in `peer_updates`
100 /// - Timeouts never cause top-level `Err` (handled as failed responses)
101 /// - Leader self-vote always counted in quorum calculation
102 ///
103 /// # Note
104 /// - Leader state should be updated by LeaderState only(follows SRP).
105 ///
106 /// # Quorum
107 /// - If there are no voters (not even the leader), quorum is not possible.
108 /// - If the leader is the only voter, quorum is always achieved.
109 /// - If all nodes are learners, quorum is not achieved.
110 async fn handle_raft_request_in_batch(
111 &self,
112 entry_payloads: Vec<EntryPayload>,
113 state_snapshot: StateSnapshot,
114 leader_state_snapshot: LeaderStateSnapshot,
115 cluster_metadata: &crate::raft_role::ClusterMetadata,
116 ctx: &crate::RaftContext<T>,
117 ) -> Result<AppendResults>;
118
119 /// Handles successful AppendEntries responses
120 ///
121 /// Updates peer match/next indices according to:
122 /// - Last matched log index
123 /// - Current leader term
124 fn handle_success_response(
125 &self,
126 peer_id: u32,
127 peer_term: u64,
128 success_result: SuccessResult,
129 leader_term: u64,
130 ) -> Result<crate::PeerUpdate>;
131
132 /// Resolves log conflicts from follower responses
133 ///
134 /// Implements conflict backtracking optimization (Section 5.3)
135 fn handle_conflict_response(
136 &self,
137 peer_id: u32,
138 conflict_result: ConflictResult,
139 raft_log: &Arc<ROF<T>>,
140 current_next_index: u64,
141 ) -> Result<crate::PeerUpdate>;
142
143 /// Determines follower commit index advancement
144 ///
145 /// Applies Leader's commit index according to:
146 /// - min(leader_commit, last_local_log_index)
147 fn if_update_commit_index_as_follower(
148 my_commit_index: u64,
149 last_raft_log_id: u64,
150 leader_commit_index: u64,
151 ) -> Option<u64>;
152
153 /// Gathers legacy logs for lagging peers
154 ///
155 /// Performs log segmentation based on:
156 /// - Peer's next_index
157 /// - Max allowed historical entries
158 fn retrieve_to_be_synced_logs_for_peers(
159 &self,
160 new_entries: Vec<Entry>,
161 leader_last_index_before_inserting_new_entries: u64,
162 max_legacy_entries_per_peer: u64, //Maximum number of entries
163 peer_next_indices: &HashMap<u32, u64>,
164 raft_log: &Arc<ROF<T>>,
165 ) -> DashMap<u32, Vec<Entry>>;
166
167 /// Handles an incoming AppendEntries RPC request (called by ALL ROLES)
168 ///
169 /// Core responsibilities:
170 /// 1. Term validation and comparison (RFC §5.1)
171 /// 2. Log consistency checking (prev_log_index/term)
172 /// 3. Entry appending with conflict resolution (RFC §5.3)
173 /// 4. Commit index advancement (RFC §5.3)
174 ///
175 /// # Critical Architecture Constraints
176 /// 1. ROLE AGNOSTIC - This method contains no role-specific logic. It simply:
177 /// - Validates the RPC against local state
178 /// - Returns required state updates
179 /// 2. STATE CHANGE ISOLATION - Never directly modifies:
180 /// - Current role (Leader/Follower/etc)
181 /// - Persistent state (term/votedFor)
182 /// - Volatile leader state (nextIndex/matchIndex)
183 /// 3. CALLER MUST:
184 /// - Check `response.term_update` for term conflicts
185 /// - If higher term exists, transition to Follower
186 /// - Apply other state updates via role_tx
187 async fn handle_append_entries(
188 &self,
189 request: AppendEntriesRequest,
190 state_snapshot: &StateSnapshot,
191 raft_log: &Arc<ROF<T>>,
192 ) -> Result<AppendResponseWithUpdates>;
193
194 /// Validates an incoming AppendEntries RPC from a Leader against Raft protocol rules.
195 ///
196 /// This function implements the **log consistency checks** defined in Raft paper Section 5.3.
197 /// It determines whether to accept the Leader's log entries by verifying:
198 /// 1. Term freshness
199 /// 2. Virtual log (prev_log_index=0) handling
200 /// 3. Previous log entry consistency
201 ///
202 /// # Raft Protocol Rules Enforced
203 /// 1. **Term Check** (Raft Paper 5.1):
204 /// - Reject requests with stale terms to prevent partitioned Leaders
205 /// 2. **Virtual Log Handling** (Implementation-Specific):
206 /// - Special case for empty logs (prev_log_index=0 && prev_log_term=0)
207 /// 3. **Log Matching** (Raft Paper 5.3):
208 /// - Ensure Leader's prev_log_index/term matches Follower's log
209 ///
210 /// # Parameters
211 /// - `my_term`: Current node's term
212 /// - `request`: Leader's AppendEntries RPC
213 /// - `raft_log`: Reference to node's log store
214 ///
215 /// # Return
216 /// - [`AppendEntriesResponse::success`] if validation passes
217 /// - [`AppendEntriesResponse::higher_term`] if Leader's term is stale
218 /// - [`AppendEntriesResponse::conflict`] with debugging info for log inconsistencies
219 fn check_append_entries_request_is_legal(
220 &self,
221 my_term: u64,
222 request: &AppendEntriesRequest,
223 raft_log: &Arc<ROF<T>>,
224 ) -> AppendEntriesResponse;
225}