d_engine_core/raft_role/
leader_state.rs

1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::fmt::Debug;
4use std::marker::PhantomData;
5use std::sync::Arc;
6use std::sync::atomic::AtomicBool;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use std::time::Duration;
10
11use d_engine_proto::client::ClientResponse;
12use d_engine_proto::client::ReadConsistencyPolicy as ClientReadConsistencyPolicy;
13use d_engine_proto::common::AddNode;
14use d_engine_proto::common::BatchPromote;
15use d_engine_proto::common::BatchRemove;
16use d_engine_proto::common::EntryPayload;
17use d_engine_proto::common::LogId;
18use d_engine_proto::common::NodeRole::Leader;
19use d_engine_proto::common::NodeStatus;
20use d_engine_proto::common::membership_change::Change;
21use d_engine_proto::error::ErrorCode;
22use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
23use d_engine_proto::server::cluster::JoinRequest;
24use d_engine_proto::server::cluster::JoinResponse;
25use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
26use d_engine_proto::server::cluster::NodeMeta;
27use d_engine_proto::server::election::VoteResponse;
28use d_engine_proto::server::election::VotedFor;
29use d_engine_proto::server::replication::AppendEntriesResponse;
30use d_engine_proto::server::storage::PurgeLogRequest;
31use d_engine_proto::server::storage::PurgeLogResponse;
32use d_engine_proto::server::storage::SnapshotChunk;
33use d_engine_proto::server::storage::SnapshotMetadata;
34use nanoid::nanoid;
35use tokio::sync::mpsc;
36use tokio::sync::oneshot;
37use tokio::time::Instant;
38use tokio::time::sleep;
39use tokio::time::timeout;
40use tonic::Status;
41use tonic::async_trait;
42use tracing::debug;
43use tracing::error;
44use tracing::info;
45use tracing::instrument;
46use tracing::trace;
47use tracing::warn;
48
49use super::LeaderStateSnapshot;
50use super::RaftRole;
51use super::SharedState;
52use super::StateSnapshot;
53use super::candidate_state::CandidateState;
54use super::role_state::RaftRoleState;
55use crate::AppendResults;
56use crate::BackgroundSnapshotTransfer;
57use crate::BatchBuffer;
58use crate::ConnectionType;
59use crate::ConsensusError;
60use crate::Error;
61use crate::MaybeCloneOneshot;
62use crate::MaybeCloneOneshotSender;
63use crate::Membership;
64use crate::MembershipError;
65use crate::NetworkError;
66use crate::PeerUpdate;
67use crate::PurgeExecutor;
68use crate::QuorumVerificationResult;
69use crate::RaftContext;
70use crate::RaftEvent;
71use crate::RaftLog;
72use crate::RaftNodeConfig;
73use crate::RaftOneshot;
74use crate::RaftRequestWithSignal;
75use crate::ReadConsistencyPolicy as ServerReadConsistencyPolicy;
76use crate::ReplicationConfig;
77use crate::ReplicationCore;
78use crate::ReplicationError;
79use crate::ReplicationTimer;
80use crate::Result;
81use crate::RoleEvent;
82use crate::SnapshotConfig;
83use crate::StateMachine;
84use crate::StateMachineHandler;
85use crate::StateTransitionError;
86use crate::SystemError;
87use crate::Transport;
88use crate::TypeConfig;
89use crate::alias::MOF;
90use crate::alias::ROF;
91use crate::alias::SMHOF;
92use crate::client_command_to_entry_payloads;
93use crate::cluster::is_majority;
94use crate::ensure_safe_join;
95use crate::scoped_timer::ScopedTimer;
96use crate::stream::create_production_snapshot_stream;
97use crate::utils::cluster::error;
98
99// Supporting data structures
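/// A learner that has caught up with the leader's log and is queued for
/// promotion to voter. `ready_since` records when it first became eligible,
/// which the periodic stale-learner maintenance scan can use to detect
/// learners stuck in the queue.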
100#[derive(Debug, Clone)]
101pub struct PendingPromotion {
102    pub node_id: u32,
103    pub ready_since: Instant,
104}
105
106impl PendingPromotion {
107    pub fn new(
108        node_id: u32,
109        ready_since: Instant,
110    ) -> Self {
111        PendingPromotion {
112            node_id,
113            ready_since,
114        }
115    }
116}
117
118/// Cached cluster topology metadata for hot path optimization.
119///
120/// This metadata is immutable during leadership and only updated on membership changes.
121/// Caching avoids repeated async calls to membership queries in hot paths like append_entries.
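///
/// # Example
///
/// A rough sketch of how the cached fields relate to quorum sizing for a
/// hypothetical cluster of three voters plus one learner (the values below
/// are illustrative only):
///
/// ```ignore
/// let meta = ClusterMetadata {
///     single_voter: false,                  // more than one voter
///     total_voters: 3,                      // two voting peers + the leader itself
///     replication_targets: vec![/* 2 voter peers + 1 learner, excluding self */],
/// };
/// // Classic Raft majority: floor(total_voters / 2) + 1
/// let quorum = meta.total_voters / 2 + 1;   // == 2
/// assert!(!meta.single_voter);
/// ```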
122#[derive(Debug, Clone)]
123pub struct ClusterMetadata {
124    /// Single-voter cluster (quorum = 1, used for election and commit optimization)
125    pub single_voter: bool,
126    /// Total number of voters including self (updated on membership changes)
127    pub total_voters: usize,
128    /// Cached replication targets (voters + learners, excluding self)
129    pub replication_targets: Vec<NodeMeta>,
130}
131
132/// Leader node's state in Raft consensus algorithm.
133///
134    /// This structure maintains all leader-specific state, including volatile
135    /// replication progress tracking and persistent log compaction watermarks.
136///
137/// # Type Parameters
138/// - `T`: Application-specific Raft type configuration
139pub struct LeaderState<T: TypeConfig> {
140    // -- Core Raft Leader State --
141    /// Shared cluster state with lock protection
142    pub shared_state: SharedState,
143
144    /// === Volatile State ===
145    /// For each server (node_id), index of the next log entry to send to that server
146    ///
147    /// Raft Paper: §5.3 Figure 2 (nextIndex)
148    pub next_index: HashMap<u32, u64>,
149
150    /// === Volatile State ===
151    /// For each server (node_id), index of highest log entry known to be replicated
152    ///
153    /// Raft Paper: §5.3 Figure 2 (matchIndex)
154    pub(super) match_index: HashMap<u32, u64>,
155
156    /// === Volatile State ===
157    /// Temporary storage for no-op entry log ID during leader initialization
158    pub(super) noop_log_id: Option<u64>,
159
160    // -- Log Compaction & Purge --
161    /// === Volatile State ===
162    /// The upper bound (exclusive) of log entries scheduled for asynchronous physical deletion.
163    ///
164    /// This value is set immediately after a new snapshot is successfully created.
165    /// It represents the next log position that will trigger compaction.
166    ///
167    /// The actual log purge is performed by a background task, which may be delayed
168    /// due to resource constraints or retry mechanisms.
169    pub scheduled_purge_upto: Option<LogId>,
170
171    /// === Persistent State (MUST be on disk) ===
172    /// The last log position that has been **physically removed** from stable storage.
173    ///
174    /// This value is atomically updated when:
175    /// 1. A new snapshot is persisted (marking logs up to `last_included_index` as purgeable)
176    /// 2. The background purge task completes successfully
177    ///
178    /// Raft safety invariant:
179    /// Any log entry with index ≤ `last_purged_index` is guaranteed to be
180    /// reflected in the latest snapshot.
181    pub last_purged_index: Option<LogId>,
182
183    /// === Volatile State ===
184    /// Peer purge progress tracking for flow control
185    ///
186    /// Key: Peer node ID
187    /// Value: Last confirmed purge index from peer
188    pub peer_purge_progress: HashMap<u32, u64>,
189
190    /// Record if there is on-going snapshot creation activity
191    pub snapshot_in_progress: AtomicBool,
192
193    // -- Request Processing --
194    /// Batched proposal buffer for client requests
195    ///
196    /// Accumulates requests until either:
197    /// 1. Batch reaches configured size limit
198    /// 2. Explicit flush is triggered
199    batch_buffer: Box<BatchBuffer<RaftRequestWithSignal>>,
200
201    // -- Timing & Scheduling --
202    /// Replication heartbeat timer manager
203    ///
204    /// Handles:
205    /// - Heartbeat interval tracking
206    /// - Election timeout prevention
207    /// - Batch proposal flushing
208    timer: Box<ReplicationTimer>,
209
210    // -- Cluster Configuration --
211    /// Cached Raft node configuration (shared reference)
212    ///
213    /// This includes:
214    /// - Cluster membership
215    /// - Timeout parameters
216    /// - Performance tuning knobs
217    pub(super) node_config: Arc<RaftNodeConfig>,
218
219    /// Last time we checked for learners
220    pub last_learner_check: Instant,
221
222    // -- Stale Learner Handling --
223    /// The next scheduled time to check for stale learners.
224    ///
225    /// This is used to implement periodic checking of learners that have been in the promotion
226    /// queue for too long without making progress. The check interval is configurable via the
227    /// node configuration (`node_config.promotion.stale_check_interval`).
228    ///
229    /// When the current time reaches or exceeds this instant, the leader will perform a scan
230    /// of a subset of the pending promotions to detect stale learners. After the scan, the field
231    /// is updated to `current_time + stale_check_interval`.
232    ///
233    /// Rationale: Avoiding frequent full scans improves batching efficiency and reduces CPU spikes
234    /// in high-load environments (particularly crucial for RocketMQ-on-DLedger workflows).
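    ///
    /// A minimal sketch of the rescheduling step described above (config path
    /// as named in this doc; the real scan logic lives in the maintenance code):
    ///
    /// ```ignore
    /// if Instant::now() >= self.next_membership_maintenance_check {
    ///     // ... scan a bounded subset of `pending_promotions` for stale learners ...
    ///     self.next_membership_maintenance_check =
    ///         Instant::now() + node_config.promotion.stale_check_interval;
    /// }
    /// ```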
235    pub next_membership_maintenance_check: Instant,
236
237    /// Queue of learners that have caught up and are pending promotion to voter.
238    pub pending_promotions: VecDeque<PendingPromotion>,
239
240    /// Lease timestamp for LeaseRead policy
241    /// Tracks when leadership was last confirmed with quorum
242    pub(super) lease_timestamp: AtomicU64,
243
244    // -- Cluster Topology Cache --
245    /// Cached cluster metadata (updated on membership changes)
246    /// Avoids repeated async calls in hot paths
247    pub(super) cluster_metadata: ClusterMetadata,
248
249    // -- Type System Marker --
250    /// Phantom data for type parameter anchoring
251    _marker: PhantomData<T>,
252}
253
254#[async_trait]
255impl<T: TypeConfig> RaftRoleState for LeaderState<T> {
256    type T = T;
257
258    fn shared_state(&self) -> &SharedState {
259        &self.shared_state
260    }
261
262    fn shared_state_mut(&mut self) -> &mut SharedState {
263        &mut self.shared_state
264    }
265
266    /// Overrides the default behavior.
267    /// As leader, I should never accept a commit index
268    ///     lower than my current one.
269    #[tracing::instrument]
270    fn update_commit_index(
271        &mut self,
272        new_commit_index: u64,
273    ) -> Result<()> {
274        if self.commit_index() < new_commit_index {
275            debug!("update_commit_index to: {:?}", new_commit_index);
276            self.shared_state.commit_index = new_commit_index;
277        } else {
278            warn!(
279                "Illegal operation, might be a bug! I am Leader old_commit_index({}) >= new_commit_index:({})",
280                self.commit_index(),
281                new_commit_index
282            )
283        }
284        Ok(())
285    }
286
287    /// As Leader, I should not vote any more.
288    fn voted_for(&self) -> Result<Option<VotedFor>> {
289        self.shared_state().voted_for()
290    }
291
292    /// A Leader may still update its vote
293    ///     if a new legitimate Leader is found.
294    fn update_voted_for(
295        &mut self,
296        voted_for: VotedFor,
297    ) -> Result<bool> {
298        self.shared_state_mut().update_voted_for(voted_for)
299    }
300
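    /// Returns the cached `nextIndex` for `node_id`, defaulting to 1 when the
    /// peer has not been initialized yet.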
301    fn next_index(
302        &self,
303        node_id: u32,
304    ) -> Option<u64> {
305        Some(if let Some(n) = self.next_index.get(&node_id) {
306            *n
307        } else {
308            1
309        })
310    }
311
312    fn update_next_index(
313        &mut self,
314        node_id: u32,
315        new_next_id: u64,
316    ) -> Result<()> {
317        debug!("update_next_index({}) to {}", node_id, new_next_id);
318        self.next_index.insert(node_id, new_next_id);
319        Ok(())
320    }
321
322    fn update_match_index(
323        &mut self,
324        node_id: u32,
325        new_match_id: u64,
326    ) -> Result<()> {
327        self.match_index.insert(node_id, new_match_id);
328        Ok(())
329    }
330
331    fn match_index(
332        &self,
333        node_id: u32,
334    ) -> Option<u64> {
335        self.match_index.get(&node_id).copied()
336    }
337
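    /// Per Raft §5.3 Figure 2: after election, `nextIndex` for every peer is
    /// reset to the leader's last log index + 1 and `matchIndex` to 0. For
    /// example, with a last entry id of 10, each peer starts with
    /// next_index = 11 and match_index = 0.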
338    fn init_peers_next_index_and_match_index(
339        &mut self,
340        last_entry_id: u64,
341        peer_ids: Vec<u32>,
342    ) -> Result<()> {
343        for peer_id in peer_ids {
344            debug!("init leader state for peer_id: {:?}", peer_id);
345            let new_next_id = last_entry_id + 1;
346            self.update_next_index(peer_id, new_next_id)?;
347            self.update_match_index(peer_id, 0)?;
348        }
349        Ok(())
350    }
351    fn noop_log_id(&self) -> Result<Option<u64>> {
352        Ok(self.noop_log_id)
353    }
354
355    /// Verifies leadership status using persistent retry until timeout.
356    ///
357    /// This function is designed for critical operations like configuration changes
358    /// that must eventually succeed. It implements:
359    ///   - Infinite retries with exponential backoff
360    ///   - Jitter randomization to prevent synchronization
361    ///   - Termination only on success, leadership loss, or global timeout
362    ///
363    /// # Parameters
364    /// - `payloads`: Log entries to verify
365    /// - `bypass_queue`: Whether to skip request queues for direct transmission
366    /// - `ctx`: Raft execution context
367    /// - `role_tx`: Channel for role transition events
368    ///
369    /// # Returns
370    /// - `Ok(true)`: Quorum verification succeeded
371    /// - `Ok(false)`: Leadership definitively lost during verification
372    /// - `Err(_)`: Global timeout exceeded or critical failure occurred
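    ///
    /// # Example
    ///
    /// Illustrative retry schedule, assuming `base_delay_ms = 50` and
    /// `max_delay_ms = 1000` in the retry config (actual values come from
    /// `node_config.retry.internal_quorum`), with up to ~500ms of random
    /// jitter added per attempt: 50ms, 100ms, 200ms, 400ms, 800ms, 1000ms,
    /// 1000ms, ... until success, leadership loss, or the global timeout
    /// elapses.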
373    async fn verify_leadership_persistent(
374        &mut self,
375        payloads: Vec<EntryPayload>,
376        bypass_queue: bool,
377        ctx: &RaftContext<T>,
378        role_tx: &mpsc::UnboundedSender<RoleEvent>,
379    ) -> Result<bool> {
380        let initial_delay =
381            Duration::from_millis(ctx.node_config.retry.internal_quorum.base_delay_ms);
382        let max_delay = Duration::from_millis(ctx.node_config.retry.internal_quorum.max_delay_ms);
383        let global_timeout = ctx.node_config.raft.membership.verify_leadership_persistent_timeout;
384
385        let mut current_delay = initial_delay;
386        let start_time = Instant::now();
387
388        loop {
389            match self.verify_internal_quorum(payloads.clone(), bypass_queue, ctx, role_tx).await {
390                Ok(QuorumVerificationResult::Success) => return Ok(true),
391                Ok(QuorumVerificationResult::LeadershipLost) => return Ok(false),
392                Ok(QuorumVerificationResult::RetryRequired) => {
393                    // Check global timeout before retrying
394                    if start_time.elapsed() > global_timeout {
395                        return Err(NetworkError::GlobalTimeout(
396                            "Leadership verification timed out".to_string(),
397                        )
398                        .into());
399                    }
400
401                    current_delay =
402                        current_delay.checked_mul(2).unwrap_or(max_delay).min(max_delay);
403                    let jitter = Duration::from_millis(rand::random::<u64>() % 500);
404                    sleep(current_delay + jitter).await;
405                }
406                Err(e) => return Err(e),
407            }
408        }
409    }
410
411    /// Immediately verifies leadership status using a limited retry strategy.
412    ///
413    /// - Bypasses all queues with direct RPC transmission
414    /// - Enforces synchronous quorum validation
415    /// - Guarantees real-time network visibility
416    ///
417    /// This function is designed for latency-sensitive operations like linear reads,
418    /// where rapid failure is preferred over prolonged retries. It implements:
419    ///   - Exponential backoff with jitter
420    ///   - Fixed maximum retry attempts
421    ///   - Immediate failure on leadership loss
422    ///
423    /// # Parameters
424    /// - `payloads`: Log entries to verify (typically empty for leadership checks)
425    /// - `bypass_queue`: Whether to skip request queues for direct transmission
426    /// - `ctx`: Raft execution context
427    /// - `role_tx`: Channel for role transition events
428    ///
429    /// # Returns
430    /// - `Ok(true)`: Quorum verification succeeded within retry limits
431    /// - `Ok(false)`: Leadership definitively lost during verification
432    /// - `Err(_)`: Maximum retries exceeded or critical failure occurred
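    ///
    /// ```ignore
    /// // Typical call site (mirrors the LinearizableRead path further below):
    /// // empty payload and `bypass_queue = true`, so only leadership itself is
    /// // being confirmed.
    /// let still_leader = self
    ///     .verify_leadership_limited_retry(vec![], true, ctx, &role_tx)
    ///     .await
    ///     .unwrap_or(false);
    /// ```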
433    async fn verify_leadership_limited_retry(
434        &mut self,
435        payloads: Vec<EntryPayload>,
436        bypass_queue: bool,
437        ctx: &RaftContext<T>,
438        role_tx: &mpsc::UnboundedSender<RoleEvent>,
439    ) -> Result<bool> {
440        let retry_policy = ctx.node_config.retry.internal_quorum;
441        let max_retries = retry_policy.max_retries;
442        let initial_delay =
443            Duration::from_millis(ctx.node_config.retry.internal_quorum.base_delay_ms);
444        let max_delay = Duration::from_millis(ctx.node_config.retry.internal_quorum.max_delay_ms);
445
446        let mut current_delay = initial_delay;
447        let mut attempts = 0;
448
449        loop {
450            match self.verify_internal_quorum(payloads.clone(), bypass_queue, ctx, role_tx).await {
451                Ok(QuorumVerificationResult::Success) => return Ok(true),
452                Ok(QuorumVerificationResult::LeadershipLost) => return Ok(false),
453                Ok(QuorumVerificationResult::RetryRequired) => {
454                    debug!(%attempts, "verify_internal_quorum");
455                    if attempts >= max_retries {
456                        return Err(NetworkError::TaskBackoffFailed(
457                            "Max retries exceeded".to_string(),
458                        )
459                        .into());
460                    }
461
462                    current_delay =
463                        current_delay.checked_mul(2).unwrap_or(max_delay).min(max_delay);
464                    let jitter = Duration::from_millis(rand::random::<u64>() % 500);
465                    sleep(current_delay + jitter).await;
466
467                    attempts += 1;
468                }
469                Err(e) => return Err(e),
470            }
471        }
472    }
473
504    /// Override: Initialize cluster metadata after becoming leader.
505    /// Must be called once after leader election succeeds.
506    async fn init_cluster_metadata(
507        &mut self,
508        membership: &Arc<T::M>,
509    ) -> Result<()> {
510        // Calculate total voter count including self as leader
511        let voters = membership.voters().await;
512        let total_voters = voters.len() + 1; // +1 for leader (self)
513
514        // Get all replication targets (voters + learners, excluding self)
515        let replication_targets = membership.replication_peers().await;
516
517        // Single-voter cluster: only this node is a voter (quorum = 1)
518        let single_voter = total_voters == 1;
519
520        self.cluster_metadata = ClusterMetadata {
521            single_voter,
522            total_voters,
523            replication_targets: replication_targets.clone(),
524        };
525
526        debug!(
527            "Initialized cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
528            single_voter,
529            total_voters,
530            replication_targets.len()
531        );
532        Ok(())
533    }
534
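    /// Note: This method acts as the centralized internal Raft client.
    ///
    /// Immediately checks leadership via quorum verification:
    ///
    /// - Bypasses all queues with direct RPC transmission
    /// - Enforces synchronous quorum validation
    /// - Guarantees real-time network visibility
    ///
    /// Scenario handling summary:
    ///
    /// 1. Quorum achieved:
    ///    - Return: `Ok(QuorumVerificationResult::Success)`
    ///
    /// 2. Quorum NOT achieved (verifiable):
    ///    - Return: `Ok(QuorumVerificationResult::RetryRequired)`
    ///
    /// 3. Quorum NOT achieved (non-verifiable):
    ///    - Return: `Ok(QuorumVerificationResult::LeadershipLost)`
    ///
    /// 4. Partial timeouts:
    ///    - Return: `Ok(QuorumVerificationResult::RetryRequired)`
    ///
    /// 5. All timeouts:
    ///    - Return: `Ok(QuorumVerificationResult::RetryRequired)`
    ///
    /// 6. Higher term detected:
    ///    - Return: `Err(HigherTerm)`
    ///
    /// 7. Critical failure (e.g., system or logic error):
    ///    - Return: original error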
535    async fn verify_internal_quorum(
536        &mut self,
537        payloads: Vec<EntryPayload>,
538        bypass_queue: bool,
539        ctx: &RaftContext<T>,
540        role_tx: &mpsc::UnboundedSender<RoleEvent>,
541    ) -> Result<QuorumVerificationResult> {
542        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
543
544        self.process_raft_request(
545            RaftRequestWithSignal {
546                id: nanoid!(),
547                payloads,
548                sender: resp_tx,
549            },
550            ctx,
551            bypass_queue,
552            role_tx,
553        )
554        .await?;
555
556        // Wait for response with timeout
557        let timeout_duration =
558            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
559        match timeout(timeout_duration, resp_rx).await {
560            // Case 1: Response received successfully and verification passed
561            Ok(Ok(Ok(response))) => {
562                debug!("Leadership check response: {:?}", response);
563
564                // Handle different response cases
565                Ok(if response.is_write_success() {
566                    QuorumVerificationResult::Success
567                } else if response.is_retry_required() {
568                    // Verifiable quorum failure
569                    QuorumVerificationResult::RetryRequired
570                } else {
571                    // Non-verifiable failure or explicit rejection
572                    QuorumVerificationResult::LeadershipLost
573                })
574            }
575
576            // Case 2: Received explicit rejection status
577            Ok(Ok(Err(status))) => {
578                warn!("Leadership rejected by follower: {status:?}");
579                Ok(QuorumVerificationResult::LeadershipLost)
580            }
581
582            // Case 3: Channel communication failure (unrecoverable error)
583            Ok(Err(e)) => {
584                error!("Channel error during leadership check: {:?}", e);
585                Err(NetworkError::SingalReceiveFailed(e.to_string()).into())
586            }
587
588            // Case 4: Waiting for response timeout
589            Err(_) => {
590                warn!("Leadership check timed out after {:?}", timeout_duration);
591                Err(NetworkError::Timeout {
592                    node_id: self.node_id(),
593                    duration: timeout_duration,
594                }
595                .into())
596            }
597        }
598    }
599
600    fn is_leader(&self) -> bool {
601        true
602    }
603
604    fn become_leader(&self) -> Result<RaftRole<T>> {
605        warn!("I am leader already");
606
607        Err(StateTransitionError::InvalidTransition.into())
608    }
609
610    fn become_candidate(&self) -> Result<RaftRole<T>> {
611        error!("Leader can not become Candidate");
612
613        Err(StateTransitionError::InvalidTransition.into())
614    }
615
616    fn become_follower(&self) -> Result<RaftRole<T>> {
617        info!(
618            "Node {} term {} transitioning to Follower",
619            self.node_id(),
620            self.current_term(),
621        );
622        println!(
623            "[Node {}] Leader โ†’ Follower (term {})",
624            self.node_id(),
625            self.current_term()
626        );
627        Ok(RaftRole::Follower(Box::new(self.into())))
628    }
629
630    fn become_learner(&self) -> Result<RaftRole<T>> {
631        error!("Leader can not become Learner");
632
633        Err(StateTransitionError::InvalidTransition.into())
634    }
635
636    fn is_timer_expired(&self) -> bool {
637        self.timer.is_expired()
638    }
639
640    /// When Raft starts, check whether all timers need to be reset.
641    fn reset_timer(&mut self) {
642        self.timer.reset_batch();
643        self.timer.reset_replication();
644    }
645
646    fn next_deadline(&self) -> Instant {
647        self.timer.next_deadline()
648    }
649
650    /// Periodic tick: runs maintenance, flushes due batches, and sends heartbeats when their timers expire.
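    ///
    /// A sketch of how the surrounding event loop is expected to drive this
    /// (the real loop lives outside this file; `next_deadline` presumably
    /// yields the earlier of the batch and replication deadlines):
    ///
    /// ```ignore
    /// tokio::time::sleep_until(leader.next_deadline()).await;
    /// leader.tick(&role_tx, &raft_tx, &ctx).await?;
    /// ```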
651    async fn tick(
652        &mut self,
653        role_tx: &mpsc::UnboundedSender<RoleEvent>,
654        _raft_tx: &mpsc::Sender<RaftEvent>,
655        ctx: &RaftContext<T>,
656    ) -> Result<()> {
657        let now = Instant::now();
658        // Keep syncing leader_id (hot-path: ~5ns atomic store)
659        self.shared_state().set_current_leader(self.node_id());
660
661        // 1. Periodic maintenance (e.g. clear expired/stale learners)
662        if let Err(e) = self.run_periodic_maintenance(role_tx, ctx).await {
663            error!("Failed to run periodic maintenance: {}", e);
664        }
665
666        // 2. Batch trigger check (prioritized before the heartbeat check)
667        if now >= self.timer.batch_deadline() {
668            self.timer.reset_batch();
669
670            if self.batch_buffer.should_flush() {
671                debug!(?now, "tick::reset_batch batch timer");
672                self.timer.reset_replication();
673
674                // Take out the batched messages and send them immediately
675                // Do not move batch out of this block
676                let batch = self.batch_buffer.take();
677                self.process_batch(batch, role_tx, ctx).await?;
678            }
679        }
680
681        // 3. Heartbeat trigger check
682        // Send heartbeat if the replication timer expires
683        if now >= self.timer.replication_deadline() {
684            debug!(?now, "tick::reset_replication timer");
685            self.timer.reset_replication();
686
687            // Do not move batch out of this block
688            let batch = self.batch_buffer.take();
689            self.process_batch(batch, role_tx, ctx).await?;
690        }
691
692        Ok(())
693    }
694
695    async fn handle_raft_event(
696        &mut self,
697        raft_event: RaftEvent,
698        ctx: &RaftContext<T>,
699        role_tx: mpsc::UnboundedSender<RoleEvent>,
700    ) -> Result<()> {
701        let state_machine = ctx.state_machine();
702        let last_applied_index = state_machine.last_applied().index;
703        let my_id = self.shared_state.node_id;
704        let my_term = self.current_term();
705
706        match raft_event {
707            // Leader receives RequestVote(term=X, candidate=Y)
708            // 1. If X > currentTerm:
709            // - Leader โ†’ Follower, currentTerm = X
710            // - Replay event
711            // 2. Else:
712            // - Reply with VoteGranted=false, currentTerm=currentTerm
713            RaftEvent::ReceiveVoteRequest(vote_request, sender) => {
714                debug!(
715                    "handle_raft_event::RaftEvent::ReceiveVoteRequest: {:?}",
716                    &vote_request
717                );
718
719                let my_term = self.current_term();
720                if my_term < vote_request.term {
721                    self.update_current_term(vote_request.term);
722                    // Step down as Follower
723                    self.send_become_follower_event(None, &role_tx)?;
724
725                    info!("Leader will not process Vote request, it should let Follower do it.");
726                    send_replay_raft_event(
727                        &role_tx,
728                        RaftEvent::ReceiveVoteRequest(vote_request, sender),
729                    )?;
730                } else {
731                    let last_log_id =
732                        ctx.raft_log().last_log_id().unwrap_or(LogId { index: 0, term: 0 });
733                    let response = VoteResponse {
734                        term: my_term,
735                        vote_granted: false,
736                        last_log_index: last_log_id.index,
737                        last_log_term: last_log_id.term,
738                    };
739                    sender.send(Ok(response)).map_err(|e| {
740                        let error_str = format!("{e:?}");
741                        error!("Failed to send: {}", error_str);
742                        NetworkError::SingalSendFailed(error_str)
743                    })?;
744                }
745            }
746
747            RaftEvent::ClusterConf(_metadata_request, sender) => {
748                let cluster_conf = ctx
749                    .membership()
750                    .retrieve_cluster_membership_config(self.shared_state().current_leader())
751                    .await;
752                debug!("Leader receive ClusterConf: {:?}", &cluster_conf);
753
754                sender.send(Ok(cluster_conf)).map_err(|e| {
755                    let error_str = format!("{e:?}");
756                    error!("Failed to send: {}", error_str);
757                    NetworkError::SingalSendFailed(error_str)
758                })?;
759            }
760
761            RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender) => {
762                let current_conf_version = ctx.membership().get_cluster_conf_version().await;
763                debug!(%current_conf_version, ?cluster_conf_change_request,
764                    "handle_raft_event::RaftEvent::ClusterConfUpdate",
765                );
766
767                // Reject the fake Leader append entries request
768                if my_term >= cluster_conf_change_request.term {
769                    let response = ClusterConfUpdateResponse::higher_term(
770                        my_id,
771                        my_term,
772                        current_conf_version,
773                    );
774
775                    sender.send(Ok(response)).map_err(|e| {
776                        let error_str = format!("{e:?}");
777                        error!("Failed to send: {}", error_str);
778                        NetworkError::SingalSendFailed(error_str)
779                    })?;
780                } else {
781                    // Step down as Follower as new Leader found
782                    info!(
783                        "my({}) term < request one, now I will step down to Follower",
784                        my_id
785                    );
786                    //TODO: possible bug - should we also call self.update_current_term(...) here, as in the vote-request path?
787                    self.send_become_follower_event(Some(cluster_conf_change_request.id), &role_tx)?;
788
789                    info!(
790                        "Leader will not process append_entries_request, it should let Follower do it."
791                    );
792                    send_replay_raft_event(
793                        &role_tx,
794                        RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender),
795                    )?;
796                }
797            }
798
799            RaftEvent::AppendEntries(append_entries_request, sender) => {
800                debug!(
801                    "handle_raft_event::RaftEvent::AppendEntries: {:?}",
802                    &append_entries_request
803                );
804
805                // Reject the fake Leader append entries request
806                if my_term >= append_entries_request.term {
807                    let response = AppendEntriesResponse::higher_term(my_id, my_term);
808
809                    sender.send(Ok(response)).map_err(|e| {
810                        let error_str = format!("{e:?}");
811                        error!("Failed to send: {}", error_str);
812                        NetworkError::SingalSendFailed(error_str)
813                    })?;
814                } else {
815                    // Step down as Follower as new Leader found
816                    info!(
817                        "my({}) term < request one, now I will step down to Follower",
818                        my_id
819                    );
820                    //TODO: possible bug - should we also call self.update_current_term(...) here, as in the vote-request path?
821                    self.send_become_follower_event(
822                        Some(append_entries_request.leader_id),
823                        &role_tx,
824                    )?;
825
826                    info!(
827                        "Leader will not process append_entries_request, it should let Follower do it."
828                    );
829                    send_replay_raft_event(
830                        &role_tx,
831                        RaftEvent::AppendEntries(append_entries_request, sender),
832                    )?;
833                }
834            }
835
836            RaftEvent::ClientPropose(client_write_request, sender) => {
837                if let Err(e) = self
838                    .process_raft_request(
839                        RaftRequestWithSignal {
840                            id: nanoid!(),
841                            payloads: client_command_to_entry_payloads(
842                                client_write_request.commands,
843                            ),
844                            sender,
845                        },
846                        ctx,
847                        false,
848                        &role_tx,
849                    )
850                    .await
851                {
852                    error("Leader::process_raft_request", &e);
853                    return Err(e);
854                }
855            }
856
857            RaftEvent::ClientReadRequest(client_read_request, sender) => {
858                let _timer = ScopedTimer::new("leader_linear_read");
859                debug!(
860                    "Leader::ClientReadRequest client_read_request:{:?}",
861                    &client_read_request
862                );
863
864                let keys = client_read_request.keys.clone();
865                let response: std::result::Result<ClientResponse, tonic::Status> = {
866                    let read_operation =
867                        || -> std::result::Result<ClientResponse, tonic::Status> {
868                            let results = ctx
869                                .handlers
870                                .state_machine_handler
871                                .read_from_state_machine(keys)
872                                .unwrap_or_default();
873                            debug!("handle_client_read results: {:?}", results);
874                            Ok(ClientResponse::read_results(results))
875                        };
876
877                    // Determine effective consistency policy using server configuration
878                    let effective_policy = if client_read_request.has_consistency_policy() {
879                        // Client explicitly specified policy
880                        if ctx.node_config().raft.read_consistency.allow_client_override {
881                            match client_read_request.consistency_policy() {
882                                ClientReadConsistencyPolicy::LeaseRead => {
883                                    ServerReadConsistencyPolicy::LeaseRead
884                                }
885                                ClientReadConsistencyPolicy::LinearizableRead => {
886                                    ServerReadConsistencyPolicy::LinearizableRead
887                                }
888                                ClientReadConsistencyPolicy::EventualConsistency => {
889                                    ServerReadConsistencyPolicy::EventualConsistency
890                                }
891                            }
892                        } else {
893                            // Client override not allowed - use server default
894                            ctx.node_config().raft.read_consistency.default_policy.clone()
895                        }
896                    } else {
897                        // No client policy specified - use server default
898                        ctx.node_config().raft.read_consistency.default_policy.clone()
899                    };
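                    // Resolution summary:
                    //   client specified + override allowed -> client policy
                    //   client specified + override denied  -> server default
                    //   client did not specify              -> server default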
900
901                    // Apply consistency policy
902                    match effective_policy {
903                        ServerReadConsistencyPolicy::LinearizableRead => {
904                            if !self
905                                .verify_leadership_limited_retry(vec![], true, ctx, &role_tx)
906                                .await
907                                .unwrap_or(false)
908                            {
909                                warn!("enforce_quorum_consensus failed for linear read request");
910
911                                Err(tonic::Status::failed_precondition(
912                                    "enforce_quorum_consensus failed".to_string(),
913                                ))
914                            } else if let Err(e) = self
915                                .ensure_state_machine_upto_commit_index(
916                                    &ctx.handlers.state_machine_handler,
917                                    last_applied_index,
918                                )
919                                .await
920                            {
921                                warn!(
922                                    "ensure_state_machine_upto_commit_index failed for linear read request"
923                                );
924                                Err(tonic::Status::failed_precondition(format!(
925                                    "ensure_state_machine_upto_commit_index failed: {e:?}"
926                                )))
927                            } else {
928                                read_operation()
929                            }
930                        }
931                        ServerReadConsistencyPolicy::LeaseRead => {
932                            // New lease-based implementation
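                            // Sketch of the lease assumption: `is_lease_valid` is
                            // expected to compare `lease_timestamp` (refreshed via
                            // `update_lease_timestamp` after each successful quorum
                            // round) against the configured lease window; the exact
                            // window is defined in the node configuration, not here.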
933                            if self.is_lease_valid(ctx) {
934                                // Lease is valid, serve read locally
935                                read_operation()
936                            } else {
937                                // Lease expired, need to refresh with quorum
938                                if !self
939                                    .verify_leadership_limited_retry(vec![], true, ctx, &role_tx)
940                                    .await
941                                    .unwrap_or(false)
942                                {
943                                    warn!("LeaseRead policy: lease renewal failed");
944                                    Err(tonic::Status::failed_precondition(
945                                        "LeaseRead policy: lease renewal failed".to_string(),
946                                    ))
947                                } else {
948                                    // Update lease timestamp after successful verification
949                                    self.update_lease_timestamp();
950                                    read_operation()
951                                }
952                            }
953                        }
954                        ServerReadConsistencyPolicy::EventualConsistency => {
955                            // Eventual consistency: serve immediately without verification
956                            // Leader can always serve eventually consistent reads
957                            debug!("EventualConsistency: serving local read without verification");
958                            read_operation()
959                        }
960                    }
961                };
962
963                debug!(
964                    "Leader::ClientReadRequest is going to response: {:?}",
965                    &response
966                );
967                sender.send(response).map_err(|e| {
968                    let error_str = format!("{e:?}");
969                    error!("Failed to send: {}", error_str);
970                    NetworkError::SingalSendFailed(error_str)
971                })?;
972            }
973
974            RaftEvent::InstallSnapshotChunk(_streaming, sender) => {
975                sender
976                    .send(Err(Status::permission_denied("Not Follower or Learner. ")))
977                    .map_err(|e| {
978                        let error_str = format!("{e:?}");
979                        error!("Failed to send: {}", error_str);
980                        NetworkError::SingalSendFailed(error_str)
981                    })?;
982
983                return Err(ConsensusError::RoleViolation {
984                    current_role: "Leader",
985                    required_role: "Follower or Learner",
986                    context: format!(
987                        "Leader node {} receives RaftEvent::InstallSnapshotChunk",
988                        ctx.node_id
989                    ),
990                }
991                .into());
992            }
993
994            RaftEvent::RaftLogCleanUp(_purge_log_request, sender) => {
995                sender
996                    .send(Err(Status::permission_denied(
997                        "Leader should not receive RaftLogCleanUp event.",
998                    )))
999                    .map_err(|e| {
1000                        let error_str = format!("{e:?}");
1001                        error!("Failed to send: {}", error_str);
1002                        NetworkError::SingalSendFailed(error_str)
1003                    })?;
1004
1005                return Err(ConsensusError::RoleViolation {
1006                    current_role: "Leader",
1007                    required_role: "None Leader",
1008                    context: format!(
1009                        "Leader node {} receives RaftEvent::RaftLogCleanUp",
1010                        ctx.node_id
1011                    ),
1012                }
1013                .into());
1014            }
1015
1016            RaftEvent::CreateSnapshotEvent => {
1017                // Prevent duplicate snapshot creation
1018                if self.snapshot_in_progress.load(std::sync::atomic::Ordering::Acquire) {
1019                    info!("Snapshot creation already in progress. Skipping duplicate request.");
1020                    return Ok(());
1021                }
1022
1023                self.snapshot_in_progress.store(true, std::sync::atomic::Ordering::Release);
1024                let state_machine_handler = ctx.state_machine_handler().clone();
1025
1026                // Use spawn to perform snapshot creation in the background
1027                tokio::spawn(async move {
1028                    let result = state_machine_handler.create_snapshot().await;
1029                    info!("SnapshotCreated event will be processed in another event thread");
1030                    if let Err(e) =
1031                        send_replay_raft_event(&role_tx, RaftEvent::SnapshotCreated(result))
1032                    {
1033                        error!("Failed to send snapshot creation result: {}", e);
1034                    }
1035                });
1036            }
1037
1038            RaftEvent::SnapshotCreated(result) => {
1039                self.snapshot_in_progress.store(false, Ordering::SeqCst);
1040                let my_id = self.shared_state.node_id;
1041                let my_term = self.current_term();
1042
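                // Purge pipeline overview (see the phase markers below):
                //   snapshot persisted -> schedule local purge up to `last_included`
                //   -> broadcast PurgeLogRequest to peers -> execute the scheduled
                //   local purge -> LogPurgeCompleted advances `last_purged_index`
                //   monotonically.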
1043                match result {
1044                    Err(e) => {
1045                        error!(%e, "State machine snapshot creation failed");
1046                    }
1047                    Ok((
1048                        SnapshotMetadata {
1049                            last_included: last_included_option,
1050                            checksum,
1051                        },
1052                        _final_path,
1053                    )) => {
1054                        info!("Initiating log purge after snapshot creation");
1055
1056                        if let Some(last_included) = last_included_option {
1057                            // ----------------------
1058                            // Phase 1: Schedule log purge if possible
1059                            // ----------------------
1060                            trace!("Phase 1: Schedule log purge if possible");
1061                            if self.can_purge_logs(self.last_purged_index, last_included) {
1062                                trace!(?last_included, "Phase 1: Scheduling log purge");
1063                                self.scheduled_purge_upto(last_included);
1064                            }
1065
1066                            // ----------------------
1067                            // Phase 2.1: Pre-Checks before sending Purge request
1068                            // ----------------------
1069                            trace!("Phase 2.1: Pre-Checks before sending Purge request");
1070                            let membership = ctx.membership();
1071                            let members = membership.voters().await;
1072                            if members.is_empty() {
1073                                warn!("no peer found for leader({})", my_id);
1074                                return Err(MembershipError::NoPeersAvailable.into());
1075                            }
1076
1077                            // ----------------------
1078                            // Phase 2.2: Send Purge request to the other nodes
1079                            // ----------------------
1080                            trace!("Phase 2.2: Send Purge request to the other nodes");
1081                            let transport = ctx.transport();
1082                            match transport
1083                                .send_purge_requests(
1084                                    PurgeLogRequest {
1085                                        term: my_term,
1086                                        leader_id: my_id,
1087                                        last_included: Some(last_included),
1088                                        snapshot_checksum: checksum.clone(),
1089                                        leader_commit: self.commit_index(),
1090                                    },
1091                                    &self.node_config.retry,
1092                                    membership,
1093                                )
1094                                .await
1095                            {
1096                                Ok(result) => {
1097                                    info!(?result, "receive PurgeLogResult");
1098
1099                                    self.peer_purge_progress(result, &role_tx)?;
1100                                }
1101                                Err(e) => {
1102                                    error!(?e, "RaftEvent::CreateSnapshotEvent");
1103                                    return Err(e);
1104                                }
1105                            }
1106
1107                            // ----------------------
1108                            // Phase 3: Execute scheduled purge task
1109                            // ----------------------
1110                            trace!("Phase 3: Execute scheduled purge task");
1111                            debug!(?last_included, "Execute scheduled purge task");
1112                            if let Some(scheduled) = self.scheduled_purge_upto {
1113                                let purge_executor = ctx.purge_executor();
1114                                // NOTE: do not set `last_purged_index` eagerly here
1115                                // (earlier bug); it advances only on LogPurgeCompleted.
1116                                match purge_executor.execute_purge(scheduled).await {
1117                                    Ok(_) => {
1118                                        if let Err(e) = send_replay_raft_event(
1119                                            &role_tx,
1120                                            RaftEvent::LogPurgeCompleted(scheduled),
1121                                        ) {
1122                                            error!(%e, "Failed to notify purge completion");
1123                                        }
1124                                    }
1125                                    Err(e) => {
1126                                        error!(?e, ?scheduled, "Log purge execution failed");
1127                                    }
1128                                }
1129                            }
1130                        }
1131                    }
1132                }
1133            }
1134
1135            RaftEvent::LogPurgeCompleted(purged_id) => {
1136                // Ensure we don't regress the purge index
1137                if self.last_purged_index.map_or(true, |current| purged_id.index > current.index) {
1138                    debug!(
1139                        ?purged_id,
1140                        "Updating last purged index after successful execution"
1141                    );
1142                    self.last_purged_index = Some(purged_id);
1143                } else {
1144                    warn!(
1145                        ?purged_id,
1146                        ?self.last_purged_index,
1147                        "Received outdated purge completion, ignoring"
1148                    );
1149                }
1150            }
1151
1152            RaftEvent::JoinCluster(join_request, sender) => {
1153                debug!(?join_request, "Leader::RaftEvent::JoinCluster");
1154                self.handle_join_cluster(join_request, sender, ctx, &role_tx).await?;
1155            }
1156
1157            RaftEvent::DiscoverLeader(request, sender) => {
1158                debug!(?request, "Leader::RaftEvent::DiscoverLeader");
1159
1160                if let Some(meta) = ctx.membership().retrieve_node_meta(my_id).await {
1161                    let response = LeaderDiscoveryResponse {
1162                        leader_id: my_id,
1163                        leader_address: meta.address,
1164                        term: my_term,
1165                    };
1166                    sender.send(Ok(response)).map_err(|e| {
1167                        let error_str = format!("{e:?}");
1168                        error!("Failed to send: {}", error_str);
1169                        NetworkError::SingalSendFailed(error_str)
1170                    })?;
1171                    return Ok(());
1172                } else {
1173                    let msg = "Leader can not find its address? It must be a bug.";
1174                    error!("{}", msg);
1175                    panic!("{}", msg);
1176                }
1177            }
1178            RaftEvent::StreamSnapshot(request, sender) => {
1179                debug!("Leader::RaftEvent::StreamSnapshot");
1180
1181                // Get the latest snapshot metadata
1182                if let Some(metadata) = ctx.state_machine().snapshot_metadata() {
1183                    // Create response channel
1184                    let (response_tx, response_rx) =
1185                        mpsc::channel::<std::result::Result<Arc<SnapshotChunk>, Status>>(32);
1186                    // Convert to properly encoded tonic stream
1187                    let size = 1024 * 1024 * 1024; // 1GB max message size
1188                    let response_stream = create_production_snapshot_stream(response_rx, size);
1189                    // Immediately respond with the stream
1190                    sender.send(Ok(response_stream)).map_err(|e| {
1191                        let error_str = format!("{e:?}");
1192                        error!("Stream response failed: {}", error_str);
1193                        NetworkError::SingalSendFailed(error_str)
1194                    })?;
1195
1196                    // Spawn background transfer task
1197                    let state_machine_handler = ctx.state_machine_handler().clone();
1198                    let config = ctx.node_config.raft.snapshot.clone();
1199                    // Load snapshot data stream
1200                    let data_stream =
1201                        state_machine_handler.load_snapshot_data(metadata.clone()).await?;
1202
1203                    tokio::spawn(async move {
1204                        if let Err(e) = BackgroundSnapshotTransfer::<T>::run_pull_transfer(
1205                            request,
1206                            response_tx,
1207                            data_stream,
1208                            config,
1209                        )
1210                        .await
1211                        {
1212                            error!("StreamSnapshot failed: {:?}", e);
1213                        }
1214                    });
1215                } else {
1216                    warn!("No snapshot available for streaming");
1217                    sender.send(Err(Status::not_found("Snapshot not found"))).map_err(|e| {
1218                        let error_str = format!("{e:?}");
1219                        error!("Stream response failed: {}", error_str);
1220                        NetworkError::SingalSendFailed(error_str)
1221                    })?;
1222                }
1223            }
1224            RaftEvent::TriggerSnapshotPush { peer_id } => {
1225                if let Some(latest_snapshot_metadata) = ctx.state_machine().snapshot_metadata() {
1226                    Self::trigger_background_snapshot(
1227                        peer_id,
1228                        latest_snapshot_metadata,
1229                        ctx.state_machine_handler().clone(),
1230                        ctx.membership(),
1231                        ctx.node_config.raft.snapshot.clone(),
1232                    )
1233                    .await?;
1234                }
1235            }
1236
1237            RaftEvent::PromoteReadyLearners => {
1238                // SAFETY: Called from main event loop, no reentrancy issues
1239                info!(
1240                    "[Leader {}] โšก PromoteReadyLearners event received, pending_promotions: {:?}",
1241                    self.node_id(),
1242                    self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
1243                );
1244                self.process_pending_promotions(ctx, &role_tx).await?;
1245            }
1246
1247            RaftEvent::MembershipApplied => {
1248                // Save old membership for comparison
1249                let old_replication_targets = self.cluster_metadata.replication_targets.clone();
1250
1251                // Refresh cluster metadata cache after membership change is applied
1252                debug!("Refreshing cluster metadata cache after membership change");
1253                if let Err(e) = self.update_cluster_metadata(&ctx.membership()).await {
1254                    warn!("Failed to update cluster metadata: {:?}", e);
1255                }
1256
1257                // CRITICAL FIX #218: Initialize replication state for newly added peers
1258                // Per Raft protocol: When new members join, Leader must initialize their next_index
1259                let newly_added: Vec<u32> = self
1260                    .cluster_metadata
1261                    .replication_targets
1262                    .iter()
1263                    .filter(|new_peer| {
1264                        !old_replication_targets.iter().any(|old_peer| old_peer.id == new_peer.id)
1265                    })
1266                    .map(|peer| peer.id)
1267                    .collect();
1268
1269                if !newly_added.is_empty() {
1270                    debug!(
1271                        "Initializing replication state for {} new peer(s): {:?}",
1272                        newly_added.len(),
1273                        newly_added
1274                    );
1275                    let last_entry_id = ctx.raft_log().last_entry_id();
1276                    if let Err(e) =
1277                        self.init_peers_next_index_and_match_index(last_entry_id, newly_added)
1278                    {
1279                        warn!("Failed to initialize next_index for new peers: {:?}", e);
1280                        // Non-fatal: next_index will use default value of 1,
1281                        // replication will still work but may be less efficient
1282                    }
1283                }
1284            }
1285
1286            RaftEvent::StepDownSelfRemoved => {
1287                // Only Leader can propose configuration changes and remove itself
1288                // Per Raft protocol: Leader steps down immediately after self-removal
1289                warn!(
1290                    "[Leader-{}] Removed from cluster membership, stepping down to Follower",
1291                    self.node_id()
1292                );
1293                role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
1294                    error!(
1295                        "[Leader-{}] Failed to send BecomeFollower after self-removal: {:?}",
1296                        self.node_id(),
1297                        e
1298                    );
1299                    NetworkError::SingalSendFailed(format!(
1300                        "BecomeFollower after self-removal: {e:?}"
1301                    ))
1302                })?;
1303                return Ok(());
1304            }
1305        }
1306
1307        Ok(())
1308    }
1309}
1310
1311impl<T: TypeConfig> LeaderState<T> {
1312    /// Refreshes the cached cluster metadata.
1313    /// Called after becoming leader and whenever a membership change is applied.
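    ///
    /// For example, with two remote voters plus this leader, `total_voters` is 3 and
    /// `single_voter` is false; a freshly bootstrapped single-node cluster yields
    /// `total_voters == 1` and `single_voter == true`, allowing commits with a quorum of one.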
1314    pub async fn update_cluster_metadata(
1315        &mut self,
1316        membership: &Arc<T::M>,
1317    ) -> Result<()> {
1318        // Calculate total voter count including self as leader
1319        let voters = membership.voters().await;
1320        let total_voters = voters.len() + 1; // +1 for leader (self)
1321
1322        // Get all replication targets (voters + learners, excluding self)
1323        let replication_targets = membership.replication_peers().await;
1324
1325        // Single-voter cluster: only this node is a voter (quorum = 1)
1326        let single_voter = total_voters == 1;
1327
1328        self.cluster_metadata = ClusterMetadata {
1329            single_voter,
1330            total_voters,
1331            replication_targets: replication_targets.clone(),
1332        };
1333
1334        debug!(
1335            "Updated cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
1336            single_voter,
1337            total_voters,
1338            replication_targets.len()
1339        );
1340        Ok(())
1341    }
1342
1343    /// Returns the current state snapshot.
1344    pub fn state_snapshot(&self) -> StateSnapshot {
1345        StateSnapshot {
1346            current_term: self.current_term(),
1347            voted_for: None,
1348            commit_index: self.commit_index(),
1349            role: Leader as i32,
1350        }
1351    }
1352
1353    /// Returns the current Leader-specific state snapshot.
1354    #[tracing::instrument]
1355    pub fn leader_state_snapshot(&self) -> LeaderStateSnapshot {
1356        LeaderStateSnapshot {
1357            next_index: self.next_index.clone(),
1358            match_index: self.match_index.clone(),
1359            noop_log_id: self.noop_log_id,
1360        }
1361    }
1362
1363    /// # Params
1364    /// - `execute_now`: whether this proposal should be executed immediately rather than
1365    ///   buffered, e.g. enforce_quorum_consensus is expected to execute immediately
1366    pub async fn process_raft_request(
1367        &mut self,
1368        raft_request_with_signal: RaftRequestWithSignal,
1369        ctx: &RaftContext<T>,
1370        execute_now: bool,
1371        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1372    ) -> Result<()> {
1373        debug!(
1374            "Leader::process_raft_request, request_id: {}",
1375            raft_request_with_signal.id
1376        );
1377
1378        let push_result = self.batch_buffer.push(raft_request_with_signal);
1379        // push() returns a size only when the buffer exceeds its maximum, signalling that a flush is due
1380        if execute_now || push_result.is_some() {
1381            let batch = self.batch_buffer.take();
1382
1383            trace!(
1384                "replication_handler.handle_raft_request_in_batch: batch size:{:?}",
1385                batch.len()
1386            );
1387
1388            self.process_batch(batch, role_tx, ctx).await?;
1389        }
1390
1391        Ok(())
1392    }
1393
1394    /// Scenario handling summary:
1395    ///
1396    /// 1. Quorum achieved:
1397    ///    - Client response: `write_success()`
1398    ///    - State update: update peer indexes and commit index
1399    ///    - Return: `Ok(())`
1400    ///
1401    /// 2. Quorum NOT achieved (verifiable):
1402    ///    - Client response: `RetryRequired`
1403    ///    - State update: update peer indexes
1404    ///    - Return: `Ok(())`
1405    ///
1406    /// 3. Quorum NOT achieved (non-verifiable):
1407    ///    - Client response: `ProposeFailed`
1408    ///    - State update: update peer indexes
1409    ///    - Return: `Ok(())`
1410    ///
1411    /// 4. Partial timeouts:
1412    ///    - Client response: `ProposeFailed`
1413    ///    - State update: update only the peer indexes that responded
1414    ///    - Return: `Ok(())`
1415    ///
1416    /// 5. All timeouts:
1417    ///    - Client response: `ProposeFailed`
1418    ///    - State update: no update
1419    ///    - Return: `Ok(())`
1420    ///
1421    /// 6. Higher term detected:
1422    ///    - Client response: `TermOutdated`
1423    ///    - State update: update term and convert to follower
1424    ///    - Return: `Err(HigherTerm)`
1425    ///
1426    /// 7. Critical failure (e.g., system or logic error):
1427    ///    - Client response: `ProposeFailed`
1428    ///    - State update: none
1429    ///    - Return: original error
1430    pub async fn process_batch(
1431        &mut self,
1432        batch: VecDeque<RaftRequestWithSignal>,
1433        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1434        ctx: &RaftContext<T>,
1435    ) -> Result<()> {
1436        // 1. Prepare batch data
1437        let entry_payloads: Vec<EntryPayload> =
1438            batch.iter().flat_map(|req| &req.payloads).cloned().collect();
1439        if !entry_payloads.is_empty() {
1440            trace!(?entry_payloads, "[Node-{}] process_batch", ctx.node_id);
1441        }
1442
1443        // 2. Execute the replication
1444        let cluster_size = self.cluster_metadata.total_voters;
1445        trace!(%cluster_size);
1446
1447        let result = ctx
1448            .replication_handler()
1449            .handle_raft_request_in_batch(
1450                entry_payloads,
1451                self.state_snapshot(),
1452                self.leader_state_snapshot(),
1453                &self.cluster_metadata,
1454                ctx,
1455            )
1456            .await;
1457        debug!(?result, "replication_handler::handle_raft_request_in_batch");
1458
1459        // 3. Handle the results uniformly
1460        match result {
1461            // Case 1: Successfully reached majority
1462            Ok(AppendResults {
1463                commit_quorum_achieved: true,
1464                peer_updates,
1465                learner_progress,
1466            }) => {
1467                // 1. Update all peer indexes
1468                self.update_peer_indexes(&peer_updates);
1469
1470                // 2. Check Learner's catch-up status
1471                if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
1472                    error!(?e, "check_learner_progress failed");
1473                };
1474
1475                // 3. Update commit index
1476                // Single-voter cluster: commit index = last log index (quorum of 1)
1477                // Multi-voter cluster: calculate commit index based on majority quorum
1478                let new_commit_index = if self.cluster_metadata.single_voter {
1479                    let last_log_index = ctx.raft_log().last_entry_id();
1480                    if last_log_index > self.commit_index() {
1481                        Some(last_log_index)
1482                    } else {
1483                        None
1484                    }
1485                } else {
1486                    self.calculate_new_commit_index(ctx.raft_log(), &peer_updates)
1487                };
1488
1489                if let Some(new_commit_index) = new_commit_index {
1490                    debug!(
1491                        "[Leader-{}] New commit index acknowledged: {}",
1492                        self.node_id(),
1493                        new_commit_index
1494                    );
1495                    self.update_commit_index_with_signal(
1496                        Leader as i32,
1497                        self.current_term(),
1498                        new_commit_index,
1499                        role_tx,
1500                    )?;
1501                }
1502
1503                // 4. Notify all clients of success
1504                for request in batch {
1505                    let _ = request.sender.send(Ok(ClientResponse::write_success()));
1506                }
1507            }
1508
1509            // Case 2: Failed to reach majority
1510            Ok(AppendResults {
1511                commit_quorum_achieved: false,
1512                peer_updates,
1513                learner_progress,
1514            }) => {
1515                // 1. Update all peer indexes
1516                self.update_peer_indexes(&peer_updates);
1517
1518                // 2. Check Learner's catch-up status
1519                if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
1520                    error!(?e, "check_learner_progress failed");
1521                };
1522
1523                // 3. Determine error code based on verifiability
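                // For example, with cluster_size = 5, hearing back from 3 or more peers makes the
                // negative outcome verifiable, so the client is asked to retry; with fewer
                // responses the true outcome is unknown and we report ProposeFailed.
                // (Assumes `is_majority(n, total)` holds when n exceeds total / 2.)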
1524                let responses_received = peer_updates.len();
1525                let error_code = if is_majority(responses_received, cluster_size) {
1526                    ErrorCode::RetryRequired
1527                } else {
1528                    ErrorCode::ProposeFailed
1529                };
1530
1531                // 4. Notify all clients of failure
1532                for request in batch {
1533                    let _ = request.sender.send(Ok(ClientResponse::client_error(error_code)));
1534                }
1535            }
1536
1537            // Case 3: Higher term detected
1538            Err(Error::Consensus(ConsensusError::Replication(ReplicationError::HigherTerm(
1539                higher_term,
1540            )))) => {
1541                warn!("Higher term detected: {}", higher_term);
1542                self.update_current_term(higher_term);
1543                self.send_become_follower_event(None, role_tx)?;
1544
1545                // Notify client of term expiration
1546                for request in batch {
1547                    let _ = request
1548                        .sender
1549                        .send(Ok(ClientResponse::client_error(ErrorCode::TermOutdated)));
1550                }
1551
1552                return Err(ReplicationError::HigherTerm(higher_term).into());
1553            }
1554
1555            // Case 4: Other errors
1556            Err(e) => {
1557                error!("Batch processing failed: {:?}", e);
1558
1559                // Notify all clients of failure
1560                for request in batch {
1561                    let _ = request
1562                        .sender
1563                        .send(Ok(ClientResponse::client_error(ErrorCode::ProposeFailed)));
1564                }
1565
1566                return Err(e);
1567            }
1568        }
1569
1570        Ok(())
1571    }
1572
1573    /// Update peer node index
1574    #[instrument(skip(self))]
1575    fn update_peer_indexes(
1576        &mut self,
1577        peer_updates: &HashMap<u32, PeerUpdate>,
1578    ) {
1579        for (peer_id, update) in peer_updates {
1580            if let Err(e) = self.update_next_index(*peer_id, update.next_index) {
1581                error!("Failed to update next index: {:?}", e);
1582            }
1583            trace!(
1584                "Updated next index for peer {} to {}",
1585                peer_id, update.next_index
1586            );
1587            if let Some(match_index) = update.match_index {
1588                if let Err(e) = self.update_match_index(*peer_id, match_index) {
1589                    error!("Failed to update match index: {:?}", e);
1590                }
1591                trace!("Updated match index for peer {} to {}", peer_id, match_index);
1592            }
1593        }
1594    }
1595
1596    pub async fn check_learner_progress(
1597        &mut self,
1598        learner_progress: &HashMap<u32, Option<u64>>,
1599        ctx: &RaftContext<T>,
1600        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1601    ) -> Result<()> {
1602        debug!(?learner_progress, "check_learner_progress");
1603
1604        if !self.should_check_learner_progress(ctx) {
1605            return Ok(());
1606        }
1607
1608        if learner_progress.is_empty() {
1609            return Ok(());
1610        }
1611
1612        let ready_learners = self.find_promotable_learners(learner_progress, ctx).await;
1613        let new_promotions = self.deduplicate_promotions(ready_learners);
1614
1615        if !new_promotions.is_empty() {
1616            self.enqueue_and_notify_promotions(new_promotions, role_tx)?;
1617        }
1618
1619        Ok(())
1620    }
1621
1622    /// Check if enough time has elapsed since last learner progress check
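    ///
    /// For example, with `learner_check_throttle_ms = 500`, at most one progress check runs per
    /// 500 ms window; calls arriving earlier return `false` without resetting the timer.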
1623    fn should_check_learner_progress(
1624        &mut self,
1625        ctx: &RaftContext<T>,
1626    ) -> bool {
1627        let throttle_interval =
1628            Duration::from_millis(ctx.node_config().raft.learner_check_throttle_ms);
1629        if self.last_learner_check.elapsed() < throttle_interval {
1630            return false;
1631        }
1632        self.last_learner_check = Instant::now();
1633        true
1634    }
1635
1636    /// Find learners that are caught up and eligible for promotion
1637    async fn find_promotable_learners(
1638        &self,
1639        learner_progress: &HashMap<u32, Option<u64>>,
1640        ctx: &RaftContext<T>,
1641    ) -> Vec<u32> {
1642        let leader_commit = self.commit_index();
1643        let threshold = ctx.node_config().raft.learner_catchup_threshold;
1644        let membership = ctx.membership();
1645
1646        let mut ready_learners = Vec::new();
1647
1648        for (&node_id, &match_index_opt) in learner_progress.iter() {
1649            if !membership.contains_node(node_id).await {
1650                continue;
1651            }
1652
1653            if !self.is_learner_caught_up(match_index_opt, leader_commit, threshold) {
1654                continue;
1655            }
1656
1657            let node_status =
1658                membership.get_node_status(node_id).await.unwrap_or(NodeStatus::ReadOnly);
1659            if !node_status.is_promotable() {
1660                debug!(
1661                    ?node_id,
1662                    ?node_status,
1663                    "Learner caught up but status is not Promotable, skipping"
1664                );
1665                continue;
1666            }
1667
1668            debug!(
1669                ?node_id,
1670                match_index = ?match_index_opt.unwrap_or(0),
1671                ?leader_commit,
1672                gap = leader_commit.saturating_sub(match_index_opt.unwrap_or(0)),
1673                "Learner caught up"
1674            );
1675            ready_learners.push(node_id);
1676        }
1677
1678        ready_learners
1679    }
1680
1681    /// Check if learner has caught up with leader based on log gap
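    ///
    /// For example, with `leader_commit = 100`, `match_index = Some(95)` and a threshold of 10,
    /// the gap is 5 and the learner counts as caught up; a learner with no recorded match index
    /// is treated as being at index 0.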
1682    fn is_learner_caught_up(
1683        &self,
1684        match_index: Option<u64>,
1685        leader_commit: u64,
1686        threshold: u64,
1687    ) -> bool {
1688        let match_index = match_index.unwrap_or(0);
1689        let gap = leader_commit.saturating_sub(match_index);
1690        gap <= threshold
1691    }
1692
1693    /// Remove learners already in pending promotions queue
1694    fn deduplicate_promotions(
1695        &self,
1696        ready_learners: Vec<u32>,
1697    ) -> Vec<u32> {
1698        let already_pending: std::collections::HashSet<_> =
1699            self.pending_promotions.iter().map(|p| p.node_id).collect();
1700
1701        ready_learners.into_iter().filter(|id| !already_pending.contains(id)).collect()
1702    }
1703
1704    /// Add promotions to queue and send notification event
1705    fn enqueue_and_notify_promotions(
1706        &mut self,
1707        new_promotions: Vec<u32>,
1708        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1709    ) -> Result<()> {
1710        info!(
1711            ?new_promotions,
1712            "Learners caught up, adding to pending promotions"
1713        );
1714
1715        for node_id in new_promotions {
1716            self.pending_promotions
1717                .push_back(PendingPromotion::new(node_id, Instant::now()));
1718        }
1719
1720        role_tx
1721            .send(RoleEvent::ReprocessEvent(Box::new(
1722                RaftEvent::PromoteReadyLearners,
1723            )))
1724            .map_err(|e| {
1725                let error_str = format!("{e:?}");
1726                error!("Failed to send PromoteReadyLearners: {}", error_str);
1727                Error::System(SystemError::Network(NetworkError::SingalSendFailed(
1728                    error_str,
1729                )))
1730            })?;
1731
1732        Ok(())
1733    }
1734
1735    #[allow(dead_code)]
1736    pub async fn batch_promote_learners(
1737        &mut self,
1738        ready_learners_ids: Vec<u32>,
1739        ctx: &RaftContext<T>,
1740        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1741    ) -> Result<()> {
1742        // 1. Determine optimal promotion status based on quorum safety
1743        debug!("1. Determine optimal promotion status based on quorum safety");
1744        let membership = ctx.membership();
1745        let current_voters = membership.voters().await.len();
1746        // let syncings = membership.nodes_with_status(NodeStatus::Syncing).len();
1747        let new_active_count = current_voters + ready_learners_ids.len();
1748
1749        // Determine target status based on quorum safety
1750        trace!(
1751            ?current_voters,
1752            ?ready_learners_ids,
1753            "[Node-{}] new_active_count: {}",
1754            self.node_id(),
1755            new_active_count
1756        );
1757        let target_status = if ensure_safe_join(self.node_id(), new_active_count).is_ok() {
1758            trace!(
1759                "Going to update nodes-{:?} status to Active",
1760                ready_learners_ids
1761            );
1762            NodeStatus::Active
1763        } else {
1764            trace!(
1765                "Not enough quorum to promote learners: {:?}",
1766                ready_learners_ids
1767            );
1768            return Ok(());
1769        };
1770
1771        // 2. Create configuration change payload
1772        debug!("2. Create configuration change payload");
1773        let config_change = Change::BatchPromote(BatchPromote {
1774            node_ids: ready_learners_ids.clone(),
1775            new_status: target_status as i32,
1776        });
1777
1778        info!(?config_change, "Replicating cluster config");
1779
1780        // 3. Submit single config change for all ready learners
1781        debug!("3. Submit single config change for all ready learners");
1782        match self
1783            .verify_leadership_limited_retry(
1784                vec![EntryPayload::config(config_change)],
1785                true,
1786                ctx,
1787                role_tx,
1788            )
1789            .await
1790        {
1791            Ok(true) => {
1792                info!(
1793                    "Batch promotion committed for nodes: {:?}",
1794                    ready_learners_ids
1795                );
1796            }
1797            Ok(false) => {
1798                warn!("Failed to commit batch promotion");
1799            }
1800            Err(e) => {
1801                error!("Batch promotion error: {:?}", e);
1802                return Err(e);
1803            }
1804        }
1805
1806        Ok(())
1807    }
1808
1809    /// Calculate the new commit index from the majority-matched peer indexes.
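    ///
    /// Only match indexes from peers that responded in this round are considered, and a value
    /// is returned only when it strictly advances the current commit index; how the majority
    /// index is derived is delegated to the log's `calculate_majority_matched_index`.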
1810    #[instrument(skip(self))]
1811    fn calculate_new_commit_index(
1812        &mut self,
1813        raft_log: &Arc<ROF<T>>,
1814        peer_updates: &HashMap<u32, PeerUpdate>,
1815    ) -> Option<u64> {
1816        let old_commit_index = self.commit_index();
1817        let current_term = self.current_term();
1818
1819        let matched_ids: Vec<u64> =
1820            peer_updates.keys().filter_map(|&id| self.match_index(id)).collect();
1821
1822        let new_commit_index =
1823            raft_log.calculate_majority_matched_index(current_term, old_commit_index, matched_ids);
1824
1825        // Only report the index when it strictly advances the current commit index.
1826        new_commit_index.filter(|&index| index > old_commit_index)
1830    }
1831
1832    #[allow(dead_code)]
1833    fn if_update_commit_index(
1834        &self,
1835        new_commit_index_option: Option<u64>,
1836    ) -> (bool, u64) {
1837        let current_commit_index = self.commit_index();
1838        if let Some(new_commit_index) = new_commit_index_option {
1839            debug!("Leader::update_commit_index: {:?}", new_commit_index);
1840            if current_commit_index < new_commit_index {
1841                return (true, new_commit_index);
1842            }
1843        }
1844        debug!("Leader::update_commit_index: false");
1845        (false, current_commit_index)
1846    }
1847
1848    pub async fn ensure_state_machine_upto_commit_index(
1849        &self,
1850        state_machine_handler: &Arc<SMHOF<T>>,
1851        last_applied: u64,
1852    ) -> Result<()> {
1853        let commit_index = self.commit_index();
1854
1855        debug!(
1856            "ensure_state_machine_upto_commit_index: last_applied:{} < commit_index:{} ?",
1857            last_applied, commit_index
1858        );
1859        if last_applied < commit_index {
1860            state_machine_handler.update_pending(commit_index);
1861
1862            // Wait for state machine to catch up
1863            // This ensures linearizable reads see all committed writes
1864            let timeout_ms = self.node_config.raft.read_consistency.state_machine_sync_timeout_ms;
1865            state_machine_handler
1866                .wait_applied(commit_index, std::time::Duration::from_millis(timeout_ms))
1867                .await?;
1868
1869            debug!("ensure_state_machine_upto_commit_index success");
1870        }
1871        Ok(())
1872    }
1873
1874    #[instrument(skip(self))]
1875    fn scheduled_purge_upto(
1876        &mut self,
1877        received_last_included: LogId,
1878    ) {
1879        if let Some(existing) = self.scheduled_purge_upto {
1880            if existing.index >= received_last_included.index {
1881                warn!(
1882                    ?received_last_included,
1883                    ?existing,
1884                    "Will not update scheduled_purge_upto: received last_included does not advance the existing schedule"
1885                );
1886                return;
1887            }
1888        }
1889        info!(?self.scheduled_purge_upto, ?received_last_included, "Update scheduled_purge_upto.");
1890        self.scheduled_purge_upto = Some(received_last_included);
1891    }
1892
1893    fn peer_purge_progress(
1894        &mut self,
1895        responses: Vec<Result<PurgeLogResponse>>,
1896        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1897    ) -> Result<()> {
1898        if responses.is_empty() {
1899            return Ok(());
1900        }
1901        for r in responses.iter().flatten() {
1902            if r.term > self.current_term() {
1903                self.update_current_term(r.term);
1904                self.send_become_follower_event(None, role_tx)?;
1905            }
1906
1907            if let Some(last_purged) = r.last_purged {
1908                self.peer_purge_progress
1909                    .entry(r.node_id)
1910                    .and_modify(|v| *v = last_purged.index)
1911                    .or_insert(last_purged.index);
1912            }
1913        }
1914
1915        Ok(())
1916    }
1917
1918    fn send_become_follower_event(
1919        &self,
1920        new_leader_id: Option<u32>,
1921        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1922    ) -> Result<()> {
1923        info!(
1924            ?new_leader_id,
1925            "Leader is going to step down as Follower..."
1926        );
1927        role_tx.send(RoleEvent::BecomeFollower(new_leader_id)).map_err(|e| {
1928            let error_str = format!("{e:?}");
1929            error!("Failed to send: {}", error_str);
1930            NetworkError::SingalSendFailed(error_str)
1931        })?;
1932
1933        Ok(())
1934    }
1935
1936    /// Determines if logs prior to `last_included_in_snapshot` can be permanently discarded.
1937    ///
1938    /// Implements Leader-side log compaction safety checks per Raft paper §7.2:
1939    /// > "The leader uses a new RPC called InstallSnapshot to send snapshots to followers that are
1940    /// > too far behind"
1941    ///
1942    /// # Safety Invariants (ALL must hold)
1943    ///
1944    /// 1. **Committed Entry Guarantee**   `last_included_in_snapshot.index < self.commit_index`
1945    ///    - Ensures we never discard uncommitted entries (Raft §5.4.2)
1946    ///    - Maintains at least one committed entry after purge for log matching property
1947    ///
1948    /// 2. **Monotonic Snapshot Advancement**   `last_purge_index < last_included_in_snapshot.index`
1949    ///    - Enforces snapshot indices strictly increase (prevents rollback attacks)
1950    ///    - Maintains sequential purge ordering (FSM safety requirement)
1951    ///
1952    /// 3. **Cluster-wide Progress Validation**   `peer_purge_progress.values().all(≥
1953    ///    snapshot.index)`
1954    ///    - Ensures ALL followers have confirmed ability to reach this snapshot
1955    ///    - Prevents leadership changes from causing log inconsistencies
1956    ///
1957    /// 4. **Operation Atomicity**   `pending_purge.is_none()`
1958    ///    - Ensures only one concurrent purge operation
1959    ///    - Critical for linearizable state machine semantics
1960    ///
1961    /// # Implementation Notes
1962    /// - Leader must maintain `peer_purge_progress` through AppendEntries responses
1963    /// - Actual log discard should be deferred until storage confirms snapshot persistence
1964    /// - Design differs from followers by requiring full cluster confirmation (Raft extension for
1965    ///   enhanced durability)
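    ///
    /// # Example (illustrative)
    ///
    /// Suppose `commit_index == 10`, the previous purge stopped at index 5, the new snapshot's
    /// last included index is 8, and every entry in `peer_purge_progress` is at least 8. All
    /// checks hold (8 < 10, 5 < 8, all peers >= 8), so the method returns `true`. If any
    /// follower had only confirmed progress up to index 7, it would return `false`.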
1966    #[instrument(skip(self))]
1967    pub fn can_purge_logs(
1968        &self,
1969        last_purge_index: Option<LogId>,
1970        last_included_in_snapshot: LogId,
1971    ) -> bool {
1972        let commit_index = self.commit_index();
1973        debug!(?self.peer_purge_progress, ?commit_index, ?last_purge_index, ?last_included_in_snapshot, "can_purge_logs");
1975        let monotonic_check = last_purge_index
1976            .map(|lid| lid.index < last_included_in_snapshot.index)
1977            .unwrap_or(true);
1978
1979        last_included_in_snapshot.index < commit_index
1980            && monotonic_check
1981            && self.peer_purge_progress.values().all(|&v| v >= last_included_in_snapshot.index)
1982    }
1983
1984    pub async fn handle_join_cluster(
1985        &mut self,
1986        join_request: JoinRequest,
1987        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
1988        ctx: &RaftContext<T>,
1989        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1990    ) -> Result<()> {
1991        let node_id = join_request.node_id;
1992        let node_role = join_request.node_role;
1993        let address = join_request.address;
1994        let status = join_request.status;
1995        let membership = ctx.membership();
1996
1997        // 1. Validate join request
1998        debug!("1. Validate join request");
1999        if membership.contains_node(node_id).await {
2000            let error_msg = format!("Node {node_id} already exists in cluster");
2001            warn!(%error_msg);
2002            return self.send_join_error(sender, MembershipError::NodeAlreadyExists(node_id)).await;
2003        }
2004
2005        // 2. Create configuration change payload
2006        debug!("2. Create configuration change payload");
2007        if let Err(e) = membership.can_rejoin(node_id, node_role).await {
2008            let error_msg = format!("Node {node_id} cannot rejoin: {e}",);
2009            warn!(%error_msg);
2010            return self
2011                .send_join_error(sender, MembershipError::JoinClusterError(error_msg))
2012                .await;
2013        }
2014
2015        let config_change = Change::AddNode(AddNode {
2016            node_id,
2017            address: address.clone(),
2018            status,
2019        });
2020
2021        // 3. Submit config change, and wait for quorum confirmation
2022        debug!("3. Wait for quorum confirmation");
2023        match self
2024            .verify_leadership_limited_retry(
2025                vec![EntryPayload::config(config_change)],
2026                true,
2027                ctx,
2028                role_tx,
2029            )
2030            .await
2031        {
2032            Ok(true) => {
2033                // 4. Update node status to Syncing
2034                debug!("4. Update node status to Syncing");
2035
2036                debug!(
2037                    "After updating, the replications peers: {:?}",
2038                    ctx.membership().replication_peers().await
2039                );
2040
2041                // Note: Cluster metadata cache will be updated via MembershipApplied event
2042                // after CommitHandler applies the config change to membership state
2043
2044                // 5. Send successful response
2045                debug!("5. Send successful response");
2046                info!("Join config committed for node {}", node_id);
2047                self.send_join_success(node_id, &address, sender, ctx).await?;
2048            }
2049            Ok(false) => {
2050                warn!("Failed to commit join config for node {}", node_id);
2051                self.send_join_error(sender, MembershipError::CommitTimeout).await?
2052            }
2053            Err(e) => {
2054                error!("Error waiting for commit: {:?}", e);
2055                self.send_join_error(sender, e).await?
2056            }
2057        }
2058        Ok(())
2059    }
2060
2061    async fn send_join_success(
2062        &self,
2063        node_id: u32,
2064        address: &str,
2065        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2066        ctx: &RaftContext<T>,
2067    ) -> Result<()> {
2068        // Retrieve the latest snapshot metadata, if any
2069        let snapshot_metadata = ctx.state_machine_handler().get_latest_snapshot_metadata();
2070
2071        // Prepare response
2072        let response = JoinResponse {
2073            success: true,
2074            error: String::new(),
2075            config: Some(
2076                ctx.membership()
2077                    .retrieve_cluster_membership_config(self.shared_state().current_leader())
2078                    .await,
2079            ),
2080            config_version: ctx.membership().get_cluster_conf_version().await,
2081            snapshot_metadata,
2082            leader_id: self.node_id(),
2083        };
2084
2085        sender.send(Ok(response)).map_err(|e| {
2086            error!("Failed to send join response: {:?}", e);
2087            NetworkError::SingalSendFailed(format!("{e:?}"))
2088        })?;
2089
2090        info!(
2091            "Node {} ({}) successfully added as learner",
2092            node_id, address
2093        );
2094
2095        // Print leader accepting new node message (Plan B)
2096        crate::utils::cluster_printer::print_leader_accepting_new_node(
2097            self.node_id(),
2098            node_id,
2099            address,
2100            d_engine_proto::common::NodeRole::Learner as i32,
2101        );
2102
2103        Ok(())
2104    }
2105
2106    async fn send_join_error(
2107        &self,
2108        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2109        error: impl Into<Error>,
2110    ) -> Result<()> {
2111        let error = error.into();
2112        let status = Status::failed_precondition(error.to_string());
2113
2114        sender.send(Err(status)).map_err(|e| {
2115            error!("Failed to send join error: {:?}", e);
2116            NetworkError::SingalSendFailed(format!("{e:?}"))
2117        })?;
2118
2119        Err(error)
2120    }
2121
2122    #[cfg(any(test, feature = "test-utils"))]
2123    pub fn new(
2124        node_id: u32,
2125        node_config: Arc<RaftNodeConfig>,
2126    ) -> Self {
2127        let ReplicationConfig {
2128            rpc_append_entries_in_batch_threshold,
2129            rpc_append_entries_batch_process_delay_in_ms,
2130            rpc_append_entries_clock_in_ms,
2131            ..
2132        } = node_config.raft.replication;
2133
2134        LeaderState {
2135            cluster_metadata: ClusterMetadata {
2136                single_voter: false,
2137                total_voters: 0,
2138                replication_targets: vec![],
2139            },
2140            shared_state: SharedState::new(node_id, None, None),
2141            timer: Box::new(ReplicationTimer::new(
2142                rpc_append_entries_clock_in_ms,
2143                rpc_append_entries_batch_process_delay_in_ms,
2144            )),
2145            next_index: HashMap::new(),
2146            match_index: HashMap::new(),
2147            noop_log_id: None,
2148
2149            batch_buffer: Box::new(BatchBuffer::new(
2150                rpc_append_entries_in_batch_threshold,
2151                Duration::from_millis(rpc_append_entries_batch_process_delay_in_ms),
2152            )),
2153
2154            node_config,
2155            scheduled_purge_upto: None,
2156            last_purged_index: None, //TODO
2157            last_learner_check: Instant::now(),
2158            peer_purge_progress: HashMap::new(),
2159            snapshot_in_progress: AtomicBool::new(false),
2160            next_membership_maintenance_check: Instant::now(),
2161            pending_promotions: VecDeque::new(),
2162            _marker: PhantomData,
2163            lease_timestamp: AtomicU64::new(0),
2164        }
2165    }
2166
2167    pub async fn trigger_background_snapshot(
2168        node_id: u32,
2169        metadata: SnapshotMetadata,
2170        state_machine_handler: Arc<SMHOF<T>>,
2171        membership: Arc<MOF<T>>,
2172        config: SnapshotConfig,
2173    ) -> Result<()> {
2174        let (result_tx, result_rx) = oneshot::channel();
2175
2176        // Delegate the actual transfer to a dedicated thread pool
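        // spawn_blocking moves this work onto Tokio's blocking thread pool so a long snapshot
        // transfer cannot stall the leader's async event loop; the blocking thread drives the
        // async transfer to completion via `Handle::block_on` and reports the outcome back
        // over the oneshot channel.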
2177        tokio::task::spawn_blocking(move || {
2178            let rt = tokio::runtime::Handle::current();
2179            let result = rt.block_on(async move {
2180                let bulk_channel = membership
2181                    .get_peer_channel(node_id, ConnectionType::Bulk)
2182                    .await
2183                    .ok_or(NetworkError::PeerConnectionNotFound(node_id))?;
2184
2185                let data_stream =
2186                    state_machine_handler.load_snapshot_data(metadata.clone()).await?;
2187
2188                BackgroundSnapshotTransfer::<T>::run_push_transfer(
2189                    node_id,
2190                    data_stream,
2191                    bulk_channel,
2192                    config,
2193                )
2194                .await
2195            });
2196
2197            // Non-blocking send result
2198            let _ = result_tx.send(result);
2199        });
2200
2201        // Non-blocking check result
2202        tokio::spawn(async move {
2203            match result_rx.await {
2204                Ok(Ok(_)) => info!("Snapshot to {} completed", node_id),
2205                Ok(Err(e)) => error!("Snapshot to {} failed: {:?}", node_id, e),
2206                Err(_) => warn!("Snapshot result channel closed unexpectedly"),
2207            }
2208        });
2209
2210        Ok(())
2211    }
2212
2213    /// Processes all pending promotions while respecting the cluster's odd-node constraint
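    ///
    /// Stale entries (older than the configured threshold) are dropped first, then the queue is
    /// drained in FIFO order up to the largest batch that keeps the voter count odd (see
    /// `calculate_safe_batch_size`); if anything remains pending afterwards, a new
    /// `PromoteReadyLearners` event is re-queued to trigger the next cycle.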
2214    pub async fn process_pending_promotions(
2215        &mut self,
2216        ctx: &RaftContext<T>,
2217        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2218    ) -> Result<()> {
2219        debug!(
2220            "[Leader {}] 🔄 process_pending_promotions called, pending: {:?}",
2221            self.node_id(),
2222            self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2223        );
2224
2225        // Get promotion configuration from the node config
2226        let config = &ctx.node_config().raft.membership.promotion;
2227
2228        // Step 1: Remove stale entries (older than configured threshold)
2229        let now = Instant::now();
2230        self.pending_promotions.retain(|entry| {
2231            now.duration_since(entry.ready_since) <= config.stale_learner_threshold
2232        });
2233
2234        if self.pending_promotions.is_empty() {
2235            debug!(
2236                "[Leader {}] ❌ pending_promotions is empty after stale cleanup",
2237                self.node_id()
2238            );
2239            return Ok(());
2240        }
2241
2242        // Step 2: Get current voter count (including self as leader)
2243        let membership = ctx.membership();
2244        let current_voters = membership.voters().await.len() + 1; // +1 for self (leader)
2245        debug!(
2246            "[Leader {}] 📊 current_voters: {}, pending: {}",
2247            self.node_id(),
2248            current_voters,
2249            self.pending_promotions.len()
2250        );
2251
2252        // Step 3: Calculate the maximum batch size that preserves an odd total
2253        let max_batch_size =
2254            calculate_safe_batch_size(current_voters, self.pending_promotions.len());
2255        debug!(
2256            "[Leader {}] 🎯 max_batch_size: {}",
2257            self.node_id(),
2258            max_batch_size
2259        );
2260
2261        if max_batch_size == 0 {
2262            // Nothing we can safely promote now
2263            debug!(
2264                "[Leader {}] ⚠️ max_batch_size is 0, cannot promote now",
2265                self.node_id()
2266            );
2267            return Ok(());
2268        }
2269
2270        // Step 4: Extract the batch from the queue (FIFO order)
2271        let promotion_entries = self.drain_batch(max_batch_size);
2272        let promotion_node_ids = promotion_entries.iter().map(|e| e.node_id).collect::<Vec<_>>();
2273
2274        // Step 5: Execute batch promotion
2275        if !promotion_node_ids.is_empty() {
2276            // Log the batch promotion
2277            info!(
2278                "Promoting learner batch of {} nodes: {:?} (total voters: {} -> {})",
2279                promotion_node_ids.len(),
2280                promotion_node_ids,
2281                current_voters,
2282                current_voters + promotion_node_ids.len()
2283            );
2284
2285            // Attempt promotion and restore batch on failure
2286            let result = self.safe_batch_promote(promotion_node_ids.clone(), ctx, role_tx).await;
2287
2288            if let Err(e) = result {
2289                // Restore entries to the front of the queue in reverse order
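                // Pushing in reverse onto the front preserves the original FIFO order, so these
                // learners are retried first on the next promotion cycle.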
2290                for entry in promotion_entries.into_iter().rev() {
2291                    self.pending_promotions.push_front(entry);
2292                }
2293                return Err(e);
2294            }
2295
2296            info!(
2297                "Promotion successful. Cluster members: {:?}",
2298                membership.voters().await
2299            );
2300        }
2301
2302        trace!(
2303            ?self.pending_promotions,
2304            "Step 6: Reschedule if any pending promotions remain"
2305        );
2306        // Step 6: Reschedule if any pending promotions remain
2307        if !self.pending_promotions.is_empty() {
2308            debug!(
2309                "[Leader {}] Re-sending PromoteReadyLearners for remaining pending: {:?}",
2310                self.node_id(),
2311                self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2312            );
2313            // Important: Re-send the event to trigger next cycle
2314            role_tx
2315                .send(RoleEvent::ReprocessEvent(Box::new(
2316                    RaftEvent::PromoteReadyLearners,
2317                )))
2318                .map_err(|e| {
2319                    let error_str = format!("{e:?}");
2320                    error!("Send PromoteReadyLearners event failed: {}", error_str);
2321                    NetworkError::SingalSendFailed(error_str)
2322                })?;
2323        }
2324
2325        Ok(())
2326    }
2327
2328    /// Removes the first `count` nodes from the pending queue and returns them
2329    pub(super) fn drain_batch(
2330        &mut self,
2331        count: usize,
2332    ) -> Vec<PendingPromotion> {
2333        let mut batch = Vec::with_capacity(count);
2334        for _ in 0..count {
2335            if let Some(entry) = self.pending_promotions.pop_front() {
2336                batch.push(entry);
2337            } else {
2338                break;
2339            }
2340        }
2341        batch
2342    }
2343
2344    async fn safe_batch_promote(
2345        &mut self,
2346        batch: Vec<u32>,
2347        ctx: &RaftContext<T>,
2348        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2349    ) -> Result<()> {
2350        let change = Change::BatchPromote(BatchPromote {
2351            node_ids: batch.clone(),
2352            new_status: NodeStatus::Active as i32,
2353        });
2354
2355        // Submit batch activation
2356        self.verify_leadership_limited_retry(
2357            vec![EntryPayload::config(change)],
2358            true,
2359            ctx,
2360            role_tx,
2361        )
2362        .await?;
2363
2364        Ok(())
2365    }
2366
2367    async fn run_periodic_maintenance(
2368        &mut self,
2369        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2370        ctx: &RaftContext<T>,
2371    ) -> Result<()> {
2372        if let Err(e) = self.conditionally_purge_stale_learners(role_tx, ctx).await {
2373            error!("Stale learner purge failed: {}", e);
2374        }
2375
2376        if let Err(e) = self.conditionally_purge_zombie_nodes(role_tx, ctx).await {
2377            error!("Zombie node purge failed: {}", e);
2378        }
2379
2380        // Schedule the next check at a fixed interval, regardless of whether any stale
2381        // node was found
2382        self.reset_next_membership_maintenance_check(
2383            ctx.node_config().raft.membership.membership_maintenance_interval,
2384        );
2385        Ok(())
2386    }
2387
2388    /// Periodic check triggered every ~30s in the worst-case scenario
2389    /// using priority-based lazy scheduling. Actual average frequency
2390    /// is inversely proportional to system load.
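    ///
    /// Only the oldest slice of the queue is inspected per pass (about 1%, clamped to at least
    /// 1 and at most 100 entries), so a backlog of 10,000 pending promotions inspects 100
    /// entries while a backlog of 50 inspects a single entry.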
2391    pub async fn conditionally_purge_stale_learners(
2392        &mut self,
2393        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2394        ctx: &RaftContext<T>,
2395    ) -> Result<()> {
2396        let config = &ctx.node_config.raft.membership.promotion;
2397
2398        // Optimization: skip the check unless there are pending promotions and the scheduled maintenance time has arrived
2399        if self.pending_promotions.is_empty()
2400            || self.next_membership_maintenance_check > Instant::now()
2401        {
2402            trace!("Skipping stale learner check");
2403            return Ok(());
2404        }
2405
2406        let now = Instant::now();
2407        let queue_len = self.pending_promotions.len();
2408
2409        // Inspect only the oldest ~1% of entries per pass, clamped to at least 1 and at most 100
2410        let inspect_count = queue_len.min(100).min(1.max(queue_len / 100));
2411        let mut stale_entries = Vec::new();
2412
2413        trace!("Inspecting {} entries", inspect_count);
2414        for _ in 0..inspect_count {
2415            if let Some(entry) = self.pending_promotions.pop_front() {
2416                trace!(
2417                    "Inspecting entry: {:?} - {:?} - {:?}",
2418                    entry,
2419                    now.duration_since(entry.ready_since),
2420                    &config.stale_learner_threshold
2421                );
2422                if now.duration_since(entry.ready_since) > config.stale_learner_threshold {
2423                    stale_entries.push(entry);
2424                } else {
2425                    // Return non-stale entry and stop
2426                    self.pending_promotions.push_front(entry);
2427                    break;
2428                }
2429            } else {
2430                break;
2431            }
2432        }
2433
2434        trace!("Stale learner check completed: {:?}", stale_entries);
2435
2436        // Process collected stale entries
2437        for entry in stale_entries {
2438            if let Err(e) = self.handle_stale_learner(entry.node_id, role_tx, ctx).await {
2439                error!("Failed to handle stale learner: {}", e);
2440            }
2441        }
2442
2443        Ok(())
2444    }
2445
2446    /// Remove non-Active zombie nodes that exceed failure threshold
2447    async fn conditionally_purge_zombie_nodes(
2448        &mut self,
2449        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2450        ctx: &RaftContext<T>,
2451    ) -> Result<()> {
2452        // Optimize: get membership reference once to reduce lock contention
2453        let membership = ctx.membership();
2454        let zombie_candidates = membership.get_zombie_candidates().await;
2455        let mut nodes_to_remove = Vec::new();
2456
2457        for node_id in zombie_candidates {
2458            if let Some(status) = membership.get_node_status(node_id).await {
2459                if status != NodeStatus::Active {
2460                    nodes_to_remove.push(node_id);
2461                }
2462            }
2463        }
2464        // Batch removal if we have candidates
2465        if !nodes_to_remove.is_empty() {
2466            let change = Change::BatchRemove(BatchRemove {
2467                node_ids: nodes_to_remove.clone(),
2468            });
2469
2470            info!(
2471                "Proposing batch removal of zombie nodes: {:?}",
2472                nodes_to_remove
2473            );
2474
2475            // Submit single config change for all nodes
2476            match self
2477                .verify_leadership_limited_retry(
2478                    vec![EntryPayload::config(change)],
2479                    false,
2480                    ctx,
2481                    role_tx,
2482                )
2483                .await
2484            {
2485                Ok(true) => {
2486                    info!("Batch removal committed for nodes: {:?}", nodes_to_remove);
2487                }
2488                Ok(false) => {
2489                    warn!("Failed to commit batch removal");
2490                }
2491                Err(e) => {
2492                    error!("Batch removal error: {:?}", e);
2493                    return Err(e);
2494                }
2495            }
2496        }
2497
2498        Ok(())
2499    }
2500
2501    pub fn reset_next_membership_maintenance_check(
2502        &mut self,
2503        membership_maintenance_interval: Duration,
2504    ) {
2505        self.next_membership_maintenance_check = Instant::now() + membership_maintenance_interval;
2506    }
2507
2508    /// Remove stalled learner via membership change consensus
2509    pub async fn handle_stale_learner(
2510        &mut self,
2511        node_id: u32,
2512        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2513        ctx: &RaftContext<T>,
2514    ) -> Result<()> {
2515        // Stalled learner detected - remove via membership change (requires consensus)
2516        warn!(
2517            "Learner {} is stalled, removing from cluster via consensus",
2518            node_id
2519        );
2520
2521        let change = Change::BatchRemove(BatchRemove {
2522            node_ids: vec![node_id],
2523        });
2524
2525        // Submit removal through membership change entry (requires quorum consensus)
2526        match self
2527            .verify_leadership_limited_retry(vec![EntryPayload::config(change)], true, ctx, role_tx)
2528            .await
2529        {
2530            Ok(true) => {
2531                info!(
2532                    "Stalled learner {} successfully removed from cluster",
2533                    node_id
2534                );
2535            }
2536            Ok(false) => {
2537                warn!("Failed to commit removal of stalled learner {}", node_id);
2538            }
2539            Err(e) => {
2540                error!("Error removing stalled learner {}: {:?}", node_id, e);
2541                return Err(e);
2542            }
2543        }
2544
2545        Ok(())
2546    }
2547
2548    /// Check if current lease is still valid for LeaseRead policy
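    ///
    /// For example, with `lease_duration_ms = 2000`, the lease is still valid 1500 ms after the
    /// last confirmed leadership check but not after 2500 ms; if the wall clock appears to have
    /// moved backwards, the lease is conservatively treated as invalid.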
2549    pub fn is_lease_valid(
2550        &self,
2551        ctx: &RaftContext<T>,
2552    ) -> bool {
2553        let now = std::time::SystemTime::now()
2554            .duration_since(std::time::UNIX_EPOCH)
2555            .unwrap_or_default()
2556            .as_millis() as u64;
2557
2558        let last_confirmed = self.lease_timestamp.load(std::sync::atomic::Ordering::Acquire);
2559        let lease_duration = ctx.node_config().raft.read_consistency.lease_duration_ms;
2560
2561        if now <= last_confirmed {
2562            // Clock moved backwards or equal: conservatively treat lease as invalid
2563            error!("Clock moved backwards or equal: Now {now}, Last Confirmed {last_confirmed}");
2564            return false;
2565        }
2566        (now - last_confirmed) < lease_duration
2567    }
2568
2569    /// Update lease timestamp after successful leadership verification
2570    fn update_lease_timestamp(&self) {
2571        let now = std::time::SystemTime::now()
2572            .duration_since(std::time::UNIX_EPOCH)
2573            .unwrap_or_default()
2574            .as_millis() as u64;
2575
2576        self.lease_timestamp.store(now, std::sync::atomic::Ordering::Release);
2577    }
2578
2579    #[cfg(any(test, feature = "test-utils"))]
2580    pub fn test_update_lease_timestamp(&self) {
2581        self.update_lease_timestamp();
2582    }
2583
2584    #[cfg(any(test, feature = "test-utils"))]
2585    pub fn test_set_lease_timestamp(
2586        &self,
2587        timestamp: u64,
2588    ) {
2589        self.lease_timestamp.store(timestamp, std::sync::atomic::Ordering::Release);
2590    }
2591}
2592
2593impl<T: TypeConfig> From<&CandidateState<T>> for LeaderState<T> {
2594    fn from(candidate: &CandidateState<T>) -> Self {
2595        let ReplicationConfig {
2596            rpc_append_entries_in_batch_threshold,
2597            rpc_append_entries_batch_process_delay_in_ms,
2598            rpc_append_entries_clock_in_ms,
2599            ..
2600        } = candidate.node_config.raft.replication;
2601
2602        // Clone shared_state and set self as leader immediately
2603        let shared_state = candidate.shared_state.clone();
2604        shared_state.set_current_leader(candidate.node_id());
2605
2606        Self {
2607            shared_state,
2608            timer: Box::new(ReplicationTimer::new(
2609                rpc_append_entries_clock_in_ms,
2610                rpc_append_entries_batch_process_delay_in_ms,
2611            )),
2612            next_index: HashMap::new(),
2613            match_index: HashMap::new(),
2614            noop_log_id: None,
2615
2616            batch_buffer: Box::new(BatchBuffer::new(
2617                rpc_append_entries_in_batch_threshold,
2618                Duration::from_millis(rpc_append_entries_batch_process_delay_in_ms),
2619            )),
2620
2621            node_config: candidate.node_config.clone(),
2622
2623            scheduled_purge_upto: None,
2624            last_purged_index: candidate.last_purged_index,
2625            last_learner_check: Instant::now(),
2626            snapshot_in_progress: AtomicBool::new(false),
2627            peer_purge_progress: HashMap::new(),
2628            next_membership_maintenance_check: Instant::now(),
2629            pending_promotions: VecDeque::new(),
2630            cluster_metadata: ClusterMetadata {
2631                single_voter: false,
2632                total_voters: 0,
2633                replication_targets: vec![],
2634            },
2635
2636            _marker: PhantomData,
2637            lease_timestamp: AtomicU64::new(0),
2638        }
2639    }
2640}
2641
2642impl<T: TypeConfig> Debug for LeaderState<T> {
2643    fn fmt(
2644        &self,
2645        f: &mut std::fmt::Formatter<'_>,
2646    ) -> std::fmt::Result {
2647        f.debug_struct("LeaderState")
2648            .field("shared_state", &self.shared_state)
2649            .field("next_index", &self.next_index)
2650            .field("match_index", &self.match_index)
2651            .field("noop_log_id", &self.noop_log_id)
2652            .finish()
2653    }
2654}
2655
2656/// Calculates the maximum number of nodes we can promote while keeping the total voter count
2657/// odd
2658///
2659/// - `current`: current number of voting nodes
2660/// - `available`: number of ready learners pending promotion
2661///
2662/// Returns the maximum number of nodes to promote (0 if no safe promotion exists)
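///
/// # Examples
///
/// A quick sanity sketch (marked `ignore` here because the public re-export path of this helper
/// is not assumed):
///
/// ```ignore
/// // 3 voters + 2 ready learners -> 5 total (odd), so both can be promoted
/// assert_eq!(calculate_safe_batch_size(3, 2), 2);
/// // 3 voters + 3 ready learners -> 6 total (even), so only 2 are promoted (total 5)
/// assert_eq!(calculate_safe_batch_size(3, 3), 2);
/// // 3 voters + 1 ready learner -> 4 total (even), so no safe promotion this round
/// assert_eq!(calculate_safe_batch_size(3, 1), 0);
/// ```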
2663pub fn calculate_safe_batch_size(
2664    current: usize,
2665    available: usize,
2666) -> usize {
2667    if (current + available) % 2 == 1 {
2668        // Promoting all is safe
2669        available
2670    } else {
2671        // Promote one fewer node so the resulting voter total stays odd.
2672        // If only one learner is available, no safe promotion exists this round (returns 0).
2673        available.saturating_sub(1)
2674    }
2675}
2676
2677pub(super) fn send_replay_raft_event(
2678    role_tx: &mpsc::UnboundedSender<RoleEvent>,
2679    raft_event: RaftEvent,
2680) -> Result<()> {
2681    role_tx.send(RoleEvent::ReprocessEvent(Box::new(raft_event))).map_err(|e| {
2682        let error_str = format!("{e:?}");
2683        error!("Failed to send: {}", error_str);
2684        NetworkError::SingalSendFailed(error_str).into()
2685    })
2686}