// d_engine_core/replication/mod.rs
//! Raft log replication implementation (Raft paper, Section 5.3)
//!
//! Handles core replication mechanics including:
//! - Leader log propagation
//! - Follower log consistency checks
//! - Conflict resolution algorithms
7mod batch_buffer;
8mod replication_handler;
9
10pub use batch_buffer::*;
11pub use replication_handler::*;
12
13#[cfg(test)]
14mod batch_buffer_test;
15
// Client Request Extension Definition
// -----------------------------------------------------------------------------
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use d_engine_proto::client::ClientResponse;
22use d_engine_proto::common::Entry;
23use d_engine_proto::common::EntryPayload;
24use d_engine_proto::server::replication::AppendEntriesRequest;
25use d_engine_proto::server::replication::AppendEntriesResponse;
26use d_engine_proto::server::replication::ConflictResult;
27use d_engine_proto::server::replication::SuccessResult;
28use dashmap::DashMap;
29#[cfg(any(test, feature = "test-utils"))]
30use mockall::automock;
31use tonic::Status;
32use tonic::async_trait;
33
34use super::LeaderStateSnapshot;
35use super::StateSnapshot;
36use crate::AppendResults;
37use crate::MaybeCloneOneshotSender;
38use crate::Result;
39use crate::TypeConfig;
40use crate::alias::ROF;
41
42/// Request with response channel that can handle all Raft payload types
43#[derive(Debug)]
44pub struct RaftRequestWithSignal {
45 #[allow(unused)]
46 pub id: String,
47 pub payloads: Vec<EntryPayload>,
48 pub sender: MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
49}
50
51/// AppendEntries response with possible state changes
52#[derive(Debug)]
53pub struct AppendResponseWithUpdates {
54 pub response: AppendEntriesResponse,
55 pub commit_index_update: Option<u64>, // Commit_index to be updated
56}
57
58/// Core replication protocol operations
59#[cfg_attr(any(test, feature = "test-utils"), automock)]
60#[async_trait]
61pub trait ReplicationCore<T>: Send + Sync + 'static
62where
63 T: TypeConfig,
64{
65 /// As Leader, send replications to peers (combines regular heartbeats and client proposals).
66 ///
67 /// Performs peer synchronization checks:
68 /// 1. Verifies if any peer's next_id <= leader's commit_index
69 /// 2. For non-synced peers: retrieves unsynced logs and buffers them
70 /// 3. Prepends unsynced entries to the entries queue
71 ///
72 /// # Returns
73 /// - `Ok(AppendResults)` with aggregated replication outcomes
74 /// - `Err` for unrecoverable errors
75 ///
76 /// # Return Result Semantics
77 /// 1. **Insufficient Quorum**: Returns `Ok(AppendResults)` with `commit_quorum_achieved =
78 /// false` when:
79 /// - Responses received from all nodes but majority acceptance not achieved
80 /// - Partial timeouts reduce successful responses below majority
81 ///
82 /// 2. **Timeout Handling**:
83 /// - Partial timeouts: Returns `Ok` with `commit_quorum_achieved = false`
84 /// - Complete timeout: Returns `Ok` with `commit_quorum_achieved = false`
85 /// - Timeout peers are EXCLUDED from `peer_updates`
86 ///
87 /// 3. **Error Conditions**: Returns `Err` ONLY for:
88 /// - Empty voting members (`ReplicationError::NoPeerFound`)
89 /// - Log generation failures (`generate_new_entries` errors)
90 /// - Higher term detected in peer response (`ReplicationError::HigherTerm`)
91 /// - Critical response handling errors
92 ///
93 /// # Guarantees
94 /// - Only peers with successful responses appear in `peer_updates`
95 /// - Timeouts never cause top-level `Err` (handled as failed responses)
96 /// - Leader self-vote always counted in quorum calculation
97 ///
98 /// # Note
99 /// - Leader state should be updated by LeaderState only(follows SRP).
100 ///
101 /// # Quorum
102 /// - If there are no voters (not even the leader), quorum is not possible.
103 /// - If the leader is the only voter, quorum is always achieved.
104 /// - If all nodes are learners, quorum is not achieved.
105 async fn handle_raft_request_in_batch(
106 &self,
107 entry_payloads: Vec<EntryPayload>,
108 state_snapshot: StateSnapshot,
109 leader_state_snapshot: LeaderStateSnapshot,
110 cluster_metadata: &crate::raft_role::ClusterMetadata,
111 ctx: &crate::RaftContext<T>,
112 ) -> Result<AppendResults>;
113
114 /// Handles successful AppendEntries responses
115 ///
116 /// Updates peer match/next indices according to:
117 /// - Last matched log index
118 /// - Current leader term
119 fn handle_success_response(
120 &self,
121 peer_id: u32,
122 peer_term: u64,
123 success_result: SuccessResult,
124 leader_term: u64,
125 ) -> Result<crate::PeerUpdate>;
126
127 /// Resolves log conflicts from follower responses
128 ///
129 /// Implements conflict backtracking optimization (Section 5.3)
130 fn handle_conflict_response(
131 &self,
132 peer_id: u32,
133 conflict_result: ConflictResult,
134 raft_log: &Arc<ROF<T>>,
135 current_next_index: u64,
136 ) -> Result<crate::PeerUpdate>;
137
138 /// Determines follower commit index advancement
139 ///
140 /// Applies Leader's commit index according to:
141 /// - min(leader_commit, last_local_log_index)
142 fn if_update_commit_index_as_follower(
143 my_commit_index: u64,
144 last_raft_log_id: u64,
145 leader_commit_index: u64,
146 ) -> Option<u64>;
147
148 /// Gathers legacy logs for lagging peers
149 ///
150 /// Performs log segmentation based on:
151 /// - Peer's next_index
152 /// - Max allowed historical entries
153 fn retrieve_to_be_synced_logs_for_peers(
154 &self,
155 new_entries: Vec<Entry>,
156 leader_last_index_before_inserting_new_entries: u64,
157 max_legacy_entries_per_peer: u64, //Maximum number of entries
158 peer_next_indices: &HashMap<u32, u64>,
159 raft_log: &Arc<ROF<T>>,
160 ) -> DashMap<u32, Vec<Entry>>;
161
162 /// Handles an incoming AppendEntries RPC request (called by ALL ROLES)
163 ///
164 /// Core responsibilities:
165 /// 1. Term validation and comparison (RFC §5.1)
166 /// 2. Log consistency checking (prev_log_index/term)
167 /// 3. Entry appending with conflict resolution (RFC §5.3)
168 /// 4. Commit index advancement (RFC §5.3)
169 ///
170 /// # Critical Architecture Constraints
171 /// 1. ROLE AGNOSTIC - This method contains no role-specific logic. It simply:
172 /// - Validates the RPC against local state
173 /// - Returns required state updates
174 /// 2. STATE CHANGE ISOLATION - Never directly modifies:
175 /// - Current role (Leader/Follower/etc)
176 /// - Persistent state (term/votedFor)
177 /// - Volatile leader state (nextIndex/matchIndex)
178 /// 3. CALLER MUST:
179 /// - Check `response.term_update` for term conflicts
180 /// - If higher term exists, transition to Follower
181 /// - Apply other state updates via role_tx
182 async fn handle_append_entries(
183 &self,
184 request: AppendEntriesRequest,
185 state_snapshot: &StateSnapshot,
186 raft_log: &Arc<ROF<T>>,
187 ) -> Result<AppendResponseWithUpdates>;
188
189 /// Validates an incoming AppendEntries RPC from a Leader against Raft protocol rules.
190 ///
191 /// This function implements the **log consistency checks** defined in Raft paper Section 5.3.
192 /// It determines whether to accept the Leader's log entries by verifying:
193 /// 1. Term freshness
194 /// 2. Virtual log (prev_log_index=0) handling
195 /// 3. Previous log entry consistency
196 ///
197 /// # Raft Protocol Rules Enforced
198 /// 1. **Term Check** (Raft Paper 5.1):
199 /// - Reject requests with stale terms to prevent partitioned Leaders
200 /// 2. **Virtual Log Handling** (Implementation-Specific):
201 /// - Special case for empty logs (prev_log_index=0 && prev_log_term=0)
202 /// 3. **Log Matching** (Raft Paper 5.3):
203 /// - Ensure Leader's prev_log_index/term matches Follower's log
204 ///
205 /// # Parameters
206 /// - `my_term`: Current node's term
207 /// - `request`: Leader's AppendEntries RPC
208 /// - `raft_log`: Reference to node's log store
209 ///
210 /// # Return
211 /// - [`AppendEntriesResponse::success`] if validation passes
212 /// - [`AppendEntriesResponse::higher_term`] if Leader's term is stale
213 /// - [`AppendEntriesResponse::conflict`] with debugging info for log inconsistencies
214 fn check_append_entries_request_is_legal(
215 &self,
216 my_term: u64,
217 request: &AppendEntriesRequest,
218 raft_log: &Arc<ROF<T>>,
219 ) -> AppendEntriesResponse;
220}