d_engine_core/election/
election_handler.rs

1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::sync::Arc;
4
5use d_engine_proto::common::LogId;
6use d_engine_proto::server::election::VoteRequest;
7use d_engine_proto::server::election::VotedFor;
8use tonic::async_trait;
9use tracing::debug;
10use tracing::error;
11use tracing::trace;
12use tracing::warn;
13
14use super::ElectionCore;
15use crate::ElectionError;
16use crate::Membership;
17use crate::RaftLog;
18use crate::RaftNodeConfig;
19use crate::Result;
20use crate::StateUpdate;
21use crate::Transport;
22use crate::TypeConfig;
23use crate::alias::MOF;
24use crate::alias::ROF;
25use crate::alias::TROF;
26use crate::cluster::is_majority;
27use crate::if_higher_term_found;
28use crate::is_target_log_more_recent;
29
30#[derive(Clone)]
31pub struct ElectionHandler<T: TypeConfig> {
32    pub(crate) my_id: u32,
33    _phantom: PhantomData<T>,
34}
35
36#[async_trait]
37impl<T> ElectionCore<T> for ElectionHandler<T>
38where
39    T: TypeConfig,
40{
41    async fn broadcast_vote_requests(
42        &self,
43        term: u64,
44        membership: Arc<MOF<T>>,
45        raft_log: &Arc<ROF<T>>,
46        transport: &Arc<TROF<T>>,
47        settings: &Arc<RaftNodeConfig>,
48    ) -> Result<()> {
49        debug!("broadcast_vote_requests...");
50
51        // Single-node cluster: no peers to vote, automatically win election
52        if membership.is_single_node_cluster().await {
53            debug!(
54                "Single-node cluster detected (node_id={}): automatically winning election",
55                self.my_id
56            );
57            return Ok(());
58        }
59
60        let members = membership.voters().await;
61        if members.is_empty() {
62            error!("No voting members found for node {}", self.my_id);
63            return Err(ElectionError::NoVotingMemberFound {
64                candidate_id: self.my_id,
65            }
66            .into());
67        }
68
69        debug!("Sending vote requests to peers: {:?}", &members);
70
71        let LogId {
72            index: last_log_index,
73            term: last_log_term,
74        } = raft_log.last_log_id().unwrap_or(LogId { index: 0, term: 0 });
75        let request = VoteRequest {
76            term,
77            candidate_id: self.my_id,
78            last_log_index,
79            last_log_term,
80        };
81
82        match transport.send_vote_requests(request, &settings.retry, membership).await {
83            Ok(vote_result) => {
84                let mut succeed = 1;
85                for response in vote_result.responses {
86                    match response {
87                        Ok(vote_response) => {
88                            if vote_response.vote_granted {
89                                debug!("send_vote_requests_to_peers success!");
90                                succeed += 1;
91                            } else {
92                                debug!(
93                                    "if_higher_term_found({}, {}, false)",
94                                    term, vote_response.term,
95                                );
96                                if if_higher_term_found(term, vote_response.term, false) {
97                                    warn!("Higher term found during election phase.");
98                                    return Err(
99                                        ElectionError::HigherTerm(vote_response.term).into()
100                                    );
101                                }
102
103                                if is_target_log_more_recent(
104                                    last_log_index,
105                                    last_log_term,
106                                    vote_response.last_log_index,
107                                    vote_response.last_log_term,
108                                ) {
109                                    warn!("More update to date log found in vote response");
110
111                                    return Err(ElectionError::LogConflict {
112                                        index: last_log_index,
113                                        expected_term: last_log_term,
114                                        actual_term: vote_response.last_log_term,
115                                    }
116                                    .into());
117                                }
118
119                                warn!("send_vote_requests_to_peers failed!");
120                            }
121                        }
122                        Err(e) => {
123                            error!("send_vote_requests_to_peers error: {:?}", e);
124                        }
125                    }
126                }
127                debug!(
128                    "send_vote_requests to: {:?} with succeed number = {}",
129                    &vote_result.peer_ids, succeed
130                );
131
132                let required = vote_result.peer_ids.len() + 1;
133                if !vote_result.peer_ids.is_empty() && is_majority(succeed, required) {
134                    debug!("send_vote_requests receives majority.");
135                    return Ok(());
136                } else {
137                    debug!("failed to receive majority votes.");
138                    return Err(ElectionError::QuorumFailure { required, succeed }.into());
139                }
140            }
141            Err(e) => {
142                error!("broadcast_vote_requests encountered an error: {:?}", e);
143                return Err(e);
144            }
145        }
146    }
147
148    async fn handle_vote_request(
149        &self,
150        request: VoteRequest,
151        current_term: u64,
152        voted_for_option: Option<VotedFor>,
153        raft_log: &Arc<ROF<T>>,
154    ) -> Result<StateUpdate> {
155        debug!("VoteRequest::Received: {:?}", request);
156        let mut new_voted_for = None;
157        let mut term_update = None;
158        let last_logid = raft_log.last_log_id().unwrap_or(LogId { index: 0, term: 0 });
159
160        // if self.check_vote_request_is_legal(
161        //     &request,
162        //     current_term,
163        //     last_logid.index,
164        //     last_logid.term,
165        //     voted_for_option,
166        // ) {
167        //     debug!("switch to follower");
168        //     let term = request.term;
169
170        //     // 1. Update term
171        //     term_update = Some(term);
172
173        //     // 2. update vote for
174        //     debug!(
175        //         "updated my voted for: target node: {:?} with term:{:?}",
176        //         request.candidate_id, term
177        //     );
178        //     new_voted_for = Some(VotedFor {
179        //         voted_for_id: request.candidate_id,
180        //         voted_for_term: term,
181        //     });
182        // }
183        // Ok(StateUpdate {
184        //     new_voted_for,
185        //     term_update,
186        // })
187        //--------------------------------------------------
188
189        // Check if request term is higher than current term
190        let new_voted_for_option = if request.term > current_term {
191            term_update = Some(request.term);
192            // When updating term, reset voted_for to allow voting in new term
193            // But we haven't voted yet, so we'll decide below
194            None
195        } else {
196            voted_for_option
197        };
198
199        // Check if we should grant the vote
200        let grant_vote = if request.term < current_term {
201            // Request term is lower, cannot grant vote
202            trace!(
203                "[node-{} -> node-{}] Request term is lower, cannot grant vote. VoteRequest = {:?}",
204                request.candidate_id, self.my_id, &request
205            );
206
207            false
208        } else {
209            // Request term is >= current term
210            // Check log completeness
211            if !is_target_log_more_recent(
212                last_logid.index,
213                last_logid.term,
214                request.last_log_index,
215                request.last_log_term,
216            ) {
217                trace!(
218                    "node-{}: last_log_index({}(t:{})) -> node-{}: last_log_index({}(t:{}))",
219                    request.candidate_id,
220                    request.last_log_index,
221                    request.last_log_term,
222                    self.my_id,
223                    last_logid.index,
224                    last_logid.term
225                );
226
227                false
228            } else {
229                // Check if already voted for someone else in this term
230                if let Some(voted_for) = new_voted_for_option {
231                    trace!(
232                        "[node-{} -> node-{}] node-{} current vote: {:?}",
233                        request.candidate_id, self.my_id, self.my_id, &voted_for
234                    );
235                    // If already voted for someone else, cannot grant vote unless it's the same
236                    // candidate
237                    voted_for.voted_for_term == request.term
238                        && voted_for.voted_for_id == request.candidate_id
239                } else {
240                    trace!(
241                        "node-{} vote for node-{} successfully!",
242                        self.my_id, request.candidate_id,
243                    );
244
245                    true
246                }
247            }
248        };
249
250        if grant_vote {
251            new_voted_for = Some(VotedFor {
252                voted_for_id: request.candidate_id,
253                voted_for_term: request.term,
254                committed: false,
255            });
256            trace!(
257                "node-{} -> node-{} successfully!",
258                request.candidate_id, self.my_id,
259            );
260        } else {
261            trace!(
262                "node-{} -> node-{} failed!",
263                request.candidate_id, self.my_id,
264            );
265        }
266
267        Ok(StateUpdate {
268            new_voted_for,
269            term_update,
270        })
271    }
272
273    /// The function to check RPC request is leagal or not
274    ///
275    /// Criterias to check:
276    /// - votedFor is null or candidateId
277    /// - candidate s log is at least as up-to-date as receiver s log
278    /// e.g. { my_id: 2 } request=VoteRequest { term: 3, candidate_id: 1, last_log_index: 2,
279    /// last_log_term: 10 } current_term=3 last_log_index=3 last_log_term=8 voted_for_option=None
280    #[tracing::instrument]
281    fn check_vote_request_is_legal(
282        &self,
283        request: &VoteRequest,
284        current_term: u64,
285        last_log_index: u64,
286        last_log_term: u64,
287        voted_for_option: Option<VotedFor>,
288    ) -> bool {
289        if current_term > request.term {
290            debug!(
291                "current_term({:?}) > request.term({:?})",
292                current_term, request.term
293            );
294            return false;
295        }
296
297        //step 1: check if I have more logs than the requester
298        if !is_target_log_more_recent(
299            last_log_index,
300            last_log_term,
301            request.last_log_index,
302            request.last_log_term,
303        ) {
304            debug!(
305                "node_log_is_less_than_requester{:?}, last_log_index={:?}, last_log_term={:?}",
306                request, last_log_index, last_log_term
307            );
308            return false;
309        }
310
311        //step 2: check if I have voted for this term
312        if voted_for_option.is_some()
313            && !self.if_node_could_grant_the_vote_request(request, voted_for_option)
314        {
315            debug!(
316                "node_could_not_grant_the_vote_request: {:?}, voted_for_option={:?}",
317                request, &voted_for_option
318            );
319            return false;
320        }
321
322        true
323    }
324}
325impl<T> ElectionHandler<T>
326where
327    T: TypeConfig,
328{
329    pub fn new(my_id: u32) -> Self {
330        Self {
331            my_id,
332            _phantom: PhantomData,
333        }
334    }
335
336    /// logOk == \/ m.mlastLogTerm > LastTerm(log[i])
337    ///          \/ /\ m.mlastLogTerm = LastTerm(log[i])
338    ///             /\ m.mlastLogIndex >= Len(log[i])
339    #[allow(dead_code)]
340    fn if_node_log_is_less_than_requester(
341        &self,
342        request_last_log_index: u64,
343        request_last_log_term: u64,
344        last_log_index: u64,
345        last_log_term: u64,
346    ) -> bool {
347        (request_last_log_term > last_log_term)
348            || (request_last_log_term == last_log_term && request_last_log_index >= last_log_index)
349    }
350
351    fn if_node_could_grant_the_vote_request(
352        &self,
353        request: &VoteRequest,
354        voted_for_option: Option<VotedFor>,
355    ) -> bool {
356        if let Some(vf) = voted_for_option {
357            debug!(
358                "voted_id: {:?}, voted_term: {:?}",
359                vf.voted_for_id, vf.voted_for_term
360            );
361
362            if vf.voted_for_id == 0 {
363                return true;
364            }
365
366            if vf.voted_for_term < request.term {
367                return true;
368            }
369
370            false
371        } else {
372            true
373        }
374    }
375}
376
377impl<T: TypeConfig> Debug for ElectionHandler<T> {
378    fn fmt(
379        &self,
380        f: &mut std::fmt::Formatter<'_>,
381    ) -> std::fmt::Result {
382        f.debug_struct("ElectionHandler").field("my_id", &self.my_id).finish()
383    }
384}