d_engine_core/raft_role/
leader_state.rs

use std::collections::HashMap;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;

use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::client::ReadConsistencyPolicy as ClientReadConsistencyPolicy;
use d_engine_proto::common::AddNode;
use d_engine_proto::common::BatchPromote;
use d_engine_proto::common::BatchRemove;
use d_engine_proto::common::EntryPayload;
use d_engine_proto::common::LogId;
use d_engine_proto::common::NodeRole::Leader;
use d_engine_proto::common::NodeStatus;
use d_engine_proto::common::membership_change::Change;
use d_engine_proto::error::ErrorCode;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::JoinResponse;
use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
use d_engine_proto::server::cluster::NodeMeta;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::election::VotedFor;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::storage::PurgeLogRequest;
use d_engine_proto::server::storage::PurgeLogResponse;
use d_engine_proto::server::storage::SnapshotChunk;
use d_engine_proto::server::storage::SnapshotMetadata;
use nanoid::nanoid;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
use tonic::Status;
use tonic::async_trait;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::instrument;
use tracing::trace;
use tracing::warn;

use super::LeaderStateSnapshot;
use super::RaftRole;
use super::SharedState;
use super::StateSnapshot;
use super::candidate_state::CandidateState;
use super::role_state::RaftRoleState;
use crate::AppendResults;
use crate::BackgroundSnapshotTransfer;
use crate::BatchBuffer;
use crate::ConnectionType;
use crate::ConsensusError;
use crate::Error;
use crate::MaybeCloneOneshot;
use crate::MaybeCloneOneshotSender;
use crate::Membership;
use crate::MembershipError;
use crate::NetworkError;
use crate::PeerUpdate;
use crate::PurgeExecutor;
use crate::QuorumVerificationResult;
use crate::RaftContext;
use crate::RaftEvent;
use crate::RaftLog;
use crate::RaftNodeConfig;
use crate::RaftOneshot;
use crate::RaftRequestWithSignal;
use crate::ReadConsistencyPolicy as ServerReadConsistencyPolicy;
use crate::ReplicationConfig;
use crate::ReplicationCore;
use crate::ReplicationError;
use crate::ReplicationTimer;
use crate::Result;
use crate::RoleEvent;
use crate::SnapshotConfig;
use crate::StateMachine;
use crate::StateMachineHandler;
use crate::StateTransitionError;
use crate::SystemError;
use crate::Transport;
use crate::TypeConfig;
use crate::alias::MOF;
use crate::alias::ROF;
use crate::alias::SMHOF;
use crate::client_command_to_entry_payloads;
use crate::cluster::is_majority;
use crate::ensure_safe_join;
use crate::scoped_timer::ScopedTimer;
use crate::stream::create_production_snapshot_stream;
use crate::utils::cluster::error;

// Supporting data structures
#[derive(Debug, Clone)]
pub struct PendingPromotion {
    pub node_id: u32,
    pub ready_since: Instant,
}

impl PendingPromotion {
    pub fn new(
        node_id: u32,
        ready_since: Instant,
    ) -> Self {
        PendingPromotion {
            node_id,
            ready_since,
        }
    }
}

/// Cached cluster topology metadata for hot path optimization.
///
/// This metadata is immutable during leadership and only updated on membership changes.
/// Caching avoids repeated async calls to membership queries in hot paths like append_entries.
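///
/// # Example (illustrative sketch)
///
/// A minimal sketch of how this cache is built; the `voters()` and
/// `replication_peers()` calls mirror `init_cluster_metadata` below and are not an
/// additional API:
///
/// ```ignore
/// // Refresh once per membership change instead of on every append_entries call.
/// let total_voters = membership.voters().await.len() + 1; // +1 for the leader itself
/// let cached = ClusterMetadata {
///     single_voter: total_voters == 1,
///     total_voters,
///     replication_targets: membership.replication_peers().await,
/// };
/// // Hot paths then read `cached` synchronously instead of awaiting membership queries.
/// ```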
#[derive(Debug, Clone)]
pub struct ClusterMetadata {
    /// Single-voter cluster (quorum = 1, used for election and commit optimization)
    pub single_voter: bool,
    /// Total number of voters including self (updated on membership changes)
    pub total_voters: usize,
    /// Cached replication targets (voters + learners, excluding self)
    pub replication_targets: Vec<NodeMeta>,
}

/// Leader node's state in Raft consensus algorithm.
///
/// This structure maintains all state that should be persisted on leader crashes,
/// including replication progress tracking and log compaction management.
///
/// # Type Parameters
/// - `T`: Application-specific Raft type configuration
pub struct LeaderState<T: TypeConfig> {
    // -- Core Raft Leader State --
    /// Shared cluster state with lock protection
    pub shared_state: SharedState,

    /// === Volatile State ===
    /// For each server (node_id), index of the next log entry to send to that server
    ///
    /// Raft Paper: §5.3 Figure 2 (nextIndex)
    pub next_index: HashMap<u32, u64>,

    /// === Volatile State ===
    /// For each server (node_id), index of highest log entry known to be replicated
    ///
    /// Raft Paper: §5.3 Figure 2 (matchIndex)
    pub(super) match_index: HashMap<u32, u64>,

    /// === Volatile State ===
    /// Temporary storage for no-op entry log ID during leader initialization
    #[doc(hidden)]
    pub noop_log_id: Option<u64>,

    // -- Log Compaction & Purge --
    /// === Volatile State ===
    /// The upper bound (exclusive) of log entries scheduled for asynchronous physical deletion.
    ///
    /// This value is set immediately after a new snapshot is successfully created.
    /// It represents the next log position that will trigger compaction.
    ///
    /// The actual log purge is performed by a background task, which may be delayed
    /// due to resource constraints or retry mechanisms.
    pub scheduled_purge_upto: Option<LogId>,

    /// === Persistent State (MUST be on disk) ===
    /// The last log position that has been **physically removed** from stable storage.
    ///
    /// This value is atomically updated when:
    /// 1. A new snapshot is persisted (marking logs up to `last_included_index` as purgeable)
    /// 2. The background purge task completes successfully
    ///
    /// Raft safety invariant:
    /// Any log entry with index ≤ `last_purged_index` is guaranteed to be
    /// reflected in the latest snapshot.
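    ///
    /// Illustrative invariant check (a sketch using names from this file, mirroring the
    /// `LogPurgeCompleted` handler below; not an additional API):
    ///
    /// ```ignore
    /// // A newly reported purge position must never regress the recorded one.
    /// let accept = last_purged_index.map_or(true, |cur| purged_id.index > cur.index);
    /// ```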
    pub last_purged_index: Option<LogId>,

    /// === Volatile State ===
    /// Peer purge progress tracking for flow control
    ///
    /// Key: Peer node ID
    /// Value: Last confirmed purge index from peer
    pub peer_purge_progress: HashMap<u32, u64>,

    /// Record if there is on-going snapshot creation activity
    pub snapshot_in_progress: AtomicBool,

    // -- Request Processing --
    /// Batched proposal buffer for client requests
    ///
    /// Accumulates requests until either:
    /// 1. Batch reaches configured size limit
    /// 2. Explicit flush is triggered
    batch_buffer: Box<BatchBuffer<RaftRequestWithSignal>>,

    // -- Timing & Scheduling --
    /// Replication heartbeat timer manager
    ///
    /// Handles:
    /// - Heartbeat interval tracking
    /// - Election timeout prevention
    /// - Batch proposal flushing
    timer: Box<ReplicationTimer>,

    // -- Cluster Configuration --
    /// Cached Raft node configuration (shared reference)
    ///
    /// This includes:
    /// - Cluster membership
    /// - Timeout parameters
    /// - Performance tuning knobs
    pub(super) node_config: Arc<RaftNodeConfig>,

    /// Last time we checked for learners
    pub last_learner_check: Instant,

    // -- Stale Learner Handling --
    /// The next scheduled time to check for stale learners.
    ///
    /// This is used to implement periodic checking of learners that have been in the promotion
    /// queue for too long without making progress. The check interval is configurable via the
    /// node configuration (`node_config.promotion.stale_check_interval`).
    ///
    /// When the current time reaches or exceeds this instant, the leader will perform a scan
    /// of a subset of the pending promotions to detect stale learners. After the scan, the field
    /// is updated to `current_time + stale_check_interval`.
    ///
    /// Rationale: Avoiding frequent full scans improves batching efficiency and reduces CPU spikes
    /// in high-load environments (particularly crucial for RocketMQ-on-DLedger workflows).
    pub next_membership_maintenance_check: Instant,

    /// Queue of learners that have caught up and are pending promotion to voter.
    pub pending_promotions: VecDeque<PendingPromotion>,

    /// Lease timestamp for LeaseRead policy
    /// Tracks when leadership was last confirmed with quorum
    pub(super) lease_timestamp: AtomicU64,

    // -- Cluster Topology Cache --
    /// Cached cluster metadata (updated on membership changes)
    /// Avoids repeated async calls in hot paths
    pub(super) cluster_metadata: ClusterMetadata,

    /// Buffered read requests awaiting batch processing
    pub(super) read_buffer: Vec<(
        ClientReadRequest,
        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
    )>,

    /// Timestamp when first request entered buffer (for metrics/debugging)
    pub(super) read_buffer_start_time: Option<Instant>,

    // -- Type System Marker --
    /// Phantom data for type parameter anchoring
    _marker: PhantomData<T>,
}
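
// Illustrative bootstrap sketch (assumed call order inferred from the methods in this
// file; not an API defined here): after winning an election the leader typically
// 1) caches topology, 2) initializes per-peer replication state, 3) arms its timers.
//
//   leader.init_cluster_metadata(&membership).await?;
//   leader.init_peers_next_index_and_match_index(last_entry_id, peer_ids)?;
//   leader.reset_timer();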

#[async_trait]
impl<T: TypeConfig> RaftRoleState for LeaderState<T> {
    type T = T;

    fn shared_state(&self) -> &SharedState {
        &self.shared_state
    }

    fn shared_state_mut(&mut self) -> &mut SharedState {
        &mut self.shared_state
    }

    /// Overrides the default behavior.
    /// As leader, I should never accept a commit index
    /// that is lower than my current one.
    #[tracing::instrument]
    fn update_commit_index(
        &mut self,
        new_commit_index: u64,
    ) -> Result<()> {
        if self.commit_index() < new_commit_index {
            debug!("update_commit_index to: {:?}", new_commit_index);
            self.shared_state.commit_index = new_commit_index;
        } else {
            warn!(
                "Illegal operation, might be a bug! I am Leader old_commit_index({}) >= new_commit_index:({})",
                self.commit_index(),
                new_commit_index
            )
        }
        Ok(())
    }

    /// As Leader, I should not vote any more.
    fn voted_for(&self) -> Result<Option<VotedFor>> {
        self.shared_state().voted_for()
    }

    /// As Leader, I might still vote
    /// if a new legitimate Leader is found.
    fn update_voted_for(
        &mut self,
        voted_for: VotedFor,
    ) -> Result<bool> {
        self.shared_state_mut().update_voted_for(voted_for)
    }

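    /// Returns the cached `nextIndex` for `node_id`.
    ///
    /// Peers that have not been initialized yet (see
    /// `init_peers_next_index_and_match_index`) default to 1, the lowest valid log index.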
    fn next_index(
        &self,
        node_id: u32,
    ) -> Option<u64> {
        Some(self.next_index.get(&node_id).copied().unwrap_or(1))
    }

    fn update_next_index(
        &mut self,
        node_id: u32,
        new_next_id: u64,
    ) -> Result<()> {
        debug!("update_next_index({}) to {}", node_id, new_next_id);
        self.next_index.insert(node_id, new_next_id);
        Ok(())
    }

    fn update_match_index(
        &mut self,
        node_id: u32,
        new_match_id: u64,
    ) -> Result<()> {
        self.match_index.insert(node_id, new_match_id);
        Ok(())
    }

    fn match_index(
        &self,
        node_id: u32,
    ) -> Option<u64> {
        self.match_index.get(&node_id).copied()
    }

    fn init_peers_next_index_and_match_index(
        &mut self,
        last_entry_id: u64,
        peer_ids: Vec<u32>,
    ) -> Result<()> {
        for peer_id in peer_ids {
            debug!("init leader state for peer_id: {:?}", peer_id);
            let new_next_id = last_entry_id + 1;
            self.update_next_index(peer_id, new_next_id)?;
            self.update_match_index(peer_id, 0)?;
        }
        Ok(())
    }

    /// Track no-op entry index for linearizable read optimization.
    /// Called after no-op entry is committed on leadership transition.
    fn on_noop_committed(
        &mut self,
        ctx: &RaftContext<Self::T>,
    ) -> Result<()> {
        let noop_index = ctx.raft_log().last_entry_id();
        self.noop_log_id = Some(noop_index);
        debug!("Tracked noop_log_id: {}", noop_index);
        Ok(())
    }

    fn noop_log_id(&self) -> Result<Option<u64>> {
        Ok(self.noop_log_id)
    }

    /// Verifies leadership status using persistent retry until timeout.
    ///
    /// This function is designed for critical operations like configuration changes
    /// that must eventually succeed. It implements:
    ///   - Infinite retries with exponential backoff
    ///   - Jitter randomization to prevent synchronization
    ///   - Termination only on success, leadership loss, or global timeout
    ///
    /// # Parameters
    /// - `payloads`: Log entries to verify
    /// - `bypass_queue`: Whether to skip request queues for direct transmission
    /// - `ctx`: Raft execution context
    /// - `role_tx`: Channel for role transition events
    ///
    /// # Returns
    /// - `Ok(true)`: Quorum verification succeeded
    /// - `Ok(false)`: Leadership definitively lost during verification
    /// - `Err(_)`: Global timeout exceeded or critical failure occurred
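    ///
    /// # Backoff sketch
    ///
    /// A minimal illustration of the retry delay used in the body below (the constants
    /// come from `ctx.node_config.retry.internal_quorum`; the 500 ms jitter bound mirrors
    /// the implementation):
    ///
    /// ```ignore
    /// let mut delay = initial_delay;
    /// loop {
    ///     // ... attempt verify_internal_quorum ...
    ///     delay = delay.checked_mul(2).unwrap_or(max_delay).min(max_delay);
    ///     let jitter = Duration::from_millis(rand::random::<u64>() % 500);
    ///     sleep(delay + jitter).await;
    /// }
    /// ```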
    async fn verify_leadership_persistent(
        &mut self,
        payloads: Vec<EntryPayload>,
        bypass_queue: bool,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<bool> {
        let initial_delay =
            Duration::from_millis(ctx.node_config.retry.internal_quorum.base_delay_ms);
        let max_delay = Duration::from_millis(ctx.node_config.retry.internal_quorum.max_delay_ms);
        let global_timeout = ctx.node_config.raft.membership.verify_leadership_persistent_timeout;

        let mut current_delay = initial_delay;
        let start_time = Instant::now();

        loop {
            match self.verify_internal_quorum(payloads.clone(), bypass_queue, ctx, role_tx).await {
                Ok(QuorumVerificationResult::Success) => {
                    // Update lease timestamp after successful quorum verification.
                    // This allows subsequent LeaseRead requests to benefit from this verification,
                    // avoiding redundant quorum checks and improving read performance.
                    self.update_lease_timestamp();
                    return Ok(true);
                }
                Ok(QuorumVerificationResult::LeadershipLost) => return Ok(false),
                Ok(QuorumVerificationResult::RetryRequired) => {
                    // Check global timeout before retrying
                    if start_time.elapsed() > global_timeout {
                        return Err(NetworkError::GlobalTimeout(
                            "Leadership verification timed out".to_string(),
                        )
                        .into());
                    }

                    current_delay =
                        current_delay.checked_mul(2).unwrap_or(max_delay).min(max_delay);
                    let jitter = Duration::from_millis(rand::random::<u64>() % 500);
                    sleep(current_delay + jitter).await;
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Override: Initialize cluster metadata after becoming leader.
    /// Must be called once after leader election succeeds.
    async fn init_cluster_metadata(
        &mut self,
        membership: &Arc<T::M>,
    ) -> Result<()> {
        // Calculate total voter count including self as leader
        let voters = membership.voters().await;
        let total_voters = voters.len() + 1; // +1 for leader (self)

        // Get all replication targets (voters + learners, excluding self)
        let replication_targets = membership.replication_peers().await;

        // Single-voter cluster: only this node is a voter (quorum = 1)
        let single_voter = total_voters == 1;

        self.cluster_metadata = ClusterMetadata {
            single_voter,
            total_voters,
            replication_targets: replication_targets.clone(),
        };

        debug!(
            "Initialized cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
            single_voter,
            total_voters,
            replication_targets.len()
        );
        Ok(())
    }

    /// Note: This method acts as the centralized internal Raft client.
    ///
    /// Checks leadership via quorum verification immediately:
    ///
    /// - Bypasses all queues with direct RPC transmission
    /// - Enforces synchronous quorum validation
    /// - Guarantees real-time network visibility
    ///
    /// Scenario handling summary:
    ///
    /// 1. Quorum achieved:
    ///    - Return: `Ok(QuorumVerificationResult::Success)`
    ///
    /// 2. Quorum NOT achieved (verifiable):
    ///    - Return: `Ok(QuorumVerificationResult::RetryRequired)`
    ///
    /// 3. Quorum NOT achieved (non-verifiable):
    ///    - Return: `Ok(QuorumVerificationResult::LeadershipLost)`
    ///
    /// 4. Partial timeouts:
    ///    - Return: `Ok(QuorumVerificationResult::RetryRequired)`
    ///
    /// 5. All timeouts:
    ///    - Return: `Ok(QuorumVerificationResult::RetryRequired)`
    ///
    /// 6. Higher term detected:
    ///    - Return: `Err(HigherTerm)`
    ///
    /// 7. Critical failure (e.g., system or logic error):
    ///    - Return: original error
    async fn verify_internal_quorum(
        &mut self,
        payloads: Vec<EntryPayload>,
        bypass_queue: bool,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<QuorumVerificationResult> {
        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();

        self.process_raft_request(
            RaftRequestWithSignal {
                id: nanoid!(),
                payloads,
                sender: resp_tx,
            },
            ctx,
            bypass_queue,
            role_tx,
        )
        .await?;

        // Wait for response with timeout
        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        match timeout(timeout_duration, resp_rx).await {
            // Case 1: Response received successfully and verification passed
            Ok(Ok(Ok(response))) => {
                debug!("Leadership check response: {:?}", response);

                // Handle different response cases
                Ok(if response.is_write_success() {
                    QuorumVerificationResult::Success
                } else if response.is_retry_required() {
                    // Verifiable quorum failure
                    QuorumVerificationResult::RetryRequired
                } else {
                    // Non-verifiable failure or explicit rejection
                    QuorumVerificationResult::LeadershipLost
                })
            }

            // Case 2: Received explicit rejection status
            Ok(Ok(Err(status))) => {
                warn!("Leadership rejected by follower: {status:?}");
                Ok(QuorumVerificationResult::LeadershipLost)
            }

            // Case 3: Channel communication failure (unrecoverable error)
            Ok(Err(e)) => {
                error!("Channel error during leadership check: {:?}", e);
                Err(NetworkError::SingalReceiveFailed(e.to_string()).into())
            }

            // Case 4: Timed out waiting for the response
            Err(_) => {
                warn!("Leadership check timed out after {:?}", timeout_duration);
                Err(NetworkError::Timeout {
                    node_id: self.node_id(),
                    duration: timeout_duration,
                }
                .into())
            }
        }
    }

    fn is_leader(&self) -> bool {
        true
    }

    fn become_leader(&self) -> Result<RaftRole<T>> {
        warn!("I am leader already");

        Err(StateTransitionError::InvalidTransition.into())
    }

    fn become_candidate(&self) -> Result<RaftRole<T>> {
        error!("Leader cannot become Candidate");

        Err(StateTransitionError::InvalidTransition.into())
    }

    fn become_follower(&self) -> Result<RaftRole<T>> {
        info!(
            "Node {} term {} transitioning to Follower",
            self.node_id(),
            self.current_term(),
        );
        println!(
            "[Node {}] Leader → Follower (term {})",
            self.node_id(),
            self.current_term()
        );
        Ok(RaftRole::Follower(Box::new(self.into())))
    }

    fn become_learner(&self) -> Result<RaftRole<T>> {
        error!("Leader cannot become Learner");

        Err(StateTransitionError::InvalidTransition.into())
    }

    fn is_timer_expired(&self) -> bool {
        self.timer.is_expired()
    }

    /// When Raft starts, check whether all timers need to be reset.
    fn reset_timer(&mut self) {
        self.timer.reset_batch();
        self.timer.reset_replication();
    }

    fn next_deadline(&self) -> Instant {
        self.timer.next_deadline()
    }

    /// Trigger heartbeat now
    async fn tick(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        _raft_tx: &mpsc::Sender<RaftEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        let now = Instant::now();
        // Keep syncing leader_id (hot-path: ~5ns atomic store)
        self.shared_state().set_current_leader(self.node_id());

        // 1. Clear expired learners
        if let Err(e) = self.run_periodic_maintenance(role_tx, ctx).await {
            error!("Failed to run periodic maintenance: {}", e);
        }

        // 2. Batch trigger check (prioritized before the heartbeat check)
        if now >= self.timer.batch_deadline() {
            self.timer.reset_batch();

            if self.batch_buffer.should_flush() {
                debug!(?now, "tick::reset_batch batch timer");
                self.timer.reset_replication();

                // Take out the batched messages and send them immediately
                // Do not move batch out of this block
                let batch = self.batch_buffer.take();
                self.process_batch(batch, role_tx, ctx).await?;
            }
        }

        // 3. Heartbeat trigger check
        // Send heartbeat if the replication timer expires
        if now >= self.timer.replication_deadline() {
            debug!(?now, "tick::reset_replication timer");
            self.timer.reset_replication();

            // Do not move batch out of this block
            let batch = self.batch_buffer.take();
            self.process_batch(batch, role_tx, ctx).await?;
        }

        Ok(())
    }

    fn drain_read_buffer(&mut self) -> Result<()> {
        if !self.read_buffer.is_empty() {
            warn!(
                "Read batch: draining {} requests due to role change",
                self.read_buffer.len()
            );
            for (_, sender) in std::mem::take(&mut self.read_buffer) {
                let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
            }
            self.read_buffer_start_time = None;
        }

        Ok(())
    }

    async fn handle_raft_event(
        &mut self,
        raft_event: RaftEvent,
        ctx: &RaftContext<T>,
        role_tx: mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        let my_id = self.shared_state.node_id;
        let my_term = self.current_term();

        match raft_event {
            // Leader receives RequestVote(term=X, candidate=Y)
            // 1. If X > currentTerm:
            //    - Leader → Follower, currentTerm = X
            //    - Replay event
            // 2. Else:
            //    - Reply with VoteGranted=false, currentTerm=currentTerm
            RaftEvent::ReceiveVoteRequest(vote_request, sender) => {
                debug!(
                    "handle_raft_event::RaftEvent::ReceiveVoteRequest: {:?}",
                    &vote_request
                );

                let my_term = self.current_term();
                if my_term < vote_request.term {
                    self.update_current_term(vote_request.term);
                    // Step down as Follower
                    self.send_become_follower_event(None, &role_tx)?;

                    info!("Leader will not process the vote request; the Follower role should handle it.");
                    send_replay_raft_event(
                        &role_tx,
                        RaftEvent::ReceiveVoteRequest(vote_request, sender),
                    )?;
                } else {
                    let last_log_id =
                        ctx.raft_log().last_log_id().unwrap_or(LogId { index: 0, term: 0 });
                    let response = VoteResponse {
                        term: my_term,
                        vote_granted: false,
                        last_log_index: last_log_id.index,
                        last_log_term: last_log_id.term,
                    };
                    sender.send(Ok(response)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                }
            }

            RaftEvent::ClusterConf(_metadata_request, sender) => {
                let cluster_conf = ctx
                    .membership()
                    .retrieve_cluster_membership_config(self.shared_state().current_leader())
                    .await;
                debug!("Leader received ClusterConf: {:?}", &cluster_conf);

                sender.send(Ok(cluster_conf)).map_err(|e| {
                    let error_str = format!("{e:?}");
                    error!("Failed to send: {}", error_str);
                    NetworkError::SingalSendFailed(error_str)
                })?;
            }

            RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender) => {
                let current_conf_version = ctx.membership().get_cluster_conf_version().await;
                debug!(%current_conf_version, ?cluster_conf_change_request,
                    "handle_raft_event::RaftEvent::ClusterConfUpdate",
                );

                // Reject the fake Leader's configuration update request
                if my_term >= cluster_conf_change_request.term {
                    let response = ClusterConfUpdateResponse::higher_term(
                        my_id,
                        my_term,
                        current_conf_version,
                    );

                    sender.send(Ok(response)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                } else {
                    // Step down as Follower since a new Leader was found
                    info!(
                        "Node {}: my term is lower than the request's term, stepping down to Follower",
                        my_id
                    );
                    // TODO: is there a bug here? self.update_current_term(vote_request.term);
                    self.send_become_follower_event(Some(cluster_conf_change_request.id), &role_tx)?;

                    info!(
                        "Leader will not process this configuration update; the Follower role should handle it."
                    );
                    send_replay_raft_event(
                        &role_tx,
                        RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender),
                    )?;
                }
            }

            RaftEvent::AppendEntries(append_entries_request, sender) => {
                debug!(
                    "handle_raft_event::RaftEvent::AppendEntries: {:?}",
                    &append_entries_request
                );

                // Reject the fake Leader's append entries request
                if my_term >= append_entries_request.term {
                    let response = AppendEntriesResponse::higher_term(my_id, my_term);

                    sender.send(Ok(response)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                } else {
                    // Step down as Follower since a new Leader was found
                    info!(
                        "Node {}: my term is lower than the request's term, stepping down to Follower",
                        my_id
                    );
                    // TODO: is there a bug here? self.update_current_term(vote_request.term);
                    self.send_become_follower_event(
                        Some(append_entries_request.leader_id),
                        &role_tx,
                    )?;

                    info!(
                        "Leader will not process the append_entries request; the Follower role should handle it."
                    );
                    send_replay_raft_event(
                        &role_tx,
                        RaftEvent::AppendEntries(append_entries_request, sender),
                    )?;
                }
            }

            RaftEvent::ClientPropose(client_write_request, sender) => {
                if let Err(e) = self
                    .process_raft_request(
                        RaftRequestWithSignal {
                            id: nanoid!(),
                            payloads: client_command_to_entry_payloads(
                                client_write_request.commands,
                            ),
                            sender,
                        },
                        ctx,
                        false,
                        &role_tx,
                    )
                    .await
                {
                    error("Leader::process_raft_request", &e);
                    return Err(e);
                }
            }

            RaftEvent::ClientReadRequest(client_read_request, sender) => {
                let _timer = ScopedTimer::new("leader_linear_read");
                debug!(
                    "Leader::ClientReadRequest client_read_request:{:?}",
                    &client_read_request
                );

                let keys = client_read_request.keys.clone();
                let response: std::result::Result<ClientResponse, tonic::Status> = {
                    let read_operation =
                        || -> std::result::Result<ClientResponse, tonic::Status> {
                            let results = ctx
                                .handlers
                                .state_machine_handler
                                .read_from_state_machine(keys)
                                .unwrap_or_default();
                            debug!("handle_client_read results: {:?}", results);
                            Ok(ClientResponse::read_results(results))
                        };

                    // Determine effective consistency policy using server configuration
                    let effective_policy = if client_read_request.has_consistency_policy() {
                        // Client explicitly specified policy
                        if ctx.node_config().raft.read_consistency.allow_client_override {
                            match client_read_request.consistency_policy() {
                                ClientReadConsistencyPolicy::LeaseRead => {
                                    ServerReadConsistencyPolicy::LeaseRead
                                }
                                ClientReadConsistencyPolicy::LinearizableRead => {
                                    ServerReadConsistencyPolicy::LinearizableRead
                                }
                                ClientReadConsistencyPolicy::EventualConsistency => {
                                    ServerReadConsistencyPolicy::EventualConsistency
                                }
                            }
                        } else {
                            // Client override not allowed - use server default
                            ctx.node_config().raft.read_consistency.default_policy.clone()
                        }
                    } else {
                        // No client policy specified - use server default
                        ctx.node_config().raft.read_consistency.default_policy.clone()
                    };
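
                    // Policy resolution summary (descriptive only, mirrors the branch above):
                    //   client specified + override allowed -> client's policy
                    //   client specified + override denied  -> server default_policy
                    //   client did not specify              -> server default_policy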

                    // Apply consistency policy
                    match effective_policy {
                        ServerReadConsistencyPolicy::LinearizableRead => {
                            // 1. Add to buffer
                            self.read_buffer.push((client_read_request, sender));

                            // 2. First request: spawn timeout task
                            if self.read_buffer.len() == 1 {
                                self.read_buffer_start_time = Some(Instant::now());

                                let role_tx = role_tx.clone(); // Use role_tx (already available in signature)
                                let timeout_ms = self
                                    .node_config
                                    .raft
                                    .read_consistency
                                    .read_batching
                                    .time_threshold_ms;

                                debug!(
                                    "Read batch: first request, spawning {}ms timeout",
                                    timeout_ms
                                );

                                tokio::spawn(async move {
                                    tokio::time::sleep(Duration::from_millis(timeout_ms)).await;
                                    debug!(
                                        "Read batch: timeout expired, sending flush signal via ReprocessEvent"
                                    );
                                    // Use RoleEvent::ReprocessEvent to re-inject FlushReadBuffer into event loop
                                    let _ = role_tx.send(RoleEvent::ReprocessEvent(Box::new(
                                        RaftEvent::FlushReadBuffer,
                                    )));
                                });
                            }

                            // 3. Size threshold: immediate flush
                            let size_threshold =
                                self.node_config.raft.read_consistency.read_batching.size_threshold;
                            if self.read_buffer.len() >= size_threshold {
                                debug!(
                                    "Read batch: size threshold reached ({}), flushing",
                                    size_threshold
                                );
                                self.process_linearizable_read_batch(ctx, &role_tx).await?;
                            }
                            // Early return: response will be sent via process_linearizable_read_batch
                            return Ok(());
                        }
                        ServerReadConsistencyPolicy::LeaseRead => {
                            // Lease-based read: verify only if lease expired
                            if !self.is_lease_valid(ctx) {
                                // Lease expired, refresh with quorum verification
                                self.verify_leadership_and_refresh_lease(ctx, &role_tx).await?;
                            }
                            // Lease is valid, serve read locally
                            read_operation()
                        }
                        ServerReadConsistencyPolicy::EventualConsistency => {
                            // Eventual consistency: serve immediately without verification
                            // Leader can always serve eventually consistent reads
                            debug!("EventualConsistency: serving local read without verification");
                            read_operation()
                        }
                    }
                };

                debug!(
                    "Leader::ClientReadRequest is about to respond: {:?}",
                    &response
                );
                sender.send(response).map_err(|e| {
                    let error_str = format!("{e:?}");
                    error!("Failed to send: {}", error_str);
                    NetworkError::SingalSendFailed(error_str)
                })?;
            }

            RaftEvent::FlushReadBuffer => {
                if !self.read_buffer.is_empty() {
                    debug!(
                        "Read batch: flushing {} pending reads (timeout)",
                        self.read_buffer.len()
                    );
                    self.process_linearizable_read_batch(ctx, &role_tx).await?;
                } else {
                    debug!("Read batch: flush signal received but buffer empty (already flushed)");
                }
            }

            RaftEvent::InstallSnapshotChunk(_streaming, sender) => {
                sender
                    .send(Err(Status::permission_denied("Not Follower or Learner.")))
                    .map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;

                return Err(ConsensusError::RoleViolation {
                    current_role: "Leader",
                    required_role: "Follower or Learner",
                    context: format!(
                        "Leader node {} receives RaftEvent::InstallSnapshotChunk",
                        ctx.node_id
                    ),
                }
                .into());
            }

            RaftEvent::RaftLogCleanUp(_purge_log_request, sender) => {
                sender
                    .send(Err(Status::permission_denied(
                        "Leader should not receive RaftLogCleanUp event.",
                    )))
                    .map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;

                return Err(ConsensusError::RoleViolation {
                    current_role: "Leader",
                    required_role: "Non-Leader",
                    context: format!(
                        "Leader node {} receives RaftEvent::RaftLogCleanUp",
                        ctx.node_id
                    ),
                }
                .into());
            }

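            // Snapshot/purge flow (summary of the two arms below):
            // `CreateSnapshotEvent` spawns `create_snapshot()` in the background and re-injects
            // the result as `SnapshotCreated`. That handler then (1) schedules the local purge,
            // (2) fans out `PurgeLogRequest` to replication targets when any exist, and
            // (3) executes the scheduled purge, reporting completion via `LogPurgeCompleted`.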
            RaftEvent::CreateSnapshotEvent => {
                // Prevent duplicate snapshot creation
                if self.snapshot_in_progress.load(Ordering::Acquire) {
                    info!("Snapshot creation already in progress. Skipping duplicate request.");
                    return Ok(());
                }

                self.snapshot_in_progress.store(true, Ordering::Release);
                let state_machine_handler = ctx.state_machine_handler().clone();

                // Use spawn to perform snapshot creation in the background
                tokio::spawn(async move {
                    let result = state_machine_handler.create_snapshot().await;
                    info!("SnapshotCreated event will be processed in another event thread");
                    if let Err(e) =
                        send_replay_raft_event(&role_tx, RaftEvent::SnapshotCreated(result))
                    {
                        error!("Failed to send snapshot creation result: {}", e);
                    }
                });
            }

            RaftEvent::SnapshotCreated(result) => {
                self.snapshot_in_progress.store(false, Ordering::SeqCst);
                let my_id = self.shared_state.node_id;
                let my_term = self.current_term();

                match result {
                    Err(e) => {
                        error!(%e, "State machine snapshot creation failed");
                    }
                    Ok((
                        SnapshotMetadata {
                            last_included: last_included_option,
                            checksum,
                        },
                        _final_path,
                    )) => {
                        info!("Initiating log purge after snapshot creation");

                        if let Some(last_included) = last_included_option {
                            // ----------------------
                            // Phase 1: Schedule log purge if possible
                            // ----------------------
                            trace!("Phase 1: Schedule log purge if possible");
                            if self.can_purge_logs(self.last_purged_index, last_included) {
                                trace!(?last_included, "Phase 1: Scheduling log purge");
                                self.scheduled_purge_upto(last_included);
                            }

                            // ----------------------
                            // Phase 2: Send Purge request (multi-node only)
                            // ----------------------
                            // Check replication_targets to support:
                            // 1. True single-node (no peers) - skip this phase
                            // 2. Single-voter + Learner - send to Learner
                            // 3. Multi-voter cluster - send to all peers
                            if !self.cluster_metadata.replication_targets.is_empty() {
                                trace!("Phase 2: Send Purge request to replication targets");

                                // Defensive check: verify cluster_metadata is initialized
                                // This should never happen in production (init_cluster_metadata is called
                                // automatically after leader election), but if it does, it indicates a
                                // critical bug in state management
                                if self.cluster_metadata.total_voters == 0 {
                                    error!(
                                        "BUG: cluster_metadata not initialized! Leader must call init_cluster_metadata() after election"
                                    );
                                    return Err(
                                        MembershipError::ClusterMetadataNotInitialized.into()
                                    );
                                }

                                let membership = ctx.membership();
                                let transport = ctx.transport();
                                match transport
                                    .send_purge_requests(
                                        PurgeLogRequest {
                                            term: my_term,
                                            leader_id: my_id,
                                            last_included: Some(last_included),
                                            snapshot_checksum: checksum.clone(),
                                            leader_commit: self.commit_index(),
                                        },
                                        &self.node_config.retry,
                                        membership,
                                    )
                                    .await
                                {
                                    Ok(result) => {
                                        info!(?result, "receive PurgeLogResult");
                                        // Process peer purge responses and check for higher term
                                        // If a peer reports a higher term, this leader must step down
                                        // immediately and skip local purge execution (Phase 3)
                                        let should_step_down =
                                            self.peer_purge_progress(result, &role_tx)?;
                                        if should_step_down {
                                            return Ok(()); // Early return, skip Phase 3
                                        }
                                    }
                                    Err(e) => {
                                        error!(?e, "RaftEvent::CreateSnapshotEvent");
                                        return Err(e);
                                    }
                                }
                            } else {
                                debug!(
                                    "Standalone node (leader={}): no replication targets, skip PurgeRequest",
                                    my_id
                                );
                            }

                            // ----------------------
                            // Phase 3: Execute local purge (all nodes)
                            // ----------------------
                            trace!("Phase 3: Execute scheduled purge task");
                            debug!(?last_included, "Execute scheduled purge task");
                            if let Some(scheduled) = self.scheduled_purge_upto {
                                let purge_executor = ctx.purge_executor();
                                match purge_executor.execute_purge(scheduled).await {
                                    Ok(_) => {
                                        if let Err(e) = send_replay_raft_event(
                                            &role_tx,
                                            RaftEvent::LogPurgeCompleted(scheduled),
                                        ) {
                                            error!(%e, "Failed to notify purge completion");
                                        }
                                    }
                                    Err(e) => {
                                        error!(?e, ?scheduled, "Log purge execution failed");
                                    }
                                }
                            }
                        }
                    }
                }
            }

            RaftEvent::LogPurgeCompleted(purged_id) => {
                // Ensure we don't regress the purge index
                if self.last_purged_index.map_or(true, |current| purged_id.index > current.index) {
                    debug!(
                        ?purged_id,
                        "Updating last purged index after successful execution"
                    );
                    self.last_purged_index = Some(purged_id);
                } else {
                    warn!(
                        ?purged_id,
                        ?self.last_purged_index,
                        "Received outdated purge completion, ignoring"
                    );
                }
            }

            RaftEvent::JoinCluster(join_request, sender) => {
                debug!(?join_request, "Leader::RaftEvent::JoinCluster");
                self.handle_join_cluster(join_request, sender, ctx, &role_tx).await?;
            }

            RaftEvent::DiscoverLeader(request, sender) => {
                debug!(?request, "Leader::RaftEvent::DiscoverLeader");

                if let Some(meta) = ctx.membership().retrieve_node_meta(my_id).await {
                    let response = LeaderDiscoveryResponse {
                        leader_id: my_id,
                        leader_address: meta.address,
                        term: my_term,
                    };
                    sender.send(Ok(response)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                    return Ok(());
                } else {
                    let msg = "Leader cannot find its own address; this must be a bug.";
                    error!("{}", msg);
                    panic!("{}", msg);
                }
            }

            RaftEvent::StreamSnapshot(request, sender) => {
                debug!("Leader::RaftEvent::StreamSnapshot");

                // Get the latest snapshot metadata
                if let Some(metadata) = ctx.state_machine().snapshot_metadata() {
                    // Create response channel
                    let (response_tx, response_rx) =
                        mpsc::channel::<std::result::Result<Arc<SnapshotChunk>, Status>>(32);
                    // Convert to properly encoded tonic stream
                    let size = 1024 * 1024 * 1024; // 1GB max message size
                    let response_stream = create_production_snapshot_stream(response_rx, size);
                    // Immediately respond with the stream
                    sender.send(Ok(response_stream)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Stream response failed: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;

                    // Spawn background transfer task
                    let state_machine_handler = ctx.state_machine_handler().clone();
                    let config = ctx.node_config.raft.snapshot.clone();
                    // Load snapshot data stream
                    let data_stream =
                        state_machine_handler.load_snapshot_data(metadata.clone()).await?;

                    tokio::spawn(async move {
                        if let Err(e) = BackgroundSnapshotTransfer::<T>::run_pull_transfer(
                            request,
                            response_tx,
                            data_stream,
                            config,
                        )
                        .await
                        {
                            error!("StreamSnapshot failed: {:?}", e);
                        }
                    });
                } else {
                    warn!("No snapshot available for streaming");
                    sender.send(Err(Status::not_found("Snapshot not found"))).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Stream response failed: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                }
            }

            RaftEvent::TriggerSnapshotPush { peer_id } => {
                if let Some(latest_snapshot_metadata) = ctx.state_machine().snapshot_metadata() {
                    Self::trigger_background_snapshot(
                        peer_id,
                        latest_snapshot_metadata,
                        ctx.state_machine_handler().clone(),
                        ctx.membership(),
                        ctx.node_config.raft.snapshot.clone(),
                    )
                    .await?;
                }
            }

            RaftEvent::PromoteReadyLearners => {
                // SAFETY: Called from main event loop, no reentrancy issues
                info!(
                    "[Leader {}] ⚡ PromoteReadyLearners event received, pending_promotions: {:?}",
                    self.node_id(),
                    self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
                );
                self.process_pending_promotions(ctx, &role_tx).await?;
            }

            RaftEvent::MembershipApplied => {
                // Save old membership for comparison
                let old_replication_targets = self.cluster_metadata.replication_targets.clone();

                // Refresh cluster metadata cache after membership change is applied
                debug!("Refreshing cluster metadata cache after membership change");
                if let Err(e) = self.update_cluster_metadata(&ctx.membership()).await {
                    warn!("Failed to update cluster metadata: {:?}", e);
                }

                // CRITICAL FIX #218: Initialize replication state for newly added peers
                // Per Raft protocol: When new members join, Leader must initialize their next_index
                let newly_added: Vec<u32> = self
                    .cluster_metadata
                    .replication_targets
                    .iter()
                    .filter(|new_peer| {
                        !old_replication_targets.iter().any(|old_peer| old_peer.id == new_peer.id)
                    })
                    .map(|peer| peer.id)
                    .collect();

                if !newly_added.is_empty() {
                    debug!(
                        "Initializing replication state for {} new peer(s): {:?}",
                        newly_added.len(),
                        newly_added
                    );
                    let last_entry_id = ctx.raft_log().last_entry_id();
                    if let Err(e) =
                        self.init_peers_next_index_and_match_index(last_entry_id, newly_added)
                    {
                        warn!("Failed to initialize next_index for new peers: {:?}", e);
                        // Non-fatal: next_index will use default value of 1,
                        // replication will still work but may be less efficient
                    }
                }
            }

            RaftEvent::StepDownSelfRemoved => {
                // Only Leader can propose configuration changes and remove itself
                // Per Raft protocol: Leader steps down immediately after self-removal
                warn!(
                    "[Leader-{}] Removed from cluster membership, stepping down to Follower",
                    self.node_id()
                );
                role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
                    error!(
                        "[Leader-{}] Failed to send BecomeFollower after self-removal: {:?}",
                        self.node_id(),
                        e
                    );
                    NetworkError::SingalSendFailed(format!(
                        "BecomeFollower after self-removal: {e:?}"
                    ))
                })?;
                return Ok(());
            }
        }

        Ok(())
    }
}
1324
1325impl<T: TypeConfig> LeaderState<T> {
1326    /// Refresh the cached cluster metadata.
1327    /// Called after becoming leader and whenever a membership change is applied.
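    ///
    /// Illustrative only (hypothetical counts, not tied to any real cluster): with two
    /// remote voters plus the leader itself, `total_voters` is 3 and `single_voter` is false.
    ///
    /// ```
    /// let remote_voters = 2usize;
    /// let total_voters = remote_voters + 1; // +1 for the leader itself
    /// assert_eq!(total_voters, 3);
    /// assert_ne!(total_voters, 1); // therefore not a single-voter cluster
    /// ```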
1328    pub async fn update_cluster_metadata(
1329        &mut self,
1330        membership: &Arc<T::M>,
1331    ) -> Result<()> {
1332        // Calculate total voter count including self as leader
1333        let voters = membership.voters().await;
1334        let total_voters = voters.len() + 1; // +1 for leader (self)
1335
1336        // Get all replication targets (voters + learners, excluding self)
1337        let replication_targets = membership.replication_peers().await;
1338
1339        // Single-voter cluster: only this node is a voter (quorum = 1)
1340        let single_voter = total_voters == 1;
1341
1342        self.cluster_metadata = ClusterMetadata {
1343            single_voter,
1344            total_voters,
1345            replication_targets: replication_targets.clone(),
1346        };
1347
1348        debug!(
1349            "Updated cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
1350            single_voter,
1351            total_voters,
1352            replication_targets.len()
1353        );
1354        Ok(())
1355    }
1356
1357    /// Returns a snapshot of the current node state.
1358    pub fn state_snapshot(&self) -> StateSnapshot {
1359        StateSnapshot {
1360            current_term: self.current_term(),
1361            voted_for: None,
1362            commit_index: self.commit_index(),
1363            role: Leader as i32,
1364        }
1365    }
1366
1367    /// Returns a snapshot of the Leader-specific state (next/match indexes and no-op log id).
1368    #[tracing::instrument]
1369    pub fn leader_state_snapshot(&self) -> LeaderStateSnapshot {
1370        LeaderStateSnapshot {
1371            next_index: self.next_index.clone(),
1372            match_index: self.match_index.clone(),
1373            noop_log_id: self.noop_log_id,
1374        }
1375    }
1376
1377    /// # Params
1378    /// - `execute_now`: whether this proposal should be executed immediately, e.g.
1379    ///   enforce_quorum_consensus is expected to be executed immediately
1380    pub async fn process_raft_request(
1381        &mut self,
1382        raft_request_with_signal: RaftRequestWithSignal,
1383        ctx: &RaftContext<T>,
1384        execute_now: bool,
1385        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1386    ) -> Result<()> {
1387        debug!(
1388            "Leader::process_raft_request, request_id: {}",
1389            raft_request_with_signal.id
1390        );
1391
1392        let push_result = self.batch_buffer.push(raft_request_with_signal);
1393        // push() returns Some (the batch size) only when the buffer exceeds its maximum
1394        if execute_now || push_result.is_some() {
1395            let batch = self.batch_buffer.take();
1396
1397            trace!(
1398                "replication_handler.handle_raft_request_in_batch: batch size:{:?}",
1399                batch.len()
1400            );
1401
1402            self.process_batch(batch, role_tx, ctx).await?;
1403        }
1404
1405        Ok(())
1406    }
1407
1408    /// Scenario handling summary:
1409    ///
1410    /// 1. Quorum achieved:
1411    ///    - Client response: `write_success()`
1412    ///    - State update: update peer indexes and commit index
1413    ///    - Return: `Ok(())`
1414    ///
1415    /// 2. Quorum NOT achieved (verifiable):
1416    ///    - Client response: `RetryRequired`
1417    ///    - State update: update peer indexes
1418    ///    - Return: `Ok(())`
1419    ///
1420    /// 3. Quorum NOT achieved (non-verifiable):
1421    ///    - Client response: `ProposeFailed`
1422    ///    - State update: update peer indexes
1423    ///    - Return: `Ok(())`
1424    ///
1425    /// 4. Partial timeouts:
1426    ///    - Client response: `ProposeFailed`
1427    ///    - State update: update only the peer indexes that responded
1428    ///    - Return: `Ok(())`
1429    ///
1430    /// 5. All timeouts:
1431    ///    - Client response: `ProposeFailed`
1432    ///    - State update: no update
1433    ///    - Return: `Ok(())`
1434    ///
1435    /// 6. Higher term detected:
1436    ///    - Client response: `TermOutdated`
1437    ///    - State update: update term and convert to follower
1438    ///    - Return: `Err(HigherTerm)`
1439    ///
1440    /// 7. Critical failure (e.g., system or logic error):
1441    ///    - Client response: `ProposeFailed`
1442    ///    - State update: none
1443    ///    - Return: original error
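    ///
    /// As an illustration of scenarios 2 and 3 above, the decision hinges on whether enough
    /// peers responded to verify the outcome. A minimal sketch with a local stand-in for the
    /// majority check (the crate's `is_majority` helper may differ in detail):
    ///
    /// ```
    /// fn is_majority(responses: usize, cluster_size: usize) -> bool {
    ///     responses > cluster_size / 2
    /// }
    /// // 3 of 5 voters answered: the failure is verifiable, so the client may retry.
    /// assert!(is_majority(3, 5));
    /// // 1 of 5 voters answered: the outcome is unknown, so report ProposeFailed.
    /// assert!(!is_majority(1, 5));
    /// ```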
1444    pub async fn process_batch(
1445        &mut self,
1446        batch: VecDeque<RaftRequestWithSignal>,
1447        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1448        ctx: &RaftContext<T>,
1449    ) -> Result<()> {
1450        // 1. Prepare batch data
1451        let entry_payloads: Vec<EntryPayload> =
1452            batch.iter().flat_map(|req| &req.payloads).cloned().collect();
1453        if !entry_payloads.is_empty() {
1454            trace!(?entry_payloads, "[Node-{}] process_batch", ctx.node_id);
1455        }
1456
1457        // 2. Execute the copy
1458        let cluster_size = self.cluster_metadata.total_voters;
1459        trace!(%cluster_size);
1460
1461        let result = ctx
1462            .replication_handler()
1463            .handle_raft_request_in_batch(
1464                entry_payloads,
1465                self.state_snapshot(),
1466                self.leader_state_snapshot(),
1467                &self.cluster_metadata,
1468                ctx,
1469            )
1470            .await;
1471        debug!(?result, "replication_handler::handle_raft_request_in_batch");
1472
1473        // 3. Unify the processing results
1474        match result {
1475            // Case 1: Successfully reached majority
1476            Ok(AppendResults {
1477                commit_quorum_achieved: true,
1478                peer_updates,
1479                learner_progress,
1480            }) => {
1481                // 1. Update all peer index
1482                self.update_peer_indexes(&peer_updates);
1483
1484                // 2. Check Learner's catch-up status
1485                if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
1486                    error!(?e, "check_learner_progress failed");
1487                }
1488
1489                // 3. Update commit index
1490                // Single-voter cluster: commit index = last log index (quorum of 1)
1491                // Multi-voter cluster: calculate commit index based on majority quorum
1492                let new_commit_index = if self.cluster_metadata.single_voter {
1493                    let last_log_index = ctx.raft_log().last_entry_id();
1494                    if last_log_index > self.commit_index() {
1495                        Some(last_log_index)
1496                    } else {
1497                        None
1498                    }
1499                } else {
1500                    self.calculate_new_commit_index(ctx.raft_log(), &peer_updates)
1501                };
1502
1503                if let Some(new_commit_index) = new_commit_index {
1504                    debug!(
1505                        "[Leader-{}] New commit been acknowledged: {}",
1506                        self.node_id(),
1507                        new_commit_index
1508                    );
1509                    self.update_commit_index_with_signal(
1510                        Leader as i32,
1511                        self.current_term(),
1512                        new_commit_index,
1513                        role_tx,
1514                    )?;
1515                }
1516
1517                // 4. Notify all clients of success
1518                for request in batch {
1519                    let _ = request.sender.send(Ok(ClientResponse::write_success()));
1520                }
1521            }
1522
1523            // Case 2: Failed to reach majority
1524            Ok(AppendResults {
1525                commit_quorum_achieved: false,
1526                peer_updates,
1527                learner_progress,
1528            }) => {
1529                // 1. Update all peer index
1530                self.update_peer_indexes(&peer_updates);
1531
1532                // 2. Check Learner's catch-up status
1533                if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
1534                    error!(?e, "check_learner_progress failed");
1535                }
1536
1537                // 3. Determine error code based on verifiability
1538                let responses_received = peer_updates.len();
1539                let error_code = if is_majority(responses_received, cluster_size) {
1540                    ErrorCode::RetryRequired
1541                } else {
1542                    ErrorCode::ProposeFailed
1543                };
1544
1545                // 4. Notify all clients of failure
1546                for request in batch {
1547                    let _ = request.sender.send(Ok(ClientResponse::client_error(error_code)));
1548                }
1549            }
1550
1551            // Case 3: High term found
1552            Err(Error::Consensus(ConsensusError::Replication(ReplicationError::HigherTerm(
1553                higher_term,
1554            )))) => {
1555                warn!("Higher term detected: {}", higher_term);
1556                self.update_current_term(higher_term);
1557                self.send_become_follower_event(None, role_tx)?;
1558
1559                // Notify client of term expiration
1560                for request in batch {
1561                    let _ = request
1562                        .sender
1563                        .send(Ok(ClientResponse::client_error(ErrorCode::TermOutdated)));
1564                }
1565
1566                return Err(ReplicationError::HigherTerm(higher_term).into());
1567            }
1568
1569            // Case 4: Other errors
1570            Err(e) => {
1571                error!("Batch processing failed: {:?}", e);
1572
1573                // Notify all clients of failure
1574                for request in batch {
1575                    let _ = request
1576                        .sender
1577                        .send(Ok(ClientResponse::client_error(ErrorCode::ProposeFailed)));
1578                }
1579
1580                return Err(e);
1581            }
1582        }
1583
1584        Ok(())
1585    }
1586
1587    /// Update peer node index
1588    #[instrument(skip(self))]
1589    fn update_peer_indexes(
1590        &mut self,
1591        peer_updates: &HashMap<u32, PeerUpdate>,
1592    ) {
1593        for (peer_id, update) in peer_updates {
1594            if let Err(e) = self.update_next_index(*peer_id, update.next_index) {
1595                error!("Failed to update next index: {:?}", e);
1596            }
1597            trace!(
1598                "Updated next index for peer {}-{}",
1599                peer_id, update.next_index
1600            );
1601            if let Some(match_index) = update.match_index {
1602                if let Err(e) = self.update_match_index(*peer_id, match_index) {
1603                    error!("Failed to update match index: {:?}", e);
1604                }
1605                trace!("Updated match index for peer {}-{}", peer_id, match_index);
1606            }
1607        }
1608    }
1609
1610    pub async fn check_learner_progress(
1611        &mut self,
1612        learner_progress: &HashMap<u32, Option<u64>>,
1613        ctx: &RaftContext<T>,
1614        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1615    ) -> Result<()> {
1616        debug!(?learner_progress, "check_learner_progress");
1617
1618        if !self.should_check_learner_progress(ctx) {
1619            return Ok(());
1620        }
1621
1622        if learner_progress.is_empty() {
1623            return Ok(());
1624        }
1625
1626        let ready_learners = self.find_promotable_learners(learner_progress, ctx).await;
1627        let new_promotions = self.deduplicate_promotions(ready_learners);
1628
1629        if !new_promotions.is_empty() {
1630            self.enqueue_and_notify_promotions(new_promotions, role_tx)?;
1631        }
1632
1633        Ok(())
1634    }
1635
1636    /// Check if enough time has elapsed since last learner progress check
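    ///
    /// Throttle check in isolation (hypothetical interval; the real value comes from
    /// `learner_check_throttle_ms`):
    ///
    /// ```
    /// use std::time::Duration;
    /// let throttle = Duration::from_millis(500);
    /// let elapsed = Duration::from_millis(200);
    /// assert!(elapsed < throttle); // too soon since the last check, so skip this one
    /// ```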
1637    fn should_check_learner_progress(
1638        &mut self,
1639        ctx: &RaftContext<T>,
1640    ) -> bool {
1641        let throttle_interval =
1642            Duration::from_millis(ctx.node_config().raft.learner_check_throttle_ms);
1643        if self.last_learner_check.elapsed() < throttle_interval {
1644            return false;
1645        }
1646        self.last_learner_check = Instant::now();
1647        true
1648    }
1649
1650    /// Find learners that are caught up and eligible for promotion
1651    async fn find_promotable_learners(
1652        &self,
1653        learner_progress: &HashMap<u32, Option<u64>>,
1654        ctx: &RaftContext<T>,
1655    ) -> Vec<u32> {
1656        let leader_commit = self.commit_index();
1657        let threshold = ctx.node_config().raft.learner_catchup_threshold;
1658        let membership = ctx.membership();
1659
1660        let mut ready_learners = Vec::new();
1661
1662        for (&node_id, &match_index_opt) in learner_progress.iter() {
1663            if !membership.contains_node(node_id).await {
1664                continue;
1665            }
1666
1667            if !self.is_learner_caught_up(match_index_opt, leader_commit, threshold) {
1668                continue;
1669            }
1670
1671            let node_status =
1672                membership.get_node_status(node_id).await.unwrap_or(NodeStatus::ReadOnly);
1673            if !node_status.is_promotable() {
1674                debug!(
1675                    ?node_id,
1676                    ?node_status,
1677                    "Learner caught up but status is not Promotable, skipping"
1678                );
1679                continue;
1680            }
1681
1682            debug!(
1683                ?node_id,
1684                match_index = ?match_index_opt.unwrap_or(0),
1685                ?leader_commit,
1686                gap = leader_commit.saturating_sub(match_index_opt.unwrap_or(0)),
1687                "Learner caught up"
1688            );
1689            ready_learners.push(node_id);
1690        }
1691
1692        ready_learners
1693    }
1694
1695    /// Check if learner has caught up with leader based on log gap
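    ///
    /// With hypothetical numbers: leader committed up to 100, learner matched up to 95,
    /// threshold of 10 entries, so a gap of 5 counts as caught up.
    ///
    /// ```
    /// let (leader_commit, match_index, threshold) = (100u64, 95u64, 10u64);
    /// assert!(leader_commit.saturating_sub(match_index) <= threshold);
    /// ```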
1696    fn is_learner_caught_up(
1697        &self,
1698        match_index: Option<u64>,
1699        leader_commit: u64,
1700        threshold: u64,
1701    ) -> bool {
1702        let match_index = match_index.unwrap_or(0);
1703        let gap = leader_commit.saturating_sub(match_index);
1704        gap <= threshold
1705    }
1706
1707    /// Remove learners already in pending promotions queue
1708    fn deduplicate_promotions(
1709        &self,
1710        ready_learners: Vec<u32>,
1711    ) -> Vec<u32> {
1712        let already_pending: std::collections::HashSet<_> =
1713            self.pending_promotions.iter().map(|p| p.node_id).collect();
1714
1715        ready_learners.into_iter().filter(|id| !already_pending.contains(id)).collect()
1716    }
1717
1718    /// Add promotions to queue and send notification event
1719    fn enqueue_and_notify_promotions(
1720        &mut self,
1721        new_promotions: Vec<u32>,
1722        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1723    ) -> Result<()> {
1724        info!(
1725            ?new_promotions,
1726            "Learners caught up, adding to pending promotions"
1727        );
1728
1729        for node_id in new_promotions {
1730            self.pending_promotions
1731                .push_back(PendingPromotion::new(node_id, Instant::now()));
1732        }
1733
1734        role_tx
1735            .send(RoleEvent::ReprocessEvent(Box::new(
1736                RaftEvent::PromoteReadyLearners,
1737            )))
1738            .map_err(|e| {
1739                let error_str = format!("{e:?}");
1740                error!("Failed to send PromoteReadyLearners: {}", error_str);
1741                Error::System(SystemError::Network(NetworkError::SingalSendFailed(
1742                    error_str,
1743                )))
1744            })?;
1745
1746        Ok(())
1747    }
1748
1749    #[allow(dead_code)]
1750    pub async fn batch_promote_learners(
1751        &mut self,
1752        ready_learners_ids: Vec<u32>,
1753        ctx: &RaftContext<T>,
1754        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1755    ) -> Result<()> {
1756        // 1. Determine optimal promotion status based on quorum safety
1757        debug!("1. Determine optimal promotion status based on quorum safety");
1758        let membership = ctx.membership();
1759        let current_voters = membership.voters().await.len();
1760        // let syncings = membership.nodes_with_status(NodeStatus::Syncing).len();
1761        let new_active_count = current_voters + ready_learners_ids.len();
1762
1763        // Determine target status based on quorum safety
1764        trace!(
1765            ?current_voters,
1766            ?ready_learners_ids,
1767            "[Node-{}] new_active_count: {}",
1768            self.node_id(),
1769            new_active_count
1770        );
1771        let target_status = if ensure_safe_join(self.node_id(), new_active_count).is_ok() {
1772            trace!(
1773                "Going to update nodes-{:?} status to Active",
1774                ready_learners_ids
1775            );
1776            NodeStatus::Active
1777        } else {
1778            trace!(
1779                "Not enough quorum to promote learners: {:?}",
1780                ready_learners_ids
1781            );
1782            return Ok(());
1783        };
1784
1785        // 2. Create configuration change payload
1786        debug!("2. Create configuration change payload");
1787        let config_change = Change::BatchPromote(BatchPromote {
1788            node_ids: ready_learners_ids.clone(),
1789            new_status: target_status as i32,
1790        });
1791
1792        info!(?config_change, "Replicating cluster config");
1793
1794        // 3. Submit single config change for all ready learners
1795        debug!("3. Submit single config change for all ready learners");
1796        // Use verify_leadership_persistent for config changes (must eventually succeed)
1797        match self
1798            .verify_leadership_persistent(
1799                vec![EntryPayload::config(config_change)],
1800                true,
1801                ctx,
1802                role_tx,
1803            )
1804            .await
1805        {
1806            Ok(true) => {
1807                info!(
1808                    "Batch promotion committed for nodes: {:?}",
1809                    ready_learners_ids
1810                );
1811            }
1812            Ok(false) => {
1813                warn!("Failed to commit batch promotion");
1814            }
1815            Err(e) => {
1816                error!("Batch promotion error: {:?}", e);
1817                return Err(e);
1818            }
1819        }
1820
1821        Ok(())
1822    }
1823
1824    /// Calculate the new commit index from majority-matched peer indexes
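    ///
    /// A sketch of the standard Raft rule this relies on (illustrative only; the actual
    /// computation is delegated to `calculate_majority_matched_index`, which also takes the
    /// current term and previous commit index into account):
    ///
    /// ```
    /// // Largest index replicated on a majority, counting the leader's own log.
    /// fn majority_matched(mut matched: Vec<u64>, leader_last: u64) -> Option<u64> {
    ///     matched.push(leader_last);
    ///     matched.sort_unstable_by(|a, b| b.cmp(a));
    ///     matched.get(matched.len() / 2).copied()
    /// }
    /// // Leader at 9 and one peer at 7 form a majority (2 of 3) at index 7.
    /// assert_eq!(majority_matched(vec![7, 5], 9), Some(7));
    /// ```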
1825    #[instrument(skip(self))]
1826    fn calculate_new_commit_index(
1827        &mut self,
1828        raft_log: &Arc<ROF<T>>,
1829        peer_updates: &HashMap<u32, PeerUpdate>,
1830    ) -> Option<u64> {
1831        let old_commit_index = self.commit_index();
1832        let current_term = self.current_term();
1833
1834        let matched_ids: Vec<u64> =
1835            peer_updates.keys().filter_map(|&id| self.match_index(id)).collect();
1836
1837        let new_commit_index =
1838            raft_log.calculate_majority_matched_index(current_term, old_commit_index, matched_ids);
1839
1840        new_commit_index.filter(|&index| index > old_commit_index)
1845    }
1846
1847    #[allow(dead_code)]
1848    fn if_update_commit_index(
1849        &self,
1850        new_commit_index_option: Option<u64>,
1851    ) -> (bool, u64) {
1852        let current_commit_index = self.commit_index();
1853        if let Some(new_commit_index) = new_commit_index_option {
1854            debug!("Leader::update_commit_index: {:?}", new_commit_index);
1855            if current_commit_index < new_commit_index {
1856                return (true, new_commit_index);
1857            }
1858        }
1859        debug!("Leader::update_commit_index: false");
1860        (false, current_commit_index)
1861    }
1862
1863    /// Calculate safe read index for linearizable reads.
1864    ///
1865    /// Returns max(commitIndex, noopIndex) to ensure:
1866    /// 1. All committed data is visible
1867    /// 2. Current term's no-op entry is accounted for
1868    /// 3. Read index is fixed at request arrival time (avoids waiting for concurrent writes)
1869    ///
1870    /// This optimization prevents reads from waiting for writes that arrived
1871    /// after the read request started processing.
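    ///
    /// With hypothetical values:
    ///
    /// ```
    /// // commit_index = 10, noop_log_id = Some(12)  =>  read_index = 12
    /// let (commit_index, noop_index) = (10u64, 12u64);
    /// assert_eq!(std::cmp::max(commit_index, noop_index), 12);
    /// ```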
1872    #[doc(hidden)]
1873    pub fn calculate_read_index(&self) -> u64 {
1874        let commit_index = self.commit_index();
1875        let noop_index = self.noop_log_id.unwrap_or(0);
1876        std::cmp::max(commit_index, noop_index)
1877    }
1878
1879    /// Wait for state machine to apply up to target index.
1880    ///
1881    /// This is used by linearizable reads to ensure they see all committed data
1882    /// up to the read_index calculated at request arrival time.
1883    #[doc(hidden)]
1884    pub async fn wait_until_applied(
1885        &self,
1886        target_index: u64,
1887        state_machine_handler: &Arc<SMHOF<T>>,
1888        last_applied: u64,
1889    ) -> Result<()> {
1890        if last_applied < target_index {
1891            state_machine_handler.update_pending(target_index);
1892
1893            let timeout_ms = self.node_config.raft.read_consistency.state_machine_sync_timeout_ms;
1894            state_machine_handler
1895                .wait_applied(target_index, std::time::Duration::from_millis(timeout_ms))
1896                .await?;
1897
1898            debug!("wait_until_applied: target_index={} success", target_index);
1899        }
1900        Ok(())
1901    }
1902
1903    #[instrument(skip(self))]
1904    fn scheduled_purge_upto(
1905        &mut self,
1906        received_last_included: LogId,
1907    ) {
1908        if let Some(existing) = self.scheduled_purge_upto {
1909            if existing.index >= received_last_included.index {
1910                warn!(
1911                    ?received_last_included,
1912                    ?existing,
1913                    "Will not update scheduled_purge_upto, received invalid last_included log"
1914                );
1915                return;
1916            }
1917        }
1918        info!(?self.scheduled_purge_upto, ?received_last_included, "Update scheduled_purge_upto.");
1919        self.scheduled_purge_upto = Some(received_last_included);
1920    }
1921
1922    /// Process peer purge responses and handle higher term detection.
1923    ///
1924    /// # Returns
1925    /// - `Ok(true)` if a higher term was detected and leader should step down
1926    /// - `Ok(false)` if processing completed normally
1927    ///
1928    /// # Higher Term Handling
1929    /// According to Raft §5.1: "If a server receives a request with a stale term number,
1930    /// it rejects the request." Conversely, if we receive a response with a higher term,
1931    /// we must immediately:
1932    /// 1. Update our term to the higher value
1933    /// 2. Step down to Follower role
1934    /// 3. Signal caller to abort ongoing operations (e.g., skip Phase 3 local purge)
1935    ///
1936    /// # Safety
1937    /// This function sends `BecomeFollower` event which triggers role transition.
1938    /// Caller MUST check return value and skip subsequent leader-only operations.
1939    pub(crate) fn peer_purge_progress(
1940        &mut self,
1941        responses: Vec<Result<PurgeLogResponse>>,
1942        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1943    ) -> Result<bool> {
1944        if responses.is_empty() {
1945            return Ok(false);
1946        }
1947        for r in responses.iter().flatten() {
1948            // Higher term detection: peer has advanced to a newer term
1949            // This violates our leadership validity (Raft §5.1)
1950            if r.term > self.current_term() {
1951                self.update_current_term(r.term);
1952                self.send_become_follower_event(None, role_tx)?;
1953                return Ok(true); // Signal step-down, caller must abort leader operations
1954            }
1955
1956            if let Some(last_purged) = r.last_purged {
1957                self.peer_purge_progress.insert(r.node_id, last_purged.index);
1961            }
1962        }
1963
1964        Ok(false)
1965    }
1966
1967    fn send_become_follower_event(
1968        &self,
1969        new_leader_id: Option<u32>,
1970        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1971    ) -> Result<()> {
1972        info!(
1973            ?new_leader_id,
1974            "Leader is going to step down as Follower..."
1975        );
1976        role_tx.send(RoleEvent::BecomeFollower(new_leader_id)).map_err(|e| {
1977            let error_str = format!("{e:?}");
1978            error!("Failed to send: {}", error_str);
1979            NetworkError::SingalSendFailed(error_str)
1980        })?;
1981
1982        Ok(())
1983    }
1984
1985    /// Determines if logs prior to `last_included_in_snapshot` can be permanently discarded.
1986    ///
1987    /// Implements Leader-side log compaction safety checks per Raft paper §7.2:
1988    /// > "The leader uses a new RPC called InstallSnapshot to send snapshots to followers that are
1989    /// > too far behind"
1990    ///
1991    /// # Safety Invariants (ALL must hold)
1992    ///
1993    /// 1. **Committed Entry Guarantee**   `last_included_in_snapshot.index < self.commit_index`
1994    ///    - Ensures we never discard uncommitted entries (Raft §5.4.2)
1995    ///    - Maintains at least one committed entry after purge for log matching property
1996    ///
1997    /// 2. **Monotonic Snapshot Advancement**   `last_purge_index < last_included_in_snapshot.index`
1998    ///    - Enforces snapshot indices strictly increase (prevents rollback attacks)
1999    ///    - Maintains sequential purge ordering (FSM safety requirement)
2000    ///
2001    /// 3. **Cluster-wide Progress Validation**   `peer_purge_progress.values().all(≥
2002    ///    snapshot.index)`
2003    ///    - Ensures ALL followers have confirmed ability to reach this snapshot
2004    ///    - Prevents leadership changes from causing log inconsistencies
2005    ///
2006    /// 4. **Operation Atomicity**   `pending_purge.is_none()`
2007    ///    - Ensures only one concurrent purge operation
2008    ///    - Critical for linearizable state machine semantics
2009    ///
2010    /// # Implementation Notes
2011    /// - Leader must maintain `peer_purge_progress` through AppendEntries responses
2012    /// - Actual log discard should be deferred until storage confirms snapshot persistence
2013    /// - Design differs from followers by requiring full cluster confirmation (Raft extension for
2014    ///   enhanced durability)
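    ///
    /// A worked illustration of the first three invariants with hypothetical indexes (not
    /// tied to any real cluster state):
    ///
    /// ```
    /// // Snapshot covers up to index 80, commit index is 100, last purge reached 50,
    /// // and every follower has confirmed purge progress of at least 80.
    /// let (snapshot_index, commit_index, last_purge) = (80u64, 100u64, Some(50u64));
    /// let peer_progress = [90u64, 85, 80];
    /// let committed_ok = snapshot_index < commit_index;
    /// let monotonic_ok = last_purge.map(|p| p < snapshot_index).unwrap_or(true);
    /// let cluster_ok = peer_progress.iter().all(|&p| p >= snapshot_index);
    /// assert!(committed_ok && monotonic_ok && cluster_ok);
    /// ```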
2015    #[instrument(skip(self))]
2016    pub fn can_purge_logs(
2017        &self,
2018        last_purge_index: Option<LogId>,
2019        last_included_in_snapshot: LogId,
2020    ) -> bool {
2021        let commit_index = self.commit_index();
2022        debug!(?self.peer_purge_progress, ?commit_index, ?last_purge_index,
2023            ?last_included_in_snapshot, "can_purge_logs");
2024        let monotonic_check = last_purge_index
2025            .map(|lid| lid.index < last_included_in_snapshot.index)
2026            .unwrap_or(true);
2027
2028        last_included_in_snapshot.index < commit_index
2029            && monotonic_check
2030            && self.peer_purge_progress.values().all(|&v| v >= last_included_in_snapshot.index)
2031    }
2032
2033    pub async fn handle_join_cluster(
2034        &mut self,
2035        join_request: JoinRequest,
2036        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2037        ctx: &RaftContext<T>,
2038        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2039    ) -> Result<()> {
2040        let node_id = join_request.node_id;
2041        let node_role = join_request.node_role;
2042        let address = join_request.address;
2043        let status = join_request.status;
2044        let membership = ctx.membership();
2045
2046        // 1. Validate join request
2047        debug!("1. Validate join request");
2048        if membership.contains_node(node_id).await {
2049            let error_msg = format!("Node {node_id} already exists in cluster");
2050            warn!(%error_msg);
2051            return self.send_join_error(sender, MembershipError::NodeAlreadyExists(node_id)).await;
2052        }
2053
2054        // 2. Create configuration change payload
2055        debug!("2. Create configuration change payload");
2056        if let Err(e) = membership.can_rejoin(node_id, node_role).await {
2057            let error_msg = format!("Node {node_id} cannot rejoin: {e}",);
2058            warn!(%error_msg);
2059            return self
2060                .send_join_error(sender, MembershipError::JoinClusterError(error_msg))
2061                .await;
2062        }
2063
2064        let config_change = Change::AddNode(AddNode {
2065            node_id,
2066            address: address.clone(),
2067            status,
2068        });
2069
2070        // 3. Submit config change, and wait for quorum confirmation
2071        debug!("3. Wait for quorum confirmation");
2072        // Use verify_leadership_persistent for config changes (must eventually succeed)
2073        match self
2074            .verify_leadership_persistent(
2075                vec![EntryPayload::config(config_change)],
2076                true,
2077                ctx,
2078                role_tx,
2079            )
2080            .await
2081        {
2082            Ok(true) => {
2083                // 4. Update node status to Syncing
2084                debug!("4. Update node status to Syncing");
2085
2086                debug!(
2087                    "After updating, the replications peers: {:?}",
2088                    ctx.membership().replication_peers().await
2089                );
2090
2091                // Note: Cluster metadata cache will be updated via MembershipApplied event
2092                // after CommitHandler applies the config change to membership state
2093
2094                // 5. Send successful response
2095                debug!("5. Send successful response");
2096                info!("Join config committed for node {}", node_id);
2097                self.send_join_success(node_id, &address, sender, ctx).await?;
2098            }
2099            Ok(false) => {
2100                warn!("Failed to commit join config for node {}", node_id);
2101                self.send_join_error(sender, MembershipError::CommitTimeout).await?
2102            }
2103            Err(e) => {
2104                error!("Error waiting for commit: {:?}", e);
2105                self.send_join_error(sender, e).await?
2106            }
2107        }
2108        Ok(())
2109    }
2110
2111    async fn send_join_success(
2112        &self,
2113        node_id: u32,
2114        address: &str,
2115        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2116        ctx: &RaftContext<T>,
2117    ) -> Result<()> {
2118        // Retrieve the latest snapshot metadata, if any
2119        let snapshot_metadata = ctx.state_machine_handler().get_latest_snapshot_metadata();
2120
2121        // Prepare response
2122        let response = JoinResponse {
2123            success: true,
2124            error: String::new(),
2125            config: Some(
2126                ctx.membership()
2127                    .retrieve_cluster_membership_config(self.shared_state().current_leader())
2128                    .await,
2129            ),
2130            config_version: ctx.membership().get_cluster_conf_version().await,
2131            snapshot_metadata,
2132            leader_id: self.node_id(),
2133        };
2134
2135        sender.send(Ok(response)).map_err(|e| {
2136            error!("Failed to send join response: {:?}", e);
2137            NetworkError::SingalSendFailed(format!("{e:?}"))
2138        })?;
2139
2140        info!(
2141            "Node {} ({}) successfully added as learner",
2142            node_id, address
2143        );
2144
2145        // Print leader accepting new node message (Plan B)
2146        crate::utils::cluster_printer::print_leader_accepting_new_node(
2147            self.node_id(),
2148            node_id,
2149            address,
2150            d_engine_proto::common::NodeRole::Learner as i32,
2151        );
2152
2153        Ok(())
2154    }
2155
2156    async fn send_join_error(
2157        &self,
2158        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2159        error: impl Into<Error>,
2160    ) -> Result<()> {
2161        let error = error.into();
2162        let status = Status::failed_precondition(error.to_string());
2163
2164        sender.send(Err(status)).map_err(|e| {
2165            error!("Failed to send join error: {:?}", e);
2166            NetworkError::SingalSendFailed(format!("{e:?}"))
2167        })?;
2168
2169        Err(error)
2170    }
2171
2172    #[cfg(any(test, feature = "__test_support"))]
2173    pub fn new(
2174        node_id: u32,
2175        node_config: Arc<RaftNodeConfig>,
2176    ) -> Self {
2177        let ReplicationConfig {
2178            rpc_append_entries_in_batch_threshold,
2179            rpc_append_entries_batch_process_delay_in_ms,
2180            rpc_append_entries_clock_in_ms,
2181            ..
2182        } = node_config.raft.replication;
2183
2184        LeaderState {
2185            cluster_metadata: ClusterMetadata {
2186                single_voter: false,
2187                total_voters: 0,
2188                replication_targets: vec![],
2189            },
2190            shared_state: SharedState::new(node_id, None, None),
2191            timer: Box::new(ReplicationTimer::new(
2192                rpc_append_entries_clock_in_ms,
2193                rpc_append_entries_batch_process_delay_in_ms,
2194            )),
2195            next_index: HashMap::new(),
2196            match_index: HashMap::new(),
2197            noop_log_id: None,
2198
2199            batch_buffer: Box::new(BatchBuffer::new(
2200                rpc_append_entries_in_batch_threshold,
2201                Duration::from_millis(rpc_append_entries_batch_process_delay_in_ms),
2202            )),
2203
2204            node_config,
2205            scheduled_purge_upto: None,
2206            last_purged_index: None, //TODO
2207            last_learner_check: Instant::now(),
2208            peer_purge_progress: HashMap::new(),
2209            snapshot_in_progress: AtomicBool::new(false),
2210            next_membership_maintenance_check: Instant::now(),
2211            pending_promotions: VecDeque::new(),
2212            lease_timestamp: AtomicU64::new(0),
2213            read_buffer: Vec::new(),
2214            read_buffer_start_time: None,
2215            _marker: PhantomData,
2216        }
2217    }
2218
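    /// Push a snapshot to a peer in the background without blocking the Raft event loop.
    ///
    /// The transfer is handed to `spawn_blocking`, which re-enters the async runtime via
    /// `Handle::current().block_on` and reports its outcome over a oneshot channel. A
    /// minimal, engine-agnostic sketch of that pattern (`do_heavy_async_io` is a
    /// placeholder, not a real API):
    ///
    /// ```ignore
    /// use tokio::sync::oneshot;
    ///
    /// let (tx, rx) = oneshot::channel();
    /// tokio::task::spawn_blocking(move || {
    ///     // Blocking threads inherit the runtime context, so block_on is safe here.
    ///     let rt = tokio::runtime::Handle::current();
    ///     let result = rt.block_on(async { do_heavy_async_io().await });
    ///     let _ = tx.send(result);
    /// });
    /// tokio::spawn(async move {
    ///     if let Ok(result) = rx.await { /* log or act on the result */ }
    /// });
    /// ```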
2219    pub async fn trigger_background_snapshot(
2220        node_id: u32,
2221        metadata: SnapshotMetadata,
2222        state_machine_handler: Arc<SMHOF<T>>,
2223        membership: Arc<MOF<T>>,
2224        config: SnapshotConfig,
2225    ) -> Result<()> {
2226        let (result_tx, result_rx) = oneshot::channel();
2227
2228        // Delegate the actual transfer to a dedicated thread pool
2229        tokio::task::spawn_blocking(move || {
2230            let rt = tokio::runtime::Handle::current();
2231            let result = rt.block_on(async move {
2232                let bulk_channel = membership
2233                    .get_peer_channel(node_id, ConnectionType::Bulk)
2234                    .await
2235                    .ok_or(NetworkError::PeerConnectionNotFound(node_id))?;
2236
2237                let data_stream =
2238                    state_machine_handler.load_snapshot_data(metadata.clone()).await?;
2239
2240                BackgroundSnapshotTransfer::<T>::run_push_transfer(
2241                    node_id,
2242                    data_stream,
2243                    bulk_channel,
2244                    config,
2245                )
2246                .await
2247            });
2248
2249            // Non-blocking send result
2250            let _ = result_tx.send(result);
2251        });
2252
2253        // Non-blocking check result
2254        tokio::spawn(async move {
2255            match result_rx.await {
2256                Ok(Ok(_)) => info!("Snapshot to {} completed", node_id),
2257                Ok(Err(e)) => error!("Snapshot to {} failed: {:?}", node_id, e),
2258                Err(_) => warn!("Snapshot result channel closed unexpectedly"),
2259            }
2260        });
2261
2262        Ok(())
2263    }
2264
2265    /// Processes all pending promotions while respecting the cluster's odd-node constraint
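    ///
    /// The batch size is chosen so the post-promotion voter total stays odd. A minimal sketch
    /// of that constraint only (illustrative; the crate's `calculate_safe_batch_size` may
    /// apply additional rules):
    ///
    /// ```
    /// // One way to keep the total odd: trim the batch by one when it would become even.
    /// fn odd_preserving_batch(current_voters: usize, pending: usize) -> usize {
    ///     if (current_voters + pending) % 2 == 1 { pending } else { pending.saturating_sub(1) }
    /// }
    /// assert_eq!(odd_preserving_batch(3, 2), 2); // 3 + 2 = 5 voters, already odd
    /// assert_eq!(odd_preserving_batch(3, 3), 2); // trim to keep 5 voters instead of 6
    /// ```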
2266    pub async fn process_pending_promotions(
2267        &mut self,
2268        ctx: &RaftContext<T>,
2269        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2270    ) -> Result<()> {
2271        debug!(
2272            "[Leader {}] ๐Ÿ”„ process_pending_promotions called, pending: {:?}",
2273            self.node_id(),
2274            self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2275        );
2276
2277        // Get promotion configuration from the node config
2278        let config = &ctx.node_config().raft.membership.promotion;
2279
2280        // Step 1: Remove stale entries (older than configured threshold)
2281        let now = Instant::now();
2282        self.pending_promotions.retain(|entry| {
2283            now.duration_since(entry.ready_since) <= config.stale_learner_threshold
2284        });
2285
2286        if self.pending_promotions.is_empty() {
2287            debug!(
2288                "[Leader {}] โŒ pending_promotions is empty after stale cleanup",
2289                self.node_id()
2290            );
2291            return Ok(());
2292        }
2293
2294        // Step 2: Get current voter count (including self as leader)
2295        let membership = ctx.membership();
2296        let current_voters = membership.voters().await.len() + 1; // +1 for self (leader)
2297        debug!(
2298            "[Leader {}] ๐Ÿ“Š current_voters: {}, pending: {}",
2299            self.node_id(),
2300            current_voters,
2301            self.pending_promotions.len()
2302        );
2303
2304        // Step 3: Calculate the maximum batch size that preserves an odd total
2305        let max_batch_size =
2306            calculate_safe_batch_size(current_voters, self.pending_promotions.len());
2307        debug!(
2308            "[Leader {}] ๐ŸŽฏ max_batch_size: {}",
2309            self.node_id(),
2310            max_batch_size
2311        );
2312
2313        if max_batch_size == 0 {
2314            // Nothing we can safely promote now
2315            debug!(
2316                "[Leader {}] โš ๏ธ max_batch_size is 0, cannot promote now",
2317                self.node_id()
2318            );
2319            return Ok(());
2320        }
2321
2322        // Step 4: Extract the batch from the queue (FIFO order)
2323        let promotion_entries = self.drain_batch(max_batch_size);
2324        let promotion_node_ids = promotion_entries.iter().map(|e| e.node_id).collect::<Vec<_>>();
2325
2326        // Step 5: Execute batch promotion
2327        if !promotion_node_ids.is_empty() {
2328            // Log the batch promotion
2329            info!(
2330                "Promoting learner batch of {} nodes: {:?} (total voters: {} -> {})",
2331                promotion_node_ids.len(),
2332                promotion_node_ids,
2333                current_voters,
2334                current_voters + promotion_node_ids.len()
2335            );
2336
2337            // Attempt promotion and restore batch on failure
2338            let result = self.safe_batch_promote(promotion_node_ids.clone(), ctx, role_tx).await;
2339
2340            if let Err(e) = result {
2341                // Restore entries to the front of the queue in reverse order
2342                for entry in promotion_entries.into_iter().rev() {
2343                    self.pending_promotions.push_front(entry);
2344                }
2345                return Err(e);
2346            }
2347
2348            info!(
2349                "Promotion successful. Cluster members: {:?}",
2350                membership.voters().await
2351            );
2352        }
2353
2354        trace!(
2355            ?self.pending_promotions,
2356            "Step 6: Reschedule if any pending promotions remain"
2357        );
2358        // Step 6: Reschedule if any pending promotions remain
2359        if !self.pending_promotions.is_empty() {
2360            debug!(
2361                "[Leader {}] ๐Ÿ” Re-sending PromoteReadyLearners for remaining pending: {:?}",
2362                self.node_id(),
2363                self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2364            );
2365            // Important: Re-send the event to trigger next cycle
2366            role_tx
2367                .send(RoleEvent::ReprocessEvent(Box::new(
2368                    RaftEvent::PromoteReadyLearners,
2369                )))
2370                .map_err(|e| {
2371                    let error_str = format!("{e:?}");
2372                    error!("Send PromoteReadyLearners event failed: {}", error_str);
2373                    NetworkError::SingalSendFailed(error_str)
2374                })?;
2375        }
2376
2377        Ok(())
2378    }
2379
2380    /// Removes the first `count` nodes from the pending queue and returns them
2381    pub(super) fn drain_batch(
2382        &mut self,
2383        count: usize,
2384    ) -> Vec<PendingPromotion> {
2385        let mut batch = Vec::with_capacity(count);
2386        for _ in 0..count {
2387            if let Some(entry) = self.pending_promotions.pop_front() {
2388                batch.push(entry);
2389            } else {
2390                break;
2391            }
2392        }
2393        batch
2394    }
2395
2396    async fn safe_batch_promote(
2397        &mut self,
2398        batch: Vec<u32>,
2399        ctx: &RaftContext<T>,
2400        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2401    ) -> Result<()> {
2402        let change = Change::BatchPromote(BatchPromote {
2403            node_ids: batch.clone(),
2404            new_status: NodeStatus::Active as i32,
2405        });
2406
2407        // Submit batch activation
2408        // Use verify_leadership_persistent for config changes (must eventually succeed)
2409        self.verify_leadership_persistent(vec![EntryPayload::config(change)], true, ctx, role_tx)
2410            .await?;
2411
2412        Ok(())
2413    }
2414
2415    async fn run_periodic_maintenance(
2416        &mut self,
2417        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2418        ctx: &RaftContext<T>,
2419    ) -> Result<()> {
2420        if let Err(e) = self.conditionally_purge_stale_learners(role_tx, ctx).await {
2421            error!("Stale learner purge failed: {}", e);
2422        }
2423
2424        if let Err(e) = self.conditionally_purge_zombie_nodes(role_tx, ctx).await {
2425            error!("Zombie node purge failed: {}", e);
2426        }
2427
2428        // Regardless of whether a stale node was found, schedule the next check
2429        // at a fixed interval
2430        self.reset_next_membership_maintenance_check(
2431            ctx.node_config().raft.membership.membership_maintenance_interval,
2432        );
2433        Ok(())
2434    }
2435
2436    /// Periodic check triggered every ~30s in the worst-case scenario
2437    /// using priority-based lazy scheduling. Actual average frequency
2438    /// is inversely proportional to system load.
2439    pub async fn conditionally_purge_stale_learners(
2440        &mut self,
2441        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2442        ctx: &RaftContext<T>,
2443    ) -> Result<()> {
2444        let config = &ctx.node_config.raft.membership.promotion;
2445
2446        // Optimization: skip the check most of the time by honoring the scheduled next-check time
2447        if self.pending_promotions.is_empty()
2448            || self.next_membership_maintenance_check > Instant::now()
2449        {
2450            trace!("Skipping stale learner check");
2451            return Ok(());
2452        }
2453
2454        let now = Instant::now();
2455        let queue_len = self.pending_promotions.len();
2456
2457        // Inspect only the oldest ~1% of items, capped at 100 entries
2458        let inspect_count = queue_len.min(100).min(1.max(queue_len / 100));
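        // Illustrative values: queue_len = 5_000 -> min(5_000, 100, max(1, 50)) = 50 entries;
        //                      queue_len = 42    -> min(42, 100, max(1, 0))     = 1 entry.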
2459        let mut stale_entries = Vec::new();
2460
2461        trace!("Inspecting {} entries", inspect_count);
2462        for _ in 0..inspect_count {
2463            if let Some(entry) = self.pending_promotions.pop_front() {
2464                trace!(
2465                    "Inspecting entry: {:?} - {:?} - {:?}",
2466                    entry,
2467                    now.duration_since(entry.ready_since),
2468                    &config.stale_learner_threshold
2469                );
2470                if now.duration_since(entry.ready_since) > config.stale_learner_threshold {
2471                    stale_entries.push(entry);
2472                } else {
2473                    // Return non-stale entry and stop
2474                    self.pending_promotions.push_front(entry);
2475                    break;
2476                }
2477            } else {
2478                break;
2479            }
2480        }
2481
2482        trace!("Stale learner check completed: {:?}", stale_entries);
2483
2484        // Process collected stale entries
2485        for entry in stale_entries {
2486            if let Err(e) = self.handle_stale_learner(entry.node_id, role_tx, ctx).await {
2487                error!("Failed to handle stale learner: {}", e);
2488            }
2489        }
2490
2491        Ok(())
2492    }
2493
2494    /// Remove non-Active zombie nodes that exceed failure threshold
2495    async fn conditionally_purge_zombie_nodes(
2496        &mut self,
2497        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2498        ctx: &RaftContext<T>,
2499    ) -> Result<()> {
2500        // Optimize: get membership reference once to reduce lock contention
2501        let membership = ctx.membership();
2502        let zombie_candidates = membership.get_zombie_candidates().await;
2503        let mut nodes_to_remove = Vec::new();
2504
2505        for node_id in zombie_candidates {
2506            if let Some(status) = membership.get_node_status(node_id).await {
2507                if status != NodeStatus::Active {
2508                    nodes_to_remove.push(node_id);
2509                }
2510            }
2511        }
2512        // Batch removal if we have candidates
2513        if !nodes_to_remove.is_empty() {
2514            let change = Change::BatchRemove(BatchRemove {
2515                node_ids: nodes_to_remove.clone(),
2516            });
2517
2518            info!(
2519                "Proposing batch removal of zombie nodes: {:?}",
2520                nodes_to_remove
2521            );
2522
2523            // Submit single config change for all nodes
2524            // Use verify_leadership_persistent for config changes (must eventually succeed)
2525            match self
2526                .verify_leadership_persistent(
2527                    vec![EntryPayload::config(change)],
2528                    false,
2529                    ctx,
2530                    role_tx,
2531                )
2532                .await
2533            {
2534                Ok(true) => {
2535                    info!("Batch removal committed for nodes: {:?}", nodes_to_remove);
2536                }
2537                Ok(false) => {
2538                    warn!("Failed to commit batch removal");
2539                }
2540                Err(e) => {
2541                    error!("Batch removal error: {:?}", e);
2542                    return Err(e);
2543                }
2544            }
2545        }
2546
2547        Ok(())
2548    }
2549
2550    pub fn reset_next_membership_maintenance_check(
2551        &mut self,
2552        membership_maintenance_interval: Duration,
2553    ) {
2554        self.next_membership_maintenance_check = Instant::now() + membership_maintenance_interval;
2555    }
2556
2557    /// Remove stalled learner via membership change consensus
2558    pub async fn handle_stale_learner(
2559        &mut self,
2560        node_id: u32,
2561        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2562        ctx: &RaftContext<T>,
2563    ) -> Result<()> {
2564        // Stalled learner detected - remove via membership change (requires consensus)
2565        warn!(
2566            "Learner {} is stalled, removing from cluster via consensus",
2567            node_id
2568        );
2569
2570        let change = Change::BatchRemove(BatchRemove {
2571            node_ids: vec![node_id],
2572        });
2573
2574        // Submit removal through membership change entry (requires quorum consensus)
2575        // Use verify_leadership_persistent for config changes (must eventually succeed)
2576        match self
2577            .verify_leadership_persistent(vec![EntryPayload::config(change)], true, ctx, role_tx)
2578            .await
2579        {
2580            Ok(true) => {
2581                info!(
2582                    "Stalled learner {} successfully removed from cluster",
2583                    node_id
2584                );
2585            }
2586            Ok(false) => {
2587                warn!("Failed to commit removal of stalled learner {}", node_id);
2588            }
2589            Err(e) => {
2590                error!("Error removing stalled learner {}: {:?}", node_id, e);
2591                return Err(e);
2592            }
2593        }
2594
2595        Ok(())
2596    }
2597
2598    /// Check if current lease is still valid for LeaseRead policy
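    ///
    /// Illustrative timing in hypothetical milliseconds:
    ///
    /// ```
    /// // Lease last confirmed at t = 1_000 ms with a 300 ms lease duration.
    /// let (now, last_confirmed, lease_duration) = (1_200u64, 1_000u64, 300u64);
    /// assert!(now - last_confirmed < lease_duration); // still within the lease window
    /// ```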
2599    pub fn is_lease_valid(
2600        &self,
2601        ctx: &RaftContext<T>,
2602    ) -> bool {
2603        let now = std::time::SystemTime::now()
2604            .duration_since(std::time::UNIX_EPOCH)
2605            .unwrap_or_default()
2606            .as_millis() as u64;
2607
2608        let last_confirmed = self.lease_timestamp.load(std::sync::atomic::Ordering::Acquire);
2609        let lease_duration = ctx.node_config().raft.read_consistency.lease_duration_ms;
2610
2611        if now < last_confirmed {
2612            // Clock moved backwards: system clock issue, treat lease as invalid
2613            error!("Clock moved backwards: Now {now}, Last Confirmed {last_confirmed}");
2614            return false;
2615        }
2616
2617        // Allow multiple requests within the same millisecond (now == last_confirmed)
2618        if now == last_confirmed {
2619            return true;
2620        }
2621        (now - last_confirmed) < lease_duration
2622    }
2623
2624    /// Update lease timestamp after successful leadership verification
2625    fn update_lease_timestamp(&self) {
2626        let now = std::time::SystemTime::now()
2627            .duration_since(std::time::UNIX_EPOCH)
2628            .unwrap_or_default()
2629            .as_millis() as u64;
2630
2631        self.lease_timestamp.store(now, std::sync::atomic::Ordering::Release);
2632    }
2633
2634    /// Verify leadership with quorum and refresh lease timestamp on success.
2635    /// This is a single-shot verification (fast-fail) optimized for read operations.
2636    ///
2637    /// Returns Ok(()) if verification succeeds and lease is refreshed.
2638    /// Returns Err on any verification failure.
2639    async fn verify_leadership_and_refresh_lease(
2640        &mut self,
2641        ctx: &RaftContext<T>,
2642        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2643    ) -> Result<()> {
2644        match self.verify_internal_quorum(vec![], true, ctx, role_tx).await {
2645            Ok(QuorumVerificationResult::Success) => {
2646                self.update_lease_timestamp();
2647                Ok(())
2648            }
2649            Ok(QuorumVerificationResult::LeadershipLost) => Err(ConsensusError::Replication(
2650                ReplicationError::NotLeader { leader_id: None },
2651            )
2652            .into()),
2653            Ok(QuorumVerificationResult::RetryRequired) => {
2654                Err(ConsensusError::Replication(ReplicationError::QuorumNotReached).into())
2655            }
2656            Err(e) => Err(e),
2657        }
2658    }
2659
2660    /// Process batched LinearizableRead requests with a single quorum verification.
2661    ///
2662    /// This function takes all pending read requests from the buffer, performs one
2663    /// leadership verification for the entire batch, waits for state machine to catch up,
2664    /// and executes all reads. This batching strategy significantly reduces the number
2665    /// of quorum checks required for high read throughput scenarios.
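    ///
    /// The catch-up condition in isolation (hypothetical indexes): reads wait only when the
    /// state machine lags behind the read index fixed at batch time.
    ///
    /// ```
    /// let (last_applied, read_index) = (8u64, 12u64);
    /// assert!(last_applied < read_index); // must wait until index 12 is applied
    /// ```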
2666    pub(super) async fn process_linearizable_read_batch(
2667        &mut self,
2668        ctx: &RaftContext<T>,
2669        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2670    ) -> Result<()> {
        // Take the entire buffer in one move and reset the batch timer
        let batch = std::mem::take(&mut self.read_buffer);
        self.read_buffer_start_time = None;

        if batch.is_empty() {
            return Ok(());
        }

        let batch_size = batch.len();
        info!("Read batch: flushing {} requests", batch_size);

        // All requests in read_buffer are LinearizableRead (filtered in handle_raft_event)
        // Perform a single verification for the entire batch and refresh the lease
        if let Err(e) = self.verify_leadership_and_refresh_lease(ctx, role_tx).await {
            // Leadership verification failed: drain the buffer with an error
            error!(
                "Read batch: verification failed - draining {} requests: {}",
                batch_size, e
            );
            let status = tonic::Status::failed_precondition(format!(
                "Leadership verification failed: {e:?}"
            ));
            for (_, sender) in batch {
                let _ = sender.send(Err(status.clone()));
            }
            return Ok(());
        }

        // Wait for state machine to catch up
        let last_applied = ctx.state_machine().last_applied().index;
        let read_index = self.calculate_read_index();

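        // Serving a read before the state machine has applied up to the read index could
        // return stale results; block here until it catches up so the batch stays linearizable.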
        if last_applied < read_index {
            ctx.handlers.state_machine_handler.update_pending(read_index);
            let timeout_ms = self.node_config.raft.read_consistency.state_machine_sync_timeout_ms;
            ctx.handlers
                .state_machine_handler
                .wait_applied(read_index, Duration::from_millis(timeout_ms))
                .await
                .map_err(|e| {
                    error!("Read batch: wait_applied failed: {}", e);
                    e
                })?;
        }

        // Execute all reads and respond
        for (req, sender) in batch {
            let results = ctx
                .handlers
                .state_machine_handler
                .read_from_state_machine(req.keys)
                .unwrap_or_default();
            let _ = sender.send(Ok(ClientResponse::read_results(results)));
        }

        // Metrics (currently disabled):
        // if let Some(metrics) = &ctx.metrics {
        //     metrics.read_batch_size.observe(batch_size as f64);
        // }

        Ok(())
    }

    #[cfg(test)]
    pub(crate) fn test_update_lease_timestamp(&self) {
        self.update_lease_timestamp();
    }
}

impl<T: TypeConfig> From<&CandidateState<T>> for LeaderState<T> {
    fn from(candidate: &CandidateState<T>) -> Self {
        let ReplicationConfig {
            rpc_append_entries_in_batch_threshold,
            rpc_append_entries_batch_process_delay_in_ms,
            rpc_append_entries_clock_in_ms,
            ..
        } = candidate.node_config.raft.replication;

        // Clone shared_state and set self as leader immediately
        let shared_state = candidate.shared_state.clone();
        shared_state.set_current_leader(candidate.node_id());

        Self {
            shared_state,
            timer: Box::new(ReplicationTimer::new(
                rpc_append_entries_clock_in_ms,
                rpc_append_entries_batch_process_delay_in_ms,
            )),
            next_index: HashMap::new(),
            match_index: HashMap::new(),
            noop_log_id: None,

            batch_buffer: Box::new(BatchBuffer::new(
                rpc_append_entries_in_batch_threshold,
                Duration::from_millis(rpc_append_entries_batch_process_delay_in_ms),
            )),

            node_config: candidate.node_config.clone(),

            scheduled_purge_upto: None,
            last_purged_index: candidate.last_purged_index,
            last_learner_check: Instant::now(),
            snapshot_in_progress: AtomicBool::new(false),
            peer_purge_progress: HashMap::new(),
            next_membership_maintenance_check: Instant::now(),
            pending_promotions: VecDeque::new(),
            cluster_metadata: ClusterMetadata {
                single_voter: false,
                total_voters: 0,
                replication_targets: vec![],
            },
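            // The lease starts expired; it only becomes valid after the first successful
            // quorum verification calls update_lease_timestamp().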
            lease_timestamp: AtomicU64::new(0),
            read_buffer: Vec::new(),
            read_buffer_start_time: None,
            _marker: PhantomData,
        }
    }
}

impl<T: TypeConfig> Debug for LeaderState<T> {
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.debug_struct("LeaderState")
            .field("shared_state", &self.shared_state)
            .field("next_index", &self.next_index)
            .field("match_index", &self.match_index)
            .field("noop_log_id", &self.noop_log_id)
            .finish()
    }
}

/// Calculates the maximum number of nodes we can promote while keeping the total voter count
/// odd.
///
/// - `current`: current number of voting nodes
/// - `available`: number of ready learners pending promotion
///
/// Returns the maximum number of nodes to promote (0 if no safe promotion exists).
pub fn calculate_safe_batch_size(
    current: usize,
    available: usize,
) -> usize {
    if (current + available) % 2 == 1 {
        // Promoting every ready learner keeps the voter count odd
        available
    } else {
        // Promoting all learners would make the voter count even, so hold one back.
        // When `available` is 0 or 1 this yields 0, i.e. no safe promotion exists.
        available.saturating_sub(1)
    }
}
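
// Illustrative check of the odd-voter invariant above: a minimal sketch exercising
// `calculate_safe_batch_size` with a few representative voter/learner counts.
#[cfg(test)]
mod safe_batch_size_examples {
    use super::calculate_safe_batch_size;

    #[test]
    fn keeps_total_voter_count_odd() {
        // 3 voters + 2 ready learners = 5 (odd): promoting both is safe.
        assert_eq!(calculate_safe_batch_size(3, 2), 2);
        // 3 voters + 3 ready learners = 6 (even): hold one back and promote 2 (total 5).
        assert_eq!(calculate_safe_batch_size(3, 3), 2);
        // 2 voters + 1 ready learner = 3 (odd): promoting the single learner is safe.
        assert_eq!(calculate_safe_batch_size(2, 1), 1);
        // 4 voters + 0 ready learners: nothing to promote.
        assert_eq!(calculate_safe_batch_size(4, 0), 0);
    }
}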

/// Re-queue a Raft event on the role event channel so it will be reprocessed.
pub(super) fn send_replay_raft_event(
    role_tx: &mpsc::UnboundedSender<RoleEvent>,
    raft_event: RaftEvent,
) -> Result<()> {
    role_tx.send(RoleEvent::ReprocessEvent(Box::new(raft_event))).map_err(|e| {
        let error_str = format!("{e:?}");
        error!("Failed to send RoleEvent::ReprocessEvent: {}", error_str);
        NetworkError::SingalSendFailed(error_str).into()
    })
}