1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::sync::Arc;
4
5use d_engine_proto::common::LogId;
6use d_engine_proto::server::election::VoteRequest;
7use d_engine_proto::server::election::VotedFor;
8use tonic::async_trait;
9use tracing::debug;
10use tracing::error;
11use tracing::trace;
12use tracing::warn;
13
14use super::ElectionCore;
15use crate::ElectionError;
16use crate::Membership;
17use crate::RaftLog;
18use crate::RaftNodeConfig;
19use crate::Result;
20use crate::StateUpdate;
21use crate::Transport;
22use crate::TypeConfig;
23use crate::alias::MOF;
24use crate::alias::ROF;
25use crate::alias::TROF;
26use crate::cluster::is_majority;
27use crate::if_higher_term_found;
28use crate::is_target_log_more_recent;
29
30#[derive(Clone)]
31pub struct ElectionHandler<T: TypeConfig> {
32 pub(crate) my_id: u32,
33 _phantom: PhantomData<T>,
34}
35
36#[async_trait]
37impl<T> ElectionCore<T> for ElectionHandler<T>
38where
39 T: TypeConfig,
40{
41 async fn broadcast_vote_requests(
42 &self,
43 term: u64,
44 membership: Arc<MOF<T>>,
45 raft_log: &Arc<ROF<T>>,
46 transport: &Arc<TROF<T>>,
47 settings: &Arc<RaftNodeConfig>,
48 ) -> Result<()> {
49 debug!("broadcast_vote_requests...");
50
51 if membership.is_single_node_cluster().await {
53 debug!(
54 "Single-node cluster detected (node_id={}): automatically winning election",
55 self.my_id
56 );
57 return Ok(());
58 }
59
60 let members = membership.voters().await;
61 if members.is_empty() {
62 error!("No voting members found for node {}", self.my_id);
63 return Err(ElectionError::NoVotingMemberFound {
64 candidate_id: self.my_id,
65 }
66 .into());
67 }
68
69 debug!("Sending vote requests to peers: {:?}", &members);
70
71 let LogId {
72 index: last_log_index,
73 term: last_log_term,
74 } = raft_log.last_log_id().unwrap_or(LogId { index: 0, term: 0 });
75 let request = VoteRequest {
76 term,
77 candidate_id: self.my_id,
78 last_log_index,
79 last_log_term,
80 };
81
82 match transport.send_vote_requests(request, &settings.retry, membership).await {
83 Ok(vote_result) => {
84 let mut succeed = 1;
85 for response in vote_result.responses {
86 match response {
87 Ok(vote_response) => {
88 if vote_response.vote_granted {
89 debug!("send_vote_requests_to_peers success!");
90 succeed += 1;
91 } else {
92 debug!(
93 "if_higher_term_found({}, {}, false)",
94 term, vote_response.term,
95 );
96 if if_higher_term_found(term, vote_response.term, false) {
97 warn!("Higher term found during election phase.");
98 return Err(
99 ElectionError::HigherTerm(vote_response.term).into()
100 );
101 }
102
103 if is_target_log_more_recent(
104 last_log_index,
105 last_log_term,
106 vote_response.last_log_index,
107 vote_response.last_log_term,
108 ) {
109 warn!("More update to date log found in vote response");
110
111 return Err(ElectionError::LogConflict {
112 index: last_log_index,
113 expected_term: last_log_term,
114 actual_term: vote_response.last_log_term,
115 }
116 .into());
117 }
118
119 warn!("send_vote_requests_to_peers failed!");
120 }
121 }
122 Err(e) => {
123 error!("send_vote_requests_to_peers error: {:?}", e);
124 }
125 }
126 }
127 debug!(
128 "send_vote_requests to: {:?} with succeed number = {}",
129 &vote_result.peer_ids, succeed
130 );
131
132 let required = vote_result.peer_ids.len() + 1;
133 if !vote_result.peer_ids.is_empty() && is_majority(succeed, required) {
134 debug!("send_vote_requests receives majority.");
135 return Ok(());
136 } else {
137 debug!("failed to receive majority votes.");
138 return Err(ElectionError::QuorumFailure { required, succeed }.into());
139 }
140 }
141 Err(e) => {
142 error!("broadcast_vote_requests encountered an error: {:?}", e);
143 return Err(e);
144 }
145 }
146 }
147
148 async fn handle_vote_request(
149 &self,
150 request: VoteRequest,
151 current_term: u64,
152 voted_for_option: Option<VotedFor>,
153 raft_log: &Arc<ROF<T>>,
154 ) -> Result<StateUpdate> {
155 debug!("VoteRequest::Received: {:?}", request);
156 let mut new_voted_for = None;
157 let mut term_update = None;
158 let last_logid = raft_log.last_log_id().unwrap_or(LogId { index: 0, term: 0 });
159
160 let new_voted_for_option = if request.term > current_term {
191 term_update = Some(request.term);
192 None
195 } else {
196 voted_for_option
197 };
198
199 let grant_vote = if request.term < current_term {
201 trace!(
203 "[node-{} -> node-{}] Request term is lower, cannot grant vote. VoteRequest = {:?}",
204 request.candidate_id, self.my_id, &request
205 );
206
207 false
208 } else {
209 if !is_target_log_more_recent(
212 last_logid.index,
213 last_logid.term,
214 request.last_log_index,
215 request.last_log_term,
216 ) {
217 trace!(
218 "node-{}: last_log_index({}(t:{})) -> node-{}: last_log_index({}(t:{}))",
219 request.candidate_id,
220 request.last_log_index,
221 request.last_log_term,
222 self.my_id,
223 last_logid.index,
224 last_logid.term
225 );
226
227 false
228 } else {
229 if let Some(voted_for) = new_voted_for_option {
231 trace!(
232 "[node-{} -> node-{}] node-{} current vote: {:?}",
233 request.candidate_id, self.my_id, self.my_id, &voted_for
234 );
235 voted_for.voted_for_term == request.term
238 && voted_for.voted_for_id == request.candidate_id
239 } else {
240 trace!(
241 "node-{} vote for node-{} successfully!",
242 self.my_id, request.candidate_id,
243 );
244
245 true
246 }
247 }
248 };
249
250 if grant_vote {
251 new_voted_for = Some(VotedFor {
252 voted_for_id: request.candidate_id,
253 voted_for_term: request.term,
254 committed: false,
255 });
256 trace!(
257 "node-{} -> node-{} successfully!",
258 request.candidate_id, self.my_id,
259 );
260 } else {
261 trace!(
262 "node-{} -> node-{} failed!",
263 request.candidate_id, self.my_id,
264 );
265 }
266
267 Ok(StateUpdate {
268 new_voted_for,
269 term_update,
270 })
271 }
272
273 #[tracing::instrument]
281 fn check_vote_request_is_legal(
282 &self,
283 request: &VoteRequest,
284 current_term: u64,
285 last_log_index: u64,
286 last_log_term: u64,
287 voted_for_option: Option<VotedFor>,
288 ) -> bool {
289 if current_term > request.term {
290 debug!(
291 "current_term({:?}) > request.term({:?})",
292 current_term, request.term
293 );
294 return false;
295 }
296
297 if !is_target_log_more_recent(
299 last_log_index,
300 last_log_term,
301 request.last_log_index,
302 request.last_log_term,
303 ) {
304 debug!(
305 "node_log_is_less_than_requester{:?}, last_log_index={:?}, last_log_term={:?}",
306 request, last_log_index, last_log_term
307 );
308 return false;
309 }
310
311 if voted_for_option.is_some()
313 && !self.if_node_could_grant_the_vote_request(request, voted_for_option)
314 {
315 debug!(
316 "node_could_not_grant_the_vote_request: {:?}, voted_for_option={:?}",
317 request, &voted_for_option
318 );
319 return false;
320 }
321
322 true
323 }
324}
325impl<T> ElectionHandler<T>
326where
327 T: TypeConfig,
328{
329 pub fn new(my_id: u32) -> Self {
330 Self {
331 my_id,
332 _phantom: PhantomData,
333 }
334 }
335
336 #[allow(dead_code)]
340 fn if_node_log_is_less_than_requester(
341 &self,
342 request_last_log_index: u64,
343 request_last_log_term: u64,
344 last_log_index: u64,
345 last_log_term: u64,
346 ) -> bool {
347 (request_last_log_term > last_log_term)
348 || (request_last_log_term == last_log_term && request_last_log_index >= last_log_index)
349 }
350
351 fn if_node_could_grant_the_vote_request(
352 &self,
353 request: &VoteRequest,
354 voted_for_option: Option<VotedFor>,
355 ) -> bool {
356 if let Some(vf) = voted_for_option {
357 debug!(
358 "voted_id: {:?}, voted_term: {:?}",
359 vf.voted_for_id, vf.voted_for_term
360 );
361
362 if vf.voted_for_id == 0 {
363 return true;
364 }
365
366 if vf.voted_for_term < request.term {
367 return true;
368 }
369
370 false
371 } else {
372 true
373 }
374 }
375}
376
377impl<T: TypeConfig> Debug for ElectionHandler<T> {
378 fn fmt(
379 &self,
380 f: &mut std::fmt::Formatter<'_>,
381 ) -> std::fmt::Result {
382 f.debug_struct("ElectionHandler").field("my_id", &self.my_id).finish()
383 }
384}