
d_engine_core/raft_role/leader_state.rs

use super::LeaderStateSnapshot;
use super::RaftRole;
use super::SharedState;
use super::StateSnapshot;
use super::buffers::BatchBuffer;
use super::buffers::ProposeBatchBuffer;
use super::candidate_state::CandidateState;
use super::role_state::RaftRoleState;
use super::role_state::check_and_trigger_snapshot;
use super::role_state::send_replay_raft_event;
use crate::AppendResults;
use crate::BackgroundSnapshotTransfer;
use crate::ConnectionType;
use crate::ConsensusError;
use crate::Error;
use crate::MaybeCloneOneshot;
use crate::MaybeCloneOneshotSender;
use crate::Membership;
use crate::MembershipError;
use crate::NetworkError;
use crate::PeerUpdate;
use crate::PurgeExecutor;
use crate::QuorumVerificationResult;
use crate::RaftContext;
use crate::RaftEvent;
use crate::RaftLog;
use crate::RaftNodeConfig;
use crate::RaftOneshot;
use crate::RaftRequestWithSignal;
use crate::ReadConsistencyPolicy as ServerReadConsistencyPolicy;
use crate::ReplicationConfig;
use crate::ReplicationCore;
use crate::ReplicationError;
use crate::ReplicationTimer;
use crate::Result;
use crate::RoleEvent;
use crate::SnapshotConfig;
use crate::StateMachine;
use crate::StateMachineHandler;
use crate::StateTransitionError;
use crate::SystemError;
use crate::TypeConfig;
use crate::alias::MOF;
use crate::alias::ROF;
use crate::alias::SMHOF;
use crate::cluster::is_majority;
use crate::ensure_safe_join;
use crate::event::ClientCmd;
use crate::stream::create_production_snapshot_stream;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::common::AddNode;
use d_engine_proto::common::BatchPromote;
use d_engine_proto::common::BatchRemove;
use d_engine_proto::common::EntryPayload;
use d_engine_proto::common::LogId;
use d_engine_proto::common::NodeRole::Leader;
use d_engine_proto::common::NodeStatus;
use d_engine_proto::common::membership_change::Change;
use d_engine_proto::error::ErrorCode;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::JoinResponse;
use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
use d_engine_proto::server::cluster::NodeMeta;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::election::VotedFor;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::storage::SnapshotChunk;
use d_engine_proto::server::storage::SnapshotMetadata;
use nanoid::nanoid;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
use tonic::Status;
use tonic::async_trait;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::instrument;
use tracing::trace;
use tracing::warn;

// Type aliases for improved readability
type LinearizableReadRequest = (
    ClientReadRequest,
    MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
);

/// Write batch metadata: (start_log_index, response_senders, wait_for_apply)
type WriteMetadata = (
    u64,
    Vec<MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>>,
    bool,
);

// Supporting data structures
#[derive(Debug, Clone)]
pub struct PendingPromotion {
    pub node_id: u32,
    pub ready_since: Instant,
}

impl PendingPromotion {
    pub fn new(
        node_id: u32,
        ready_since: Instant,
    ) -> Self {
        PendingPromotion {
            node_id,
            ready_since,
        }
    }
}

/// Cached cluster topology metadata for hot path optimization.
///
/// This metadata is immutable during leadership and only updated on membership changes.
/// Caching avoids repeated async calls to membership queries in hot paths like append_entries.
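///
/// A minimal sketch of the derived values (illustrative numbers, not from the source):
///
/// ```text
/// Leader of a 5-voter cluster with 1 learner:
///   total_voters        = 4 remote voters + 1 (self) = 5
///   single_voter        = false
///   replication_targets = 4 voters + 1 learner (self excluded) = 5 peers
///   majority quorum     = total_voters / 2 + 1 = 3
/// ```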
#[derive(Debug, Clone)]
pub struct ClusterMetadata {
    /// Single-voter cluster (quorum = 1, used for election and commit optimization)
    pub single_voter: bool,
    /// Total number of voters including self (updated on membership changes)
    pub total_voters: usize,
    /// Cached replication targets (voters + learners, excluding self)
    pub replication_targets: Vec<NodeMeta>,
}

/// Metrics context for backpressure monitoring (zero-allocation hot path)
///
/// Encapsulates all metrics-related state to avoid polluting the core LeaderState structure.
/// Pre-allocated labels ensure zero allocations in the request hot path.
pub struct BackpressureMetrics {
    /// Pre-allocated labels for write rejections: [("node_id", "<id>"), ("type", "write")]
    labels_write: Arc<[(String, String)]>,
    /// Pre-allocated labels for read rejections: [("node_id", "<id>"), ("type", "read")]
    labels_read: Arc<[(String, String)]>,
    /// Runtime switch (branch prediction optimized, ~0ns overhead when false)
    enabled: bool,
    /// Sample counter for gauge metrics (reduces overhead at high QPS)
    sample_counter: AtomicU32,
    /// Sampling rate (1 = no sampling, 10 = sample 1 in 10)
    sample_rate: u32,
}

impl BackpressureMetrics {
    /// Create new metrics context with pre-allocated labels
    pub fn new(
        node_id: u32,
        enabled: bool,
        sample_rate: u32,
    ) -> Self {
        let sample_rate = if sample_rate == 0 { 1 } else { sample_rate };
        let node_id_str = node_id.to_string();
        let labels_write = Arc::new([
            ("node_id".to_string(), node_id_str.clone()),
            ("type".to_string(), "write".to_string()),
        ]);
        let labels_read = Arc::new([
            ("node_id".to_string(), node_id_str),
            ("type".to_string(), "read".to_string()),
        ]);

        Self {
            labels_write,
            labels_read,
            enabled,
            sample_counter: AtomicU32::new(0),
            sample_rate,
        }
    }

    /// Record rejection metric (always counted, not sampled)
    #[inline]
    pub fn record_rejection(
        &self,
        is_write: bool,
    ) {
        if self.enabled {
            let labels = if is_write {
                &self.labels_write
            } else {
                &self.labels_read
            };
            metrics::counter!("backpressure.rejections", labels.as_ref()).increment(1);
        }
    }

    /// Record buffer utilization gauge (with sampling)
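    ///
    /// A sampling sketch (illustrative): with `sample_rate = 10`, the gauge is
    /// emitted only on calls where the previous counter value is a multiple of
    /// 10 (the 1st, 11th, 21st, ... calls); every other call costs just one
    /// relaxed `fetch_add`.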
    #[inline]
    pub fn record_buffer_utilization(
        &self,
        utilization: f64,
        is_write: bool,
    ) {
        if self.enabled {
            let counter = self.sample_counter.fetch_add(1, Ordering::Relaxed);
            if counter % self.sample_rate == 0 {
                let labels = if is_write {
                    &self.labels_write
                } else {
                    &self.labels_read
                };
                metrics::gauge!("backpressure.buffer_utilization", labels.as_ref())
                    .set(utilization);
            }
        }
    }
}

/// Leader node's state in Raft consensus algorithm.
///
/// This structure maintains all state that should be persisted on leader crashes,
/// including replication progress tracking and log compaction management.
///
/// # Type Parameters
/// - `T`: Application-specific Raft type configuration
pub struct LeaderState<T: TypeConfig> {
    // -- Core Raft Leader State --
    /// Shared cluster state with lock protection
    pub shared_state: SharedState,

    /// === Volatile State ===
    /// For each server (node_id), index of the next log entry to send to that server
    ///
    /// Raft Paper: §5.3 Figure 2 (nextIndex)
    pub next_index: HashMap<u32, u64>,

    /// === Volatile State ===
    /// For each server (node_id), index of highest log entry known to be replicated
    ///
    /// Raft Paper: §5.3 Figure 2 (matchIndex)
    pub(super) match_index: HashMap<u32, u64>,

    /// === Volatile State ===
    /// Temporary storage for no-op entry log ID during leader initialization
    #[doc(hidden)]
    pub noop_log_id: Option<u64>,

    // -- Log Compaction & Purge --
    /// === Volatile State ===
    /// The upper bound (exclusive) of log entries scheduled for asynchronous physical deletion.
    ///
    /// This value is set immediately after a new snapshot is successfully created.
    /// It represents the next log position that will trigger compaction.
    ///
    /// The actual log purge is performed by a background task, which may be delayed
    /// due to resource constraints or retry mechanisms.
    pub scheduled_purge_upto: Option<LogId>,

    /// === Persistent State (MUST be on disk) ===
    /// The last log position that has been **physically removed** from stable storage.
    ///
    /// This value is atomically updated when:
    /// 1. A new snapshot is persisted (marking logs up to `last_included_index` as purgeable)
    /// 2. The background purge task completes successfully
    ///
    /// Raft safety invariant:
    /// Any log entry with index ≤ `last_purged_index` is guaranteed to be
    /// reflected in the latest snapshot.
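    ///
    /// A worked example (illustrative indices): with `last_purged_index = Some(100)`
    /// and a new snapshot whose `last_included.index = 150`, entries `101..=150`
    /// are covered by the snapshot and become eligible for physical purge, while
    /// entries above 150 must be retained.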
    pub last_purged_index: Option<LogId>,

    /// Record if there is on-going snapshot creation activity
    pub snapshot_in_progress: AtomicBool,

    // -- Request Processing --
    /// Batched proposal buffer for client requests
    ///
    /// Accumulates requests until either:
    /// 1. Batch reaches configured size limit
    /// 2. Explicit flush is triggered
    ///
    /// SoA layout: payloads and senders stored in separate contiguous Vecs.
    /// flush() builds one RaftRequestWithSignal via mem::take — true O(1), no unzip scan.
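    ///
    /// A flush sketch (illustrative, fields abridged):
    ///
    /// ```text
    /// push(p1, s1); push(p2, s2);
    /// flush() == Some(RaftRequestWithSignal { payloads: [p1, p2], senders: [s1, s2], .. })
    /// // the internal Vecs are mem::take'n, leaving an empty buffer behind
    /// ```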
    pub(super) propose_buffer: Box<ProposeBatchBuffer>,

    // -- Timing & Scheduling --
    /// Replication heartbeat timer manager
    ///
    /// Handles:
    /// - Heartbeat interval tracking
    /// - Election timeout prevention
    /// - Batch proposal flushing
    timer: Box<ReplicationTimer>,

    // -- Cluster Configuration --
    /// Cached Raft node configuration (shared reference)
    ///
    /// This includes:
    /// - Cluster membership
    /// - Timeout parameters
    /// - Performance tuning knobs
    pub(super) node_config: Arc<RaftNodeConfig>,

    /// Last time we checked for learners
    pub last_learner_check: Instant,

    // -- Stale Learner Handling --
    /// The next scheduled time to check for stale learners.
    ///
    /// This is used to implement periodic checking of learners that have been in the promotion
    /// queue for too long without making progress. The check interval is configurable via the
    /// node configuration (`node_config.promotion.stale_check_interval`).
    ///
    /// When the current time reaches or exceeds this instant, the leader will perform a scan
    /// of a subset of the pending promotions to detect stale learners. After the scan, the field
    /// is updated to `current_time + stale_check_interval`.
    ///
    /// Rationale: Avoiding frequent full scans improves batching efficiency and reduces CPU spikes
    /// in high-load environments (particularly crucial for RocketMQ-on-DLedger workflows).
    pub next_membership_maintenance_check: Instant,

    /// Queue of learners that have caught up and are pending promotion to voter.
    pub pending_promotions: VecDeque<PendingPromotion>,

    /// Lease timestamp for LeaseRead policy
    /// Tracks when leadership was last confirmed with quorum
    pub(super) lease_timestamp: AtomicU64,

    // -- Cluster Topology Cache --
    /// Cached cluster metadata (updated on membership changes)
    /// Avoids repeated async calls in hot paths
    pub(super) cluster_metadata: ClusterMetadata,

    /// Buffered linearizable read requests awaiting batch processing
    /// Only LinearizableRead policy uses batching with size/timeout thresholds
    pub(super) linearizable_read_buffer: Box<BatchBuffer<LinearizableReadRequest>>,

    /// Lease-based read requests awaiting processing
    /// Processed immediately in flush_cmd_buffers if lease is valid
    pub(super) lease_read_queue: VecDeque<(
        ClientReadRequest,
        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
    )>,

    /// Eventual consistency read requests awaiting processing
    /// Processed immediately in flush_cmd_buffers without any verification
    pub(super) eventual_read_queue: VecDeque<(
        ClientReadRequest,
        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
    )>,

    // -- Client Request Tracking --
    /// Maps log index to client response sender
    /// Used to return apply results (e.g., CAS success/failure) to clients
    /// Entries are removed after response is sent
    pub(super) pending_requests:
        HashMap<u64, MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>>,

    /// Pending linearizable reads waiting for SM to apply up to read_index.
    /// Key: read_index (SM must reach this index before reads can be served)
    /// Value: batch of (request, sender) pairs registered at that read_index
    /// Drained on role change; processed in ApplyCompleted handler.
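    ///
    /// A timing sketch (illustrative indices): a read registered with
    /// `read_index = 42` while the state machine is at `last_applied = 40`
    /// stays queued until an `ApplyCompleted` event reports `last_index >= 42`,
    /// at which point the whole batch stored under key 42 is served.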
    pub(super) pending_reads: BTreeMap<u64, VecDeque<LinearizableReadRequest>>,

    // -- Metrics (optional, encapsulated) --
    /// Backpressure metrics context (None when metrics disabled)
    backpressure_metrics: Option<Arc<BackpressureMetrics>>,

    // -- Type System Marker --
    /// Phantom data for type parameter anchoring
    _marker: PhantomData<T>,
}

#[async_trait]
impl<T: TypeConfig> RaftRoleState for LeaderState<T> {
    type T = T;

    fn shared_state(&self) -> &SharedState {
        &self.shared_state
    }

    fn shared_state_mut(&mut self) -> &mut SharedState {
        &mut self.shared_state
    }

    /// Overwrite default behavior.
    /// As leader, I should not accept a commit index lower than my current one.
    #[tracing::instrument]
    fn update_commit_index(
        &mut self,
        new_commit_index: u64,
    ) -> Result<()> {
        if self.commit_index() < new_commit_index {
            debug!("update_commit_index to: {:?}", new_commit_index);
            self.shared_state.commit_index = new_commit_index;
        } else {
            warn!(
                "Illegal operation, might be a bug! I am Leader old_commit_index({}) >= new_commit_index:({})",
                self.commit_index(),
                new_commit_index
            )
        }
        Ok(())
    }

    /// As Leader, I should not vote any more.
    fn voted_for(&self) -> Result<Option<VotedFor>> {
        self.shared_state().voted_for()
    }

    /// As Leader, I might still be able to vote
    /// if a new legal Leader is found.
    fn update_voted_for(
        &mut self,
        voted_for: VotedFor,
    ) -> Result<bool> {
        self.shared_state_mut().update_voted_for(voted_for)
    }
    fn next_index(
        &self,
        node_id: u32,
    ) -> Option<u64> {
        // Default to 1 (the first log index) for peers we have no record of yet
        Some(self.next_index.get(&node_id).copied().unwrap_or(1))
    }

    fn update_next_index(
        &mut self,
        node_id: u32,
        new_next_id: u64,
    ) -> Result<()> {
        debug!("update_next_index({}) to {}", node_id, new_next_id);
        self.next_index.insert(node_id, new_next_id);
        Ok(())
    }

    fn update_match_index(
        &mut self,
        node_id: u32,
        new_match_id: u64,
    ) -> Result<()> {
        self.match_index.insert(node_id, new_match_id);
        Ok(())
    }

    fn match_index(
        &self,
        node_id: u32,
    ) -> Option<u64> {
        self.match_index.get(&node_id).copied()
    }

    fn init_peers_next_index_and_match_index(
        &mut self,
        last_entry_id: u64,
        peer_ids: Vec<u32>,
    ) -> Result<()> {
        for peer_id in peer_ids {
            debug!("init leader state for peer_id: {:?}", peer_id);
            let new_next_id = last_entry_id + 1;
            self.update_next_index(peer_id, new_next_id)?;
            self.update_match_index(peer_id, 0)?;
        }
        Ok(())
    }

    /// Track no-op entry index for linearizable read optimization.
    /// Called after no-op entry is committed on leadership transition.
    fn on_noop_committed(
        &mut self,
        ctx: &RaftContext<Self::T>,
    ) -> Result<()> {
        let noop_index = ctx.raft_log().last_entry_id();
        self.noop_log_id = Some(noop_index);
        debug!("Tracked noop_log_id: {}", noop_index);
        Ok(())
    }

    fn noop_log_id(&self) -> Result<Option<u64>> {
        Ok(self.noop_log_id)
    }

    /// Verifies leadership status using persistent retry until timeout.
    ///
    /// This function is designed for critical operations like configuration changes
    /// that must eventually succeed. It implements:
    ///   - Infinite retries with exponential backoff
    ///   - Jitter randomization to prevent synchronization
    ///   - Termination only on success, leadership loss, or global timeout
    ///
    /// # Parameters
    /// - `payloads`: Log entries to verify
    /// - `ctx`: Raft execution context
    /// - `role_tx`: Channel for role transition events
    ///
    /// # Returns
    /// - `Ok(true)`: Quorum verification succeeded
    /// - `Ok(false)`: Leadership definitively lost during verification
    /// - `Err(_)`: Global timeout exceeded or critical failure occurred
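    ///
    /// # Backoff sketch (illustrative, assuming base_delay_ms = 50 and max_delay_ms = 1000)
    ///
    /// ```text
    /// retry 1:  sleep 100ms  + jitter(0..500ms)   // delay doubles before each sleep
    /// retry 2:  sleep 200ms  + jitter(0..500ms)
    /// retry 3:  sleep 400ms  + jitter(0..500ms)
    /// retry 4:  sleep 800ms  + jitter(0..500ms)
    /// retry 5+: sleep 1000ms + jitter(0..500ms)   // capped at max_delay
    /// ```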
    async fn verify_leadership_persistent(
        &mut self,
        payloads: Vec<EntryPayload>,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<bool> {
        let initial_delay =
            Duration::from_millis(ctx.node_config.retry.internal_quorum.base_delay_ms);
        let max_delay = Duration::from_millis(ctx.node_config.retry.internal_quorum.max_delay_ms);
        let global_timeout = ctx.node_config.raft.membership.verify_leadership_persistent_timeout;

        let mut current_delay = initial_delay;
        let start_time = Instant::now();

        loop {
            match self.verify_internal_quorum(payloads.clone(), ctx, role_tx).await {
                Ok(QuorumVerificationResult::Success) => {
                    return Ok(true);
                }
                Ok(QuorumVerificationResult::LeadershipLost) => return Ok(false),
                Ok(QuorumVerificationResult::RetryRequired) => {
                    // Check global timeout before retrying
                    if start_time.elapsed() > global_timeout {
                        return Err(NetworkError::GlobalTimeout(
                            "Leadership verification timed out".to_string(),
                        )
                        .into());
                    }

                    current_delay =
                        current_delay.checked_mul(2).unwrap_or(max_delay).min(max_delay);
                    let jitter = Duration::from_millis(rand::random::<u64>() % 500);
                    sleep(current_delay + jitter).await;
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Override: Initialize cluster metadata after becoming leader.
    /// Must be called once after leader election succeeds.
    async fn init_cluster_metadata(
        &mut self,
        membership: &Arc<T::M>,
    ) -> Result<()> {
        // Calculate total voter count including self as leader
        let voters = membership.voters().await;
        let total_voters = voters.len() + 1; // +1 for leader (self)

        // Get all replication targets (voters + learners, excluding self)
        let replication_targets = membership.replication_peers().await;

        // Single-voter cluster: only this node is a voter (quorum = 1)
        let single_voter = total_voters == 1;

        self.cluster_metadata = ClusterMetadata {
            single_voter,
            total_voters,
            replication_targets: replication_targets.clone(),
        };

        debug!(
            "Initialized cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
            single_voter,
            total_voters,
            replication_targets.len()
        );
        Ok(())
    }

    /// Note: This method acts as the centralized internal Raft client.
    ///
    /// Checks leadership quorum verification immediately:
    ///
    /// - Bypasses all queues with direct RPC transmission
    /// - Enforces synchronous quorum validation
    /// - Guarantees real-time network visibility
    ///
    /// Scenario handling summary:
    ///
    /// 1. Quorum achieved: `Ok(QuorumVerificationResult::Success)`
    /// 2. Quorum NOT achieved (verifiable): `Ok(QuorumVerificationResult::RetryRequired)`
    /// 3. Quorum NOT achieved (non-verifiable): `Ok(QuorumVerificationResult::LeadershipLost)`
    /// 4. Partial timeouts: `Ok(QuorumVerificationResult::RetryRequired)`
    /// 5. All timeouts: `Ok(QuorumVerificationResult::RetryRequired)`
    /// 6. Higher term detected: `Err(HigherTerm)`
    /// 7. Critical failure (e.g., system or logic error): returns the original error
    async fn verify_internal_quorum(
        &mut self,
        payloads: Vec<EntryPayload>,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<QuorumVerificationResult> {
        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();

        self.execute_request_immediately(
            RaftRequestWithSignal {
                id: nanoid!(),
                payloads,
                senders: vec![resp_tx],
                wait_for_apply_event: false,
            },
            ctx,
            role_tx,
        )
        .await?;

        // Wait for response with timeout
        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        match timeout(timeout_duration, resp_rx).await {
            // Case 1: Response received successfully and verification passed
            Ok(Ok(Ok(response))) => {
                debug!("Leadership check response: {:?}", response);

                // Handle different response cases
                Ok(if response.is_write_success() {
                    QuorumVerificationResult::Success
                } else if response.is_retry_required() {
                    // Verifiable quorum failure
                    QuorumVerificationResult::RetryRequired
                } else {
                    // Non-verifiable failure or explicit rejection
                    QuorumVerificationResult::LeadershipLost
                })
            }

            // Case 2: Received explicit rejection status
            Ok(Ok(Err(status))) => {
                warn!("Leadership rejected by follower: {status:?}");
                Ok(QuorumVerificationResult::LeadershipLost)
            }

            // Case 3: Channel communication failure (unrecoverable error)
            Ok(Err(e)) => {
                error!("Channel error during leadership check: {:?}", e);
                Err(NetworkError::SingalReceiveFailed(e.to_string()).into())
            }

            // Case 4: Waiting for response timeout
            Err(_) => {
                warn!("Leadership check timed out after {:?}", timeout_duration);
                Err(NetworkError::Timeout {
                    node_id: self.node_id(),
                    duration: timeout_duration,
                }
                .into())
            }
        }
    }

    fn is_leader(&self) -> bool {
        true
    }

    fn become_leader(&self) -> Result<RaftRole<T>> {
        warn!("I am leader already");

        Err(StateTransitionError::InvalidTransition.into())
    }

    fn become_candidate(&self) -> Result<RaftRole<T>> {
        error!("Leader cannot become Candidate");

        Err(StateTransitionError::InvalidTransition.into())
    }

    fn become_follower(&self) -> Result<RaftRole<T>> {
        info!(
            "Node {} term {} transitioning to Follower",
            self.node_id(),
            self.current_term(),
        );
        println!(
            "[Node {}] Leader → Follower (term {})",
            self.node_id(),
            self.current_term()
        );
        Ok(RaftRole::Follower(Box::new(self.into())))
    }

    fn become_learner(&self) -> Result<RaftRole<T>> {
        error!("Leader cannot become Learner");

        Err(StateTransitionError::InvalidTransition.into())
    }

    fn is_timer_expired(&self) -> bool {
        self.timer.is_expired()
    }

    /// When Raft starts, check whether all timers need to be reset.
    fn reset_timer(&mut self) {
        self.timer.reset_replication();
    }

    fn next_deadline(&self) -> Instant {
        self.timer.next_deadline()
    }

    /// Trigger heartbeat now
    async fn tick(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        _raft_tx: &mpsc::Sender<RaftEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        let now = Instant::now();
        // Keep syncing leader_id (hot-path: ~5ns atomic store)
        self.shared_state().set_current_leader(self.node_id());

        // 1. Clear expired learners
        if let Err(e) = self.run_periodic_maintenance(role_tx, ctx).await {
            error!("Failed to run periodic maintenance: {}", e);
        }

        // 2. Heartbeat trigger check
        // Send heartbeat if the replication timer expires
        if now >= self.timer.replication_deadline() {
            debug!(?now, "tick::reset_replication timer");

            // Piggyback pending writes onto heartbeat; if nothing to write,
            // send an empty AppendEntries to maintain leadership and reset timer.
            let request = self.propose_buffer.flush();
            self.send_heartbeat_or_batch(request, role_tx, ctx).await?;
        }

        Ok(())
    }
    fn drain_read_buffer(&mut self) -> Result<()> {
        // Drain linearizable read buffer
        let batch = self.linearizable_read_buffer.take_all();
        if !batch.is_empty() {
            warn!(
                "Read batch: draining {} linearizable read requests due to role change",
                batch.len()
            );
            for (_, sender) in batch {
                let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
            }
        }

        // Drain lease read queue
        if !self.lease_read_queue.is_empty() {
            warn!(
                "Read batch: draining {} lease read requests due to role change",
                self.lease_read_queue.len()
            );
            for (_, sender) in self.lease_read_queue.drain(..) {
                let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
            }
        }

        // Drain eventual read queue
        if !self.eventual_read_queue.is_empty() {
            warn!(
                "Read batch: draining {} eventual read requests due to role change",
                self.eventual_read_queue.len()
            );
            for (_, sender) in self.eventual_read_queue.drain(..) {
                let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
            }
        }

        // Drain pending linearizable reads awaiting ApplyCompleted
        if !self.pending_reads.is_empty() {
            let count: usize = self.pending_reads.values().map(|b| b.len()).sum();
            warn!(
                "Read batch: draining {} pending linearizable reads due to role change",
                count
            );
            for (_, batch) in std::mem::take(&mut self.pending_reads) {
                for (_, sender) in batch {
                    let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
                }
            }
        }

        // Drain write buffer (propose_buffer)
        if !self.propose_buffer.is_empty() {
            warn!(
                "Write batch: draining {} pending write requests due to role change",
                self.propose_buffer.len()
            );

            if let Some(batch) = self.propose_buffer.flush() {
                for sender in batch.senders {
                    let _ = sender.send(Err(tonic::Status::failed_precondition("Not leader")));
                }
            }
        }

        Ok(())
    }

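    /// Route a buffered client command into the matching queue.
    ///
    /// Writes land in `propose_buffer`; reads are dispatched by their effective
    /// consistency policy (linearizable → batched buffer, lease/eventual →
    /// immediate queues). Every path checks the configured backpressure limits
    /// before enqueueing and rejects with `resource_exhausted` when full.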
    fn push_client_cmd(
        &mut self,
        cmd: ClientCmd,
        ctx: &RaftContext<Self::T>,
    ) {
        use crate::client_command_to_entry_payloads;

        let backpressure = &ctx.node_config.raft.backpressure;

        match cmd {
            ClientCmd::Propose(req, sender) => {
                let current_pending = self.propose_buffer.len();

                // Record buffer utilization metric (with sampling)
                if let Some(ref metrics) = self.backpressure_metrics {
                    if backpressure.max_pending_writes > 0 {
                        let utilization = (current_pending as f64
                            / backpressure.max_pending_writes as f64)
                            * 100.0;
                        metrics.record_buffer_utilization(utilization, true);
                    }
                }

                // Check write backpressure limit
                if backpressure.should_reject_write(current_pending) {
                    // Record rejection metric
                    if let Some(ref metrics) = self.backpressure_metrics {
                        metrics.record_rejection(true);
                    }

                    let _ = sender.send(Err(Status::resource_exhausted(
                        "Too many pending write requests",
                    )));
                    return;
                }

                // Convert command to payload (will be merged in drain_batch)
                if let Some(cmd) = req.command {
                    let payload = client_command_to_entry_payloads(vec![cmd])
                        .into_iter()
                        .next()
                        .expect("client_command_to_entry_payloads should return 1 element");

                    // Push lightweight tuple directly (zero-copy, no Vec allocation)
                    self.propose_buffer.push(payload, sender);
                } else {
                    // Empty command: reject
                    let _ = sender.send(Err(Status::invalid_argument("Command is empty")));
                }
            }
            ClientCmd::Read(req, sender) => {
                // Determine effective read consistency policy
                let effective_policy = self.determine_read_policy(&req);

                // Route to appropriate buffer based on policy
                match effective_policy {
                    ServerReadConsistencyPolicy::LinearizableRead => {
                        let current_pending = self.linearizable_read_buffer.len();

                        // Record buffer utilization metric (with sampling)
                        if let Some(ref metrics) = self.backpressure_metrics {
                            if backpressure.max_pending_reads > 0 {
                                let utilization = (current_pending as f64
                                    / backpressure.max_pending_reads as f64)
                                    * 100.0;
                                metrics.record_buffer_utilization(utilization, false);
                            }
                        }

                        // Check read backpressure limit
                        if backpressure.should_reject_read(current_pending) {
                            // Record rejection metric
                            if let Some(ref metrics) = self.backpressure_metrics {
                                metrics.record_rejection(false);
                            }

                            let _ = sender.send(Err(Status::resource_exhausted(
                                "Too many pending read requests",
                            )));
                            return;
                        }

                        self.linearizable_read_buffer.push((req, sender));
                    }
                    ServerReadConsistencyPolicy::LeaseRead => {
                        let current_pending = self.lease_read_queue.len();

                        // Record buffer utilization metric (with sampling)
                        if let Some(ref metrics) = self.backpressure_metrics {
                            if backpressure.max_pending_reads > 0 {
                                let utilization = (current_pending as f64
                                    / backpressure.max_pending_reads as f64)
                                    * 100.0;
                                metrics.record_buffer_utilization(utilization, false);
                            }
                        }

                        // Check read backpressure limit
                        if backpressure.should_reject_read(current_pending) {
                            // Record rejection metric
                            if let Some(ref metrics) = self.backpressure_metrics {
                                metrics.record_rejection(false);
                            }

                            let _ = sender.send(Err(Status::resource_exhausted(
                                "Too many pending read requests",
                            )));
                            return;
                        }

                        self.lease_read_queue.push_back((req, sender));
                    }
                    ServerReadConsistencyPolicy::EventualConsistency => {
                        let current_pending = self.eventual_read_queue.len();

                        // Record buffer utilization metric (with sampling)
                        if let Some(ref metrics) = self.backpressure_metrics {
                            if backpressure.max_pending_reads > 0 {
                                let utilization = (current_pending as f64
                                    / backpressure.max_pending_reads as f64)
                                    * 100.0;
                                metrics.record_buffer_utilization(utilization, false);
                            }
                        }

                        // Check read backpressure limit
                        if backpressure.should_reject_read(current_pending) {
                            // Record rejection metric
                            if let Some(ref metrics) = self.backpressure_metrics {
                                metrics.record_rejection(false);
                            }

                            let _ = sender.send(Err(Status::resource_exhausted(
                                "Too many pending read requests",
                            )));
                            return;
                        }

                        self.eventual_read_queue.push_back((req, sender));
                    }
                }
            }
        }
    }

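    /// Flush every buffered client command collected since the last drain.
    ///
    /// Fast path: a pure-write batch is proposed directly. Mixed or pure-read
    /// batches take the unified write+read path so a single AppendEntries round
    /// trip confirms leadership for both (2*RTT → 1*RTT). Lease and eventual
    /// reads are then served immediately without batching.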
    async fn flush_cmd_buffers(
        &mut self,
        ctx: &RaftContext<Self::T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        // Drain-based: unconditionally flush all buffered commands
        // No timeout/size checks - drain from channel already collected the batch

        let has_writes = !self.propose_buffer.is_empty();
        let has_reads = !self.linearizable_read_buffer.is_empty();

        // Fast path: pure write workload (most common case, ~95%)
        // Preserves original high-performance code path for write-heavy workloads
        if has_writes && !has_reads {
            if let Some(request) = self.propose_buffer.flush() {
                self.process_batch(std::iter::once(request), role_tx, ctx).await?;
            }
        }
        // Unified path: mixed write+read or pure read workloads
        // Merges RPC for 2*RTT → 1*RTT optimization in mixed scenarios
        else if has_writes || has_reads {
            self.unified_write_and_linear_read(ctx, role_tx).await?;
        }

        // Process lease reads (immediate, no batching)
        while let Some((req, sender)) = self.lease_read_queue.pop_front() {
            if let Err(e) = self.process_lease_read(req, sender, ctx, role_tx).await {
                error!("process_lease_read failed: {:?}", e);
            }
        }

        // Process eventual consistency reads (immediate, no batching)
        while let Some((req, sender)) = self.eventual_read_queue.pop_front() {
            self.process_eventual_read(req, sender, ctx);
        }

        Ok(())
    }

    async fn handle_raft_event(
        &mut self,
        raft_event: RaftEvent,
        ctx: &RaftContext<T>,
        role_tx: mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        let my_id = self.shared_state.node_id;
        let my_term = self.current_term();

        match raft_event {
            // Leader receives RequestVote(term=X, candidate=Y)
            // 1. If X > currentTerm:
            // - Leader → Follower, currentTerm = X
            // - Replay event
            // 2. Else:
            // - Reply with VoteGranted=false, currentTerm=currentTerm
            RaftEvent::ReceiveVoteRequest(vote_request, sender) => {
                debug!(
                    "handle_raft_event::RaftEvent::ReceiveVoteRequest: {:?}",
                    &vote_request
                );

                let my_term = self.current_term();
                if my_term < vote_request.term {
                    self.update_current_term(vote_request.term);
                    // Step down as Follower
                    self.send_become_follower_event(None, &role_tx)?;

                    info!("Leader will not process the vote request; the Follower role should handle it.");
                    send_replay_raft_event(
                        &role_tx,
                        RaftEvent::ReceiveVoteRequest(vote_request, sender),
                    )?;
                } else {
                    let last_log_id =
                        ctx.raft_log().last_log_id().unwrap_or(LogId { index: 0, term: 0 });
                    let response = VoteResponse {
                        term: my_term,
                        vote_granted: false,
                        last_log_index: last_log_id.index,
                        last_log_term: last_log_id.term,
                    };
                    sender.send(Ok(response)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                }
            }

            RaftEvent::ClusterConf(_metadata_request, sender) => {
                let cluster_conf = ctx
                    .membership()
                    .retrieve_cluster_membership_config(self.shared_state().current_leader())
                    .await;
                debug!("Leader receive ClusterConf: {:?}", &cluster_conf);
                if let Err(e) = sender.send(Ok(cluster_conf)) {
                    // Receiver timed out and dropped — this is normal, do not crash the node
                    error!(
                        "Failed to send ClusterConf response (receiver dropped): {:?}",
                        e
                    );
                }
            }

            RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender) => {
                let current_conf_version = ctx.membership().get_cluster_conf_version().await;
                debug!(%current_conf_version, ?cluster_conf_change_request,
                    "handle_raft_event::RaftEvent::ClusterConfUpdate",
                );

                // Reject the fake Leader's cluster conf update request
                if my_term >= cluster_conf_change_request.term {
                    let response = ClusterConfUpdateResponse::higher_term(
                        my_id,
                        my_term,
                        current_conf_version,
                    );

                    sender.send(Ok(response)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                } else {
                    // Step down as Follower: a new Leader has been found
                    info!(
                        "my({}) term < request term, stepping down to Follower",
                        my_id
                    );
                    // TODO: possible bug - should we also update the current term here?
                    self.send_become_follower_event(Some(cluster_conf_change_request.id), &role_tx)?;

                    info!(
                        "Leader will not process this request; the Follower role should handle it."
                    );
                    send_replay_raft_event(
                        &role_tx,
                        RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender),
                    )?;
                }
            }

            RaftEvent::AppendEntries(append_entries_request, sender) => {
                debug!(
                    "handle_raft_event::RaftEvent::AppendEntries: {:?}",
                    &append_entries_request
                );

                // Reject the fake Leader's append entries request
                if my_term >= append_entries_request.term {
                    let response = AppendEntriesResponse::higher_term(my_id, my_term);

                    sender.send(Ok(response)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                } else {
                    // Step down as Follower: a new Leader has been found
                    info!(
                        "my({}) term < request term, stepping down to Follower",
                        my_id
                    );
                    // TODO: possible bug - should we also update the current term here?
                    self.send_become_follower_event(
                        Some(append_entries_request.leader_id),
                        &role_tx,
                    )?;

                    info!(
                        "Leader will not process append_entries_request; the Follower role should handle it."
                    );
                    send_replay_raft_event(
                        &role_tx,
                        RaftEvent::AppendEntries(append_entries_request, sender),
                    )?;
                }
            }

            RaftEvent::InstallSnapshotChunk(_streaming, sender) => {
                sender
                    .send(Err(Status::permission_denied("Not Follower or Learner.")))
                    .map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;

                return Err(ConsensusError::RoleViolation {
                    current_role: "Leader",
                    required_role: "Follower or Learner",
                    context: format!(
                        "Leader node {} receives RaftEvent::InstallSnapshotChunk",
                        ctx.node_id
                    ),
                }
                .into());
            }

            RaftEvent::CreateSnapshotEvent => {
                // Prevent duplicate snapshot creation
                if self.snapshot_in_progress.load(std::sync::atomic::Ordering::Acquire) {
                    info!("Snapshot creation already in progress. Skipping duplicate request.");
                    return Ok(());
                }

                self.snapshot_in_progress.store(true, std::sync::atomic::Ordering::Release);
                let state_machine_handler = ctx.state_machine_handler().clone();

                // Use spawn to perform snapshot creation in the background
                tokio::spawn(async move {
                    let result = state_machine_handler.create_snapshot().await;
                    info!("SnapshotCreated event will be processed in another event thread");
                    if let Err(e) =
                        send_replay_raft_event(&role_tx, RaftEvent::SnapshotCreated(result))
                    {
                        error!("Failed to send snapshot creation result: {}", e);
                    }
                });
            }

            RaftEvent::SnapshotCreated(result) => {
                self.snapshot_in_progress.store(false, Ordering::SeqCst);

                match result {
                    Err(e) => {
                        error!(%e, "State machine snapshot creation failed");
                    }
                    Ok((
                        SnapshotMetadata {
                            last_included: last_included_option,
                            checksum: _,
                        },
                        _final_path,
                    )) => {
                        info!("Initiating log purge after snapshot creation");

                        if let Some(last_included) = last_included_option {
                            // ----------------------
                            // Phase 1: Schedule log purge if possible
                            // ----------------------
                            trace!("Phase 1: Schedule log purge if possible");
                            if self.can_purge_logs(self.last_purged_index, last_included) {
                                trace!(?last_included, "Phase 1: Scheduling log purge");
                                self.scheduled_purge_upto(last_included);
                            }

                            // ----------------------
                            // Phase 2: Execute local purge
                            // ----------------------
                            // Per Raft §7: Leader purges independently without peer coordination
                            trace!("Phase 2: Execute scheduled purge task");
                            debug!(?last_included, "Execute scheduled purge task");
                            if let Some(scheduled) = self.scheduled_purge_upto {
                                let purge_executor = ctx.purge_executor();
                                match purge_executor.execute_purge(scheduled).await {
                                    Ok(_) => {
                                        if let Err(e) = send_replay_raft_event(
                                            &role_tx,
                                            RaftEvent::LogPurgeCompleted(scheduled),
                                        ) {
                                            error!(%e, "Failed to notify purge completion");
                                        }
                                    }
                                    Err(e) => {
                                        error!(?e, ?scheduled, "Log purge execution failed");
                                    }
                                }
                            }
                        }
                    }
                }
            }

            RaftEvent::LogPurgeCompleted(purged_id) => {
                // Ensure we don't regress the purge index
                if self.last_purged_index.map_or(true, |current| purged_id.index > current.index) {
                    debug!(
                        ?purged_id,
                        "Updating last purged index after successful execution"
                    );
                    self.last_purged_index = Some(purged_id);
                } else {
                    warn!(
                        ?purged_id,
                        ?self.last_purged_index,
                        "Received outdated purge completion, ignoring"
                    );
                }
            }

            RaftEvent::JoinCluster(join_request, sender) => {
                debug!(?join_request, "Leader::RaftEvent::JoinCluster");
                self.handle_join_cluster(join_request, sender, ctx, &role_tx).await?;
            }

            RaftEvent::DiscoverLeader(request, sender) => {
                debug!(?request, "Leader::RaftEvent::DiscoverLeader");

                if let Some(meta) = ctx.membership().retrieve_node_meta(my_id).await {
                    let response = LeaderDiscoveryResponse {
                        leader_id: my_id,
                        leader_address: meta.address,
                        term: my_term,
                    };
                    sender.send(Ok(response)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                    return Ok(());
                } else {
                    let msg = "Leader cannot find its own address; this must be a bug.";
                    error!("{}", msg);
                    panic!("{}", msg);
                }
            }
            RaftEvent::StreamSnapshot(request, sender) => {
                debug!("Leader::RaftEvent::StreamSnapshot");

                // Get the latest snapshot metadata
                if let Some(metadata) = ctx.state_machine().snapshot_metadata() {
                    // Create response channel
                    let (response_tx, response_rx) =
                        mpsc::channel::<std::result::Result<Arc<SnapshotChunk>, Status>>(32);
                    // Convert to properly encoded tonic stream
                    let size = 1024 * 1024 * 1024; // 1GB max message size
                    let response_stream = create_production_snapshot_stream(response_rx, size);
                    // Immediately respond with the stream
                    sender.send(Ok(response_stream)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Stream response failed: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;

                    // Spawn background transfer task
                    let state_machine_handler = ctx.state_machine_handler().clone();
                    let config = ctx.node_config.raft.snapshot.clone();
                    // Load snapshot data stream
                    let data_stream =
                        state_machine_handler.load_snapshot_data(metadata.clone()).await?;

                    tokio::spawn(async move {
                        if let Err(e) = BackgroundSnapshotTransfer::<T>::run_pull_transfer(
                            request,
                            response_tx,
                            data_stream,
                            config,
                        )
                        .await
                        {
                            error!("StreamSnapshot failed: {:?}", e);
                        }
                    });
                } else {
                    warn!("No snapshot available for streaming");
                    sender.send(Err(Status::not_found("Snapshot not found"))).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Stream response failed: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                }
            }
            RaftEvent::TriggerSnapshotPush { peer_id } => {
                if let Some(latest_snapshot_metadata) = ctx.state_machine().snapshot_metadata() {
                    Self::trigger_background_snapshot(
                        peer_id,
                        latest_snapshot_metadata,
                        ctx.state_machine_handler().clone(),
                        ctx.membership(),
                        ctx.node_config.raft.snapshot.clone(),
                    )
                    .await?;
                }
            }
1338
1339            RaftEvent::PromoteReadyLearners => {
1340                // SAFETY: Called from main event loop, no reentrancy issues
1341                info!(
1342                    "[Leader {}] ⚡ PromoteReadyLearners event received, pending_promotions: {:?}",
1343                    self.node_id(),
1344                    self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
1345                );
1346                self.process_pending_promotions(ctx, &role_tx).await?;
1347            }
1348
1349            RaftEvent::MembershipApplied => {
1350                // Save old membership for comparison
1351                let old_replication_targets = self.cluster_metadata.replication_targets.clone();
1352
1353                // Refresh cluster metadata cache after membership change is applied
1354                debug!("Refreshing cluster metadata cache after membership change");
1355                if let Err(e) = self.update_cluster_metadata(&ctx.membership()).await {
1356                    warn!("Failed to update cluster metadata: {:?}", e);
1357                }
1358
1359                // CRITICAL FIX #218: Initialize replication state for newly added peers
1360                // Per Raft protocol: When new members join, Leader must initialize their next_index
1361                let newly_added: Vec<u32> = self
1362                    .cluster_metadata
1363                    .replication_targets
1364                    .iter()
1365                    .filter(|new_peer| {
1366                        !old_replication_targets.iter().any(|old_peer| old_peer.id == new_peer.id)
1367                    })
1368                    .map(|peer| peer.id)
1369                    .collect();
1370
1371                if !newly_added.is_empty() {
1372                    debug!(
1373                        "Initializing replication state for {} new peer(s): {:?}",
1374                        newly_added.len(),
1375                        newly_added
1376                    );
1377                    let last_entry_id = ctx.raft_log().last_entry_id();
1378                    if let Err(e) =
1379                        self.init_peers_next_index_and_match_index(last_entry_id, newly_added)
1380                    {
1381                        warn!("Failed to initialize next_index for new peers: {:?}", e);
1382                        // Non-fatal: next_index will use default value of 1,
1383                        // replication will still work but may be less efficient
1384                    }
1385                }
1386            }
1387
1388            RaftEvent::ApplyCompleted {
1389                last_index,
1390                results,
1391            } => {
1392                let num_results = results.len();
1393
1394                // Match apply results to pending client requests and send responses
1395                let responses: Vec<_> = results
1396                    .into_iter()
1397                    .filter_map(|r| {
1398                        self.pending_requests.remove(&r.index).map(|sender| (r, sender))
1399                    })
1400                    .collect();
1401
1402                for (result, sender) in responses {
1403                    let response = if result.succeeded {
1404                        ClientResponse::write_success()
1405                    } else {
1406                        ClientResponse::cas_failure()
1407                    };
1408
1409                    trace!(
1410                        "[Leader-{}] Sending response to client for index {}: succeeded={}",
1411                        self.node_id(),
1412                        result.index,
1413                        result.succeeded
1414                    );
1415
1416                    if sender.send(Ok(response)).is_err() {
1417                        trace!(
1418                            "[Leader-{}] Client receiver dropped for index {}",
1419                            self.node_id(),
1420                            result.index
1421                        );
1422                    }
1423                }
1424
1425                // Serve pending linearizable reads whose read_index <= last_index.
1426                let reads_to_serve: Vec<_> =
1427                    self.pending_reads.range(..=last_index).map(|(k, _)| *k).collect();
1428
1429                for read_index in reads_to_serve {
1430                    if let Some(batch) = self.pending_reads.remove(&read_index) {
1431                        self.execute_pending_reads(batch, ctx);
1432                    }
1433                }
1434
1435                // Check snapshot after SM apply — last_applied is now accurate.
1436                check_and_trigger_snapshot(
1437                    last_index,
1438                    Leader as i32,
1439                    self.current_term(),
1440                    ctx,
1441                    &role_tx,
1442                )?;
1443
1444                trace!(
1445                    "[Leader-{}] TIMING: process_apply_completed({} results)",
1446                    self.node_id(),
1447                    num_results,
1448                );
1449            }
1450
1451            RaftEvent::FatalError { source, error } => {
1452                error!("[Leader] Fatal error from {}: {}", source, error);
1453                let fatal_status = || tonic::Status::internal(format!("Node fatal error: {error}"));
1454                // Notify all pending write requests
1455                let pending: Vec<_> = self.pending_requests.drain().collect();
1456                if !pending.is_empty() {
1457                    warn!(
1458                        "[Leader] FatalError: notifying {} pending write requests",
1459                        pending.len()
1460                    );
1461                    for (_index, sender) in pending {
1462                        let _ = sender.send(Err(fatal_status()));
1463                    }
1464                }
1465                // Notify buffered linearizable reads (pre-flush)
1466                let lin_buf = self.linearizable_read_buffer.take_all();
1467                if !lin_buf.is_empty() {
1468                    warn!(
1469                        "[Leader] FatalError: notifying {} buffered linearizable reads",
1470                        lin_buf.len()
1471                    );
1472                    for (_, sender) in lin_buf {
1473                        let _ = sender.send(Err(fatal_status()));
1474                    }
1475                }
1476                // Notify pending linearizable reads awaiting ApplyCompleted
1477                if !self.pending_reads.is_empty() {
1478                    let count: usize = self.pending_reads.values().map(|b| b.len()).sum();
1479                    warn!(
1480                        "[Leader] FatalError: notifying {} pending linearizable reads",
1481                        count
1482                    );
1483                    for (_, batch) in std::mem::take(&mut self.pending_reads) {
1484                        for (_, sender) in batch {
1485                            let _ = sender.send(Err(fatal_status()));
1486                        }
1487                    }
1488                }
1489                // Notify queued lease reads
1490                if !self.lease_read_queue.is_empty() {
1491                    warn!(
1492                        "[Leader] FatalError: notifying {} queued lease reads",
1493                        self.lease_read_queue.len()
1494                    );
1495                    for (_, sender) in self.lease_read_queue.drain(..) {
1496                        let _ = sender.send(Err(fatal_status()));
1497                    }
1498                }
1499                // Notify queued eventual reads
1500                if !self.eventual_read_queue.is_empty() {
1501                    warn!(
1502                        "[Leader] FatalError: notifying {} queued eventual reads",
1503                        self.eventual_read_queue.len()
1504                    );
1505                    for (_, sender) in self.eventual_read_queue.drain(..) {
1506                        let _ = sender.send(Err(fatal_status()));
1507                    }
1508                }
1509                return Err(crate::Error::Fatal(format!(
1510                    "Fatal error from {source}: {error}"
1511                )));
1512            }
1513
1514            RaftEvent::StepDownSelfRemoved => {
1515                // Only Leader can propose configuration changes and remove itself
1516                // Per Raft protocol: Leader steps down immediately after self-removal
1517                warn!(
1518                    "[Leader-{}] Removed from cluster membership, stepping down to Follower",
1519                    self.node_id()
1520                );
1521                role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
1522                    error!(
1523                        "[Leader-{}] Failed to send BecomeFollower after self-removal: {:?}",
1524                        self.node_id(),
1525                        e
1526                    );
1527                    NetworkError::SingalSendFailed(format!(
1528                        "BecomeFollower after self-removal: {e:?}"
1529                    ))
1530                })?;
1531                return Ok(());
1532            }
1533        }
1534
1535        Ok(())
1536    }
1537}
1538
1539impl<T: TypeConfig> LeaderState<T> {
1540    /// Rebuild the cached cluster metadata (voter count, replication targets).
1541    /// Called once after becoming leader and again whenever a membership change is applied.
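    ///
    /// A minimal sketch of the derived fields, using illustrative counts
    /// (`remote_voters` stands in for `membership.voters().await.len()`):
    /// ```
    /// let remote_voters = 0usize;            // voters() excludes the leader itself
    /// let total_voters = remote_voters + 1;  // +1 for self
    /// let single_voter = total_voters == 1;  // quorum of one: commit without peers
    /// assert!(single_voter);
    /// ```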
1542    pub async fn update_cluster_metadata(
1543        &mut self,
1544        membership: &Arc<T::M>,
1545    ) -> Result<()> {
1546        // Calculate total voter count including self as leader
1547        let voters = membership.voters().await;
1548        let total_voters = voters.len() + 1; // +1 for leader (self)
1549
1550        // Get all replication targets (voters + learners, excluding self)
1551        let replication_targets = membership.replication_peers().await;
1552
1553        // Single-voter cluster: only this node is a voter (quorum = 1)
1554        let single_voter = total_voters == 1;
1555
1556        self.cluster_metadata = ClusterMetadata {
1557            single_voter,
1558            total_voters,
1559            replication_targets: replication_targets.clone(),
1560        };
1561
1562        debug!(
1563            "Updated cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
1564            single_voter,
1565            total_voters,
1566            replication_targets.len()
1567        );
1568        Ok(())
1569    }
1570
1571    /// Returns a snapshot of the node's current consensus state.
1572    pub fn state_snapshot(&self) -> StateSnapshot {
1573        StateSnapshot {
1574            current_term: self.current_term(),
1575            voted_for: None,
1576            commit_index: self.commit_index(),
1577            role: Leader as i32,
1578        }
1579    }
1580
1581    /// Returns a snapshot of the Leader-specific replication state.
1582    #[tracing::instrument]
1583    pub fn leader_state_snapshot(&self) -> LeaderStateSnapshot {
1584        LeaderStateSnapshot {
1585            next_index: self.next_index.clone(),
1586            match_index: self.match_index.clone(),
1587            noop_log_id: self.noop_log_id,
1588        }
1589    }
1590
1592    /// Execute a Raft request immediately, bypassing the normal batching mechanism.
1593    ///
1594    /// This is used for quorum verification requests that must be sent immediately
1595    /// without waiting for batching (e.g., config changes, leadership verification).
1596    pub async fn execute_request_immediately(
1597        &mut self,
1598        raft_request_with_signal: RaftRequestWithSignal,
1599        ctx: &RaftContext<T>,
1600        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1601    ) -> Result<()> {
1602        debug!(
1603            "Leader::execute_request_immediately, request_id: {}",
1604            raft_request_with_signal.id
1605        );
1606
1607        // Always bypass buffer for immediate execution (quorum verification)
1608        let batch = VecDeque::from([raft_request_with_signal]);
1609        self.process_batch(batch, role_tx, ctx).await?;
1610
1611        Ok(())
1612    }
1613
1614    /// Scenario handling summary:
1615    ///
1616    /// 1. Quorum achieved:
1617    ///    - Client response: `write_success()`
1618    ///    - State update: update peer indexes and commit index
1619    ///    - Return: `Ok(())`
1620    ///
1621    /// 2. Quorum NOT achieved (verifiable):
1622    ///    - Client response: `RetryRequired`
1623    ///    - State update: update peer indexes
1624    ///    - Return: `Ok(())`
1625    ///
1626    /// 3. Quorum NOT achieved (non-verifiable):
1627    ///    - Client response: `ProposeFailed`
1628    ///    - State update: update peer indexes
1629    ///    - Return: `Ok(())`
1630    ///
1631    /// 4. Partial timeouts:
1632    ///    - Client response: `ProposeFailed`
1633    ///    - State update: update only the peer indexes that responded
1634    ///    - Return: `Ok(())`
1635    ///
1636    /// 5. All timeouts:
1637    ///    - Client response: `ProposeFailed`
1638    ///    - State update: no update
1639    ///    - Return: `Ok(())`
1640    ///
1641    /// 6. Higher term detected:
1642    ///    - Client response: `TermOutdated`
1643    ///    - State update: update term and convert to follower
1644    ///    - Return: `Err(HigherTerm)`
1645    ///
1646    /// 7. Critical failure (e.g., system or logic error):
1647    ///    - Client response: `ProposeFailed`
1648    ///    - State update: none
1649    ///    - Return: original error
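    ///
    /// Scenarios 2 and 3 differ only in whether the outcome is verifiable. A
    /// simplified sketch of that distinction (the real check is `is_majority`;
    /// values here are illustrative):
    /// ```
    /// fn verifiable(responses_received: usize, cluster_size: usize) -> bool {
    ///     // Strictly more than half of the cluster responded.
    ///     responses_received > cluster_size / 2
    /// }
    /// assert!(verifiable(2, 3));  // scenario 2: RetryRequired
    /// assert!(!verifiable(1, 3)); // scenario 3: ProposeFailed
    /// ```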
1650    pub async fn process_batch(
1651        &mut self,
1652        batch: impl IntoIterator<Item = RaftRequestWithSignal>,
1653        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1654        ctx: &RaftContext<T>,
1655    ) -> Result<()> {
1656        self.timer.reset_replication();
1657
1658        let start_index = ctx.raft_log().last_entry_id() + 1;
1659        let (payloads, write_metadata) = Self::merge_batch_to_write_metadata(batch, start_index);
1660
1661        if !payloads.is_empty() {
1662            trace!(?payloads, "[Node-{}] process_batch", ctx.node_id);
1663        }
1664
1665        self.execute_and_process_raft_rpc(payloads, write_metadata, None, ctx, role_tx)
1666            .await
1667    }
1668
1669    /// Heartbeat dispatch: piggyback a write batch if available, otherwise send
1670    /// an empty AppendEntries to maintain leadership. Always resets the replication
1671    /// timer — the intent is explicit here rather than hidden inside `process_batch`.
1672    async fn send_heartbeat_or_batch(
1673        &mut self,
1674        request: Option<RaftRequestWithSignal>,
1675        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1676        ctx: &RaftContext<T>,
1677    ) -> Result<()> {
1678        self.timer.reset_replication();
1679        match request {
1680            Some(req) => {
1681                // Has pending writes — process as a real batch.
1682                let start_index = ctx.raft_log().last_entry_id() + 1;
1683                let (payloads, write_metadata) =
1684                    Self::merge_batch_to_write_metadata(std::iter::once(req), start_index);
1685                self.execute_and_process_raft_rpc(payloads, write_metadata, None, ctx, role_tx)
1686                    .await
1687            }
1688            None => {
1689                // No pending writes — send empty AppendEntries as heartbeat.
1690                self.execute_and_process_raft_rpc(vec![], None, None, ctx, role_tx).await
1691            }
1692        }
1693    }
1694
1695    /// Update per-peer replication indexes (next_index and match_index).
1696    #[instrument(skip(self))]
1697    fn update_peer_indexes(
1698        &mut self,
1699        peer_updates: &HashMap<u32, PeerUpdate>,
1700    ) {
1701        for (peer_id, update) in peer_updates {
1702            if let Err(e) = self.update_next_index(*peer_id, update.next_index) {
1703                error!("Failed to update next index: {:?}", e);
1704            }
1705            trace!(
1706                "Updated next_index for peer {} to {}",
1707                peer_id, update.next_index
1708            );
1709            if let Some(match_index) = update.match_index {
1710                if let Err(e) = self.update_match_index(*peer_id, match_index) {
1711                    error!("Failed to update match index: {:?}", e);
1712                }
1713                trace!("Updated match_index for peer {} to {}", peer_id, match_index);
1714            }
1715        }
1716    }
1717
1718    pub async fn check_learner_progress(
1719        &mut self,
1720        learner_progress: &HashMap<u32, Option<u64>>,
1721        ctx: &RaftContext<T>,
1722        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1723    ) -> Result<()> {
1724        debug!(?learner_progress, "check_learner_progress");
1725
1726        if !self.should_check_learner_progress(ctx) {
1727            return Ok(());
1728        }
1729
1730        if learner_progress.is_empty() {
1731            return Ok(());
1732        }
1733
1734        let ready_learners = self.find_promotable_learners(learner_progress, ctx).await;
1735        let new_promotions = self.deduplicate_promotions(ready_learners);
1736
1737        if !new_promotions.is_empty() {
1738            self.enqueue_and_notify_promotions(new_promotions, role_tx)?;
1739        }
1740
1741        Ok(())
1742    }
1743
1744    /// Check if enough time has elapsed since last learner progress check
1745    fn should_check_learner_progress(
1746        &mut self,
1747        ctx: &RaftContext<T>,
1748    ) -> bool {
1749        let throttle_interval =
1750            Duration::from_millis(ctx.node_config().raft.learner_check_throttle_ms);
1751        if self.last_learner_check.elapsed() < throttle_interval {
1752            return false;
1753        }
1754        self.last_learner_check = Instant::now();
1755        true
1756    }
1757
1758    /// Find learners that are caught up and eligible for promotion
1759    async fn find_promotable_learners(
1760        &self,
1761        learner_progress: &HashMap<u32, Option<u64>>,
1762        ctx: &RaftContext<T>,
1763    ) -> Vec<u32> {
1764        let leader_commit = self.commit_index();
1765        let threshold = ctx.node_config().raft.learner_catchup_threshold;
1766        let membership = ctx.membership();
1767
1768        let mut ready_learners = Vec::new();
1769
1770        for (&node_id, &match_index_opt) in learner_progress.iter() {
1771            if !membership.contains_node(node_id).await {
1772                continue;
1773            }
1774
1775            if !self.is_learner_caught_up(match_index_opt, leader_commit, threshold) {
1776                continue;
1777            }
1778
1779            let node_status =
1780                membership.get_node_status(node_id).await.unwrap_or(NodeStatus::ReadOnly);
1781            if !node_status.is_promotable() {
1782                debug!(
1783                    ?node_id,
1784                    ?node_status,
1785                    "Learner caught up but status is not Promotable, skipping"
1786                );
1787                continue;
1788            }
1789
1790            debug!(
1791                ?node_id,
1792                match_index = ?match_index_opt.unwrap_or(0),
1793                ?leader_commit,
1794                gap = leader_commit.saturating_sub(match_index_opt.unwrap_or(0)),
1795                "Learner caught up"
1796            );
1797            ready_learners.push(node_id);
1798        }
1799
1800        ready_learners
1801    }
1802
1803    /// Check if learner has caught up with leader based on log gap
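    ///
    /// A runnable sketch of the gap rule with illustrative numbers:
    /// ```
    /// let (leader_commit, match_index, threshold) = (100u64, 95u64, 10u64);
    /// let gap = leader_commit.saturating_sub(match_index);
    /// assert!(gap <= threshold); // within threshold: counts as caught up
    /// ```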
1804    fn is_learner_caught_up(
1805        &self,
1806        match_index: Option<u64>,
1807        leader_commit: u64,
1808        threshold: u64,
1809    ) -> bool {
1810        let match_index = match_index.unwrap_or(0);
1811        let gap = leader_commit.saturating_sub(match_index);
1812        gap <= threshold
1813    }
1814
1815    /// Remove learners already in pending promotions queue
1816    fn deduplicate_promotions(
1817        &self,
1818        ready_learners: Vec<u32>,
1819    ) -> Vec<u32> {
1820        let already_pending: std::collections::HashSet<_> =
1821            self.pending_promotions.iter().map(|p| p.node_id).collect();
1822
1823        ready_learners.into_iter().filter(|id| !already_pending.contains(id)).collect()
1824    }
1825
1826    /// Add promotions to queue and send notification event
1827    fn enqueue_and_notify_promotions(
1828        &mut self,
1829        new_promotions: Vec<u32>,
1830        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1831    ) -> Result<()> {
1832        info!(
1833            ?new_promotions,
1834            "Learners caught up, adding to pending promotions"
1835        );
1836
1837        for node_id in new_promotions {
1838            self.pending_promotions
1839                .push_back(PendingPromotion::new(node_id, Instant::now()));
1840        }
1841
1842        role_tx
1843            .send(RoleEvent::ReprocessEvent(Box::new(
1844                RaftEvent::PromoteReadyLearners,
1845            )))
1846            .map_err(|e| {
1847                let error_str = format!("{e:?}");
1848                error!("Failed to send PromoteReadyLearners: {}", error_str);
1849                Error::System(SystemError::Network(NetworkError::SingalSendFailed(
1850                    error_str,
1851                )))
1852            })?;
1853
1854        Ok(())
1855    }
1856
1857    #[allow(dead_code)]
1858    pub async fn batch_promote_learners(
1859        &mut self,
1860        ready_learners_ids: Vec<u32>,
1861        ctx: &RaftContext<T>,
1862        role_tx: &mpsc::UnboundedSender<RoleEvent>,
1863    ) -> Result<()> {
1864        // 1. Determine optimal promotion status based on quorum safety
1865        debug!("1. Determine optimal promotion status based on quorum safety");
1866        let membership = ctx.membership();
1867        let current_voters = membership.voters().await.len();
1869        let new_active_count = current_voters + ready_learners_ids.len();
1870
1871        // Determine target status based on quorum safety
1872        trace!(
1873            ?current_voters,
1874            ?ready_learners_ids,
1875            "[Node-{}] new_active_count: {}",
1876            self.node_id(),
1877            new_active_count
1878        );
1879        let target_status = if ensure_safe_join(self.node_id(), new_active_count).is_ok() {
1880            trace!(
1881                "Going to update nodes-{:?} status to Active",
1882                ready_learners_ids
1883            );
1884            NodeStatus::Active
1885        } else {
1886            trace!(
1887                "Not enough quorum to promote learners: {:?}",
1888                ready_learners_ids
1889            );
1890            return Ok(());
1891        };
1892
1893        // 2. Create configuration change payload
1894        debug!("2. Create configuration change payload");
1895        let config_change = Change::BatchPromote(BatchPromote {
1896            node_ids: ready_learners_ids.clone(),
1897            new_status: target_status as i32,
1898        });
1899
1900        info!(?config_change, "Replicating cluster config");
1901
1902        // 3. Submit single config change for all ready learners
1903        debug!("3. Submit single config change for all ready learners");
1904        // Use verify_leadership_persistent for config changes (must eventually succeed)
1905        match self
1906            .verify_leadership_persistent(vec![EntryPayload::config(config_change)], ctx, role_tx)
1907            .await
1908        {
1909            Ok(true) => {
1910                info!(
1911                    "Batch promotion committed for nodes: {:?}",
1912                    ready_learners_ids
1913                );
1914            }
1915            Ok(false) => {
1916                warn!("Failed to commit batch promotion");
1917            }
1918            Err(e) => {
1919                error!("Batch promotion error: {:?}", e);
1920                return Err(e);
1921            }
1922        }
1923
1924        Ok(())
1925    }
1926
1927    /// Calculate the new commit index from peers' match indexes.
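    ///
    /// A sketch of the standard Raft majority rule this delegates to (the real
    /// policy lives in `calculate_majority_matched_index`, which also verifies
    /// the entry's term; values here are illustrative):
    /// ```
    /// // The highest index replicated on a majority is the median of the
    /// // sorted match indexes, counting the leader's own log as up to date.
    /// let mut matched: Vec<u64> = vec![5, 7, 7]; // peers' match_index
    /// matched.push(9);                           // leader's last index
    /// matched.sort_unstable();
    /// assert_eq!(matched[(matched.len() - 1) / 2], 7);
    /// ```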
1928    #[instrument(skip(self))]
1929    fn calculate_new_commit_index(
1930        &mut self,
1931        raft_log: &Arc<ROF<T>>,
1932        peer_updates: &HashMap<u32, PeerUpdate>,
1933    ) -> Option<u64> {
1934        let old_commit_index = self.commit_index();
1935        let current_term = self.current_term();
1936
1937        let matched_ids: Vec<u64> =
1938            peer_updates.keys().filter_map(|&id| self.match_index(id)).collect();
1939
1940        let new_commit_index =
1941            raft_log.calculate_majority_matched_index(current_term, old_commit_index, matched_ids);
1942
1943        new_commit_index.filter(|&index| index > old_commit_index)
1948    }
1949
1950    #[allow(dead_code)]
1951    fn if_update_commit_index(
1952        &self,
1953        new_commit_index_option: Option<u64>,
1954    ) -> (bool, u64) {
1955        let current_commit_index = self.commit_index();
1956        if let Some(new_commit_index) = new_commit_index_option {
1957            debug!("Leader::update_commit_index: {:?}", new_commit_index);
1958            if current_commit_index < new_commit_index {
1959                return (true, new_commit_index);
1960            }
1961        }
1962        debug!("Leader::update_commit_index: false");
1963        (false, current_commit_index)
1964    }
1965
1966    /// Calculate safe read index for linearizable reads.
1967    ///
1968    /// Returns max(commitIndex, noopIndex) to ensure:
1969    /// 1. All committed data is visible
1970    /// 2. Current term's no-op entry is accounted for
1971    /// 3. Read index is fixed at request arrival time (avoids waiting for concurrent writes)
1972    ///
1973    /// This optimization prevents reads from waiting for writes that arrived
1974    /// after the read request started processing.
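    ///
    /// A minimal sketch of the rule with illustrative indices:
    /// ```
    /// let commit_index: u64 = 41;
    /// let noop_index: u64 = 42; // no-op appended at the start of this term
    /// assert_eq!(std::cmp::max(commit_index, noop_index), 42);
    /// ```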
1975    #[doc(hidden)]
1976    pub fn calculate_read_index(&self) -> u64 {
1977        let commit_index = self.commit_index();
1978        let noop_index = self.noop_log_id.unwrap_or(0);
1979        std::cmp::max(commit_index, noop_index)
1980    }
1981
1982    /// Wait for state machine to apply up to target index.
1983    ///
1984    /// This is used by linearizable reads to ensure they see all committed data
1985    /// up to the read_index calculated at request arrival time.
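    ///
    /// Only reads whose target is ahead of the state machine actually wait
    /// (a sketch with illustrative indices):
    /// ```
    /// let (last_applied, target_index) = (10u64, 12u64);
    /// assert!(last_applied < target_index); // behind: park until applied
    /// let (last_applied, target_index) = (12u64, 12u64);
    /// assert!(last_applied >= target_index); // caught up: return immediately
    /// ```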
1986    #[doc(hidden)]
1987    pub async fn wait_until_applied(
1988        &self,
1989        target_index: u64,
1990        state_machine_handler: &Arc<SMHOF<T>>,
1991        last_applied: u64,
1992    ) -> Result<()> {
1993        if last_applied < target_index {
1994            state_machine_handler.update_pending(target_index);
1995
1996            let timeout_ms = self.node_config.raft.read_consistency.state_machine_sync_timeout_ms;
1997            state_machine_handler
1998                .wait_applied(target_index, std::time::Duration::from_millis(timeout_ms))
1999                .await?;
2000
2001            debug!("wait_until_applied: target_index={} success", target_index);
2002        }
2003        Ok(())
2004    }
2005
2006    #[instrument(skip(self))]
2007    fn scheduled_purge_upto(
2008        &mut self,
2009        received_last_included: LogId,
2010    ) {
2011        if let Some(existing) = self.scheduled_purge_upto {
2012            if existing.index >= received_last_included.index {
2013                warn!(
2014                    ?received_last_included,
2015                    ?existing,
2016                    "Ignoring scheduled_purge_upto update: last_included does not advance the existing purge point"
2017                );
2018                return;
2019            }
2020        }
2021        info!(?self.scheduled_purge_upto, ?received_last_included, "Updating scheduled_purge_upto.");
2022        self.scheduled_purge_upto = Some(received_last_included);
2023    }
2024
2025    fn send_become_follower_event(
2026        &self,
2027        new_leader_id: Option<u32>,
2028        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2029    ) -> Result<()> {
2030        info!(
2031            ?new_leader_id,
2032            "Leader is going to step down as Follower..."
2033        );
2034        role_tx.send(RoleEvent::BecomeFollower(new_leader_id)).map_err(|e| {
2035            let error_str = format!("{e:?}");
2036            error!("Failed to send: {}", error_str);
2037            NetworkError::SingalSendFailed(error_str)
2038        })?;
2039
2040        Ok(())
2041    }
2042
2043    /// Determines if logs prior to `last_included_in_snapshot` can be permanently discarded.
2044    ///
2045    /// Implements Leader-side log compaction safety checks per Raft paper §7.2:
2046    /// > "The leader uses a new RPC called InstallSnapshot to send snapshots to followers that are
2047    /// > too far behind"
2048    ///
2049    /// # Safety Invariants (ALL must hold)
2050    ///
2051    /// 1. **Committed Entry Guarantee**   `last_included_in_snapshot.index < self.commit_index`
2052    ///    - Ensures we never discard uncommitted entries (Raft §5.4.2)
2053    ///    - Maintains at least one committed entry after purge for log matching property
2054    ///
2055    /// 2. **Monotonic Snapshot Advancement**   `last_purge_index < last_included_in_snapshot.index`
2056    ///    - Enforces snapshot indices strictly increase (prevents rollback attacks)
2057    ///    - Maintains sequential purge ordering (FSM safety requirement)
2058    ///
2059    /// 3. **Operation Atomicity**   `pending_purge.is_none()`
2060    ///    - Ensures only one concurrent purge operation
2061    ///    - Critical for linearizable state machine semantics
2062    ///
2063    /// # Implementation Notes
2064    /// - Per Raft §7: "Each server compacts its log independently"
2065    /// - Leader purges immediately after snapshot without waiting for followers
2066    /// - Lagging followers recover via InstallSnapshot RPC
2067    /// - Actual log discard should be deferred until storage confirms snapshot persistence
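    ///
    /// # Example
    /// A minimal sketch of the purge predicate over plain `u64` indices (a
    /// stand-in for `LogId.index`; values are illustrative):
    /// ```
    /// let commit_index: u64 = 50;
    /// let last_purge_index: Option<u64> = Some(30);
    /// let last_included: u64 = 40;
    /// let monotonic = last_purge_index.map(|i| i < last_included).unwrap_or(true);
    /// assert!(last_included < commit_index && monotonic); // safe to purge
    /// ```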
2068    #[instrument(skip(self))]
2069    pub fn can_purge_logs(
2070        &self,
2071        last_purge_index: Option<LogId>,
2072        last_included_in_snapshot: LogId,
2073    ) -> bool {
2074        let commit_index = self.commit_index();
2075        debug!(
2076            ?commit_index,
2077            ?last_purge_index,
2078            ?last_included_in_snapshot,
2079            "can_purge_logs"
2080        );
2081
2082        let monotonic_check = last_purge_index
2083            .map(|lid| lid.index < last_included_in_snapshot.index)
2084            .unwrap_or(true);
2085
2086        // Per Raft §7: Leader purges independently after snapshot
2087        // No peer coordination required - lagging followers get InstallSnapshot
2088        last_included_in_snapshot.index < commit_index && monotonic_check
2089    }
2090
2091    pub async fn handle_join_cluster(
2092        &mut self,
2093        join_request: JoinRequest,
2094        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2095        ctx: &RaftContext<T>,
2096        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2097    ) -> Result<()> {
2098        let node_id = join_request.node_id;
2099        let node_role = join_request.node_role;
2100        let address = join_request.address;
2101        let status = join_request.status;
2102        let membership = ctx.membership();
2103
2104        // 1. Validate join request
2105        debug!("1. Validate join request");
2106        if membership.contains_node(node_id).await {
2107            let error_msg = format!("Node {node_id} already exists in cluster");
2108            warn!(%error_msg);
2109            return self.send_join_error(sender, MembershipError::NodeAlreadyExists(node_id)).await;
2110        }
2111
2112        // 2. Check rejoin eligibility
2113        debug!("2. Check rejoin eligibility");
2114        if let Err(e) = membership.can_rejoin(node_id, node_role).await {
2115            let error_msg = format!("Node {node_id} cannot rejoin: {e}");
2116            warn!(%error_msg);
2117            return self
2118                .send_join_error(sender, MembershipError::JoinClusterError(error_msg))
2119                .await;
2120        }
2121
2122        let config_change = Change::AddNode(AddNode {
2123            node_id,
2124            address: address.clone(),
2125            status,
2126        });
2127
2128        // 3. Submit config change, and wait for quorum confirmation
2129        debug!("3. Wait for quorum confirmation");
2130        // Use verify_leadership_persistent for config changes (must eventually succeed)
2131        match self
2132            .verify_leadership_persistent(vec![EntryPayload::config(config_change)], ctx, role_tx)
2133            .await
2134        {
2135            Ok(true) => {
2136                // 4. Update node status to Syncing
2137                debug!("4. Update node status to Syncing");
2138
2139                debug!(
2140                    "After updating, the replication peers: {:?}",
2141                    ctx.membership().replication_peers().await
2142                );
2143
2144                // Note: Cluster metadata cache will be updated via MembershipApplied event
2145                // after CommitHandler applies the config change to membership state
2146
2147                // 5. Send successful response
2148                debug!("5. Send successful response");
2149                info!("Join config committed for node {}", node_id);
2150                self.send_join_success(node_id, &address, sender, ctx).await?;
2151            }
2152            Ok(false) => {
2153                warn!("Failed to commit join config for node {}", node_id);
2154                self.send_join_error(sender, MembershipError::CommitTimeout).await?
2155            }
2156            Err(e) => {
2157                error!("Error waiting for commit: {:?}", e);
2158                self.send_join_error(sender, e).await?
2159            }
2160        }
2161        Ok(())
2162    }
2163
2164    async fn send_join_success(
2165        &self,
2166        node_id: u32,
2167        address: &str,
2168        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2169        ctx: &RaftContext<T>,
2170    ) -> Result<()> {
2171        // Retrieve the latest snapshot metadata, if any
2172        let snapshot_metadata = ctx.state_machine_handler().get_latest_snapshot_metadata();
2173
2174        // Prepare response
2175        let response = JoinResponse {
2176            success: true,
2177            error: String::new(),
2178            config: Some(
2179                ctx.membership()
2180                    .retrieve_cluster_membership_config(self.shared_state().current_leader())
2181                    .await,
2182            ),
2183            config_version: ctx.membership().get_cluster_conf_version().await,
2184            snapshot_metadata,
2185            leader_id: self.node_id(),
2186        };
2187
2188        sender.send(Ok(response)).map_err(|e| {
2189            error!("Failed to send join response: {:?}", e);
2190            NetworkError::SingalSendFailed(format!("{e:?}"))
2191        })?;
2192
2193        info!(
2194            "Node {} ({}) successfully added as learner",
2195            node_id, address
2196        );
2197
2198        // Print leader accepting new node message (Plan B)
2199        crate::utils::cluster_printer::print_leader_accepting_new_node(
2200            self.node_id(),
2201            node_id,
2202            address,
2203            d_engine_proto::common::NodeRole::Learner as i32,
2204        );
2205
2206        Ok(())
2207    }
2208
2209    async fn send_join_error(
2210        &self,
2211        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2212        error: impl Into<Error>,
2213    ) -> Result<()> {
2214        let error = error.into();
2215        let status = Status::failed_precondition(error.to_string());
2216
2217        sender.send(Err(status)).map_err(|e| {
2218            error!("Failed to send join error: {:?}", e);
2219            NetworkError::SingalSendFailed(format!("{e:?}"))
2220        })?;
2221
2222        Err(error)
2223    }
2224
2225    #[cfg(any(test, feature = "__test_support"))]
2226    pub fn new(
2227        node_id: u32,
2228        node_config: Arc<RaftNodeConfig>,
2229    ) -> Self {
2230        let ReplicationConfig {
2231            rpc_append_entries_clock_in_ms,
2232            ..
2233        } = node_config.raft.replication;
2234
2235        let batch_size = node_config.raft.batching.max_batch_size;
2236
2237        let enable_batch = node_config.raft.metrics.enable_batch;
2238        // Initialize backpressure metrics (None if disabled)
2239        let backpressure_metrics = if node_config.raft.metrics.enable_backpressure {
2240            Some(Arc::new(BackpressureMetrics::new(
2241                node_id,
2242                true,
2243                node_config.raft.metrics.sample_rate,
2244            )))
2245        } else {
2246            None
2247        };
2248
2249        LeaderState {
2250            cluster_metadata: ClusterMetadata {
2251                single_voter: false,
2252                total_voters: 0,
2253                replication_targets: vec![],
2254            },
2255            shared_state: SharedState::new(node_id, None, None),
2256            timer: Box::new(ReplicationTimer::new(rpc_append_entries_clock_in_ms)),
2257            next_index: HashMap::new(),
2258            match_index: HashMap::new(),
2259            noop_log_id: None,
2260
2261            propose_buffer: Box::new(ProposeBatchBuffer::new(batch_size).with_length_gauge(
2262                node_id,
2263                "propose",
2264                enable_batch,
2265            )),
2266
2267            node_config,
2268            scheduled_purge_upto: None,
2269            last_purged_index: None, //TODO
2270            last_learner_check: Instant::now(),
2271            snapshot_in_progress: AtomicBool::new(false),
2272            next_membership_maintenance_check: Instant::now(),
2273            pending_promotions: VecDeque::new(),
2274            lease_timestamp: AtomicU64::new(0),
2275            linearizable_read_buffer: Box::new(BatchBuffer::new(batch_size).with_length_gauge(
2276                node_id,
2277                "linearizable",
2278                enable_batch,
2279            )),
2280            lease_read_queue: VecDeque::new(),
2281            eventual_read_queue: VecDeque::new(),
2282            pending_requests: HashMap::new(),
2283            pending_reads: BTreeMap::new(),
2284            backpressure_metrics,
2285            _marker: PhantomData,
2286        }
2287    }
2288
2289    pub async fn trigger_background_snapshot(
2290        node_id: u32,
2291        metadata: SnapshotMetadata,
2292        state_machine_handler: Arc<SMHOF<T>>,
2293        membership: Arc<MOF<T>>,
2294        config: SnapshotConfig,
2295    ) -> Result<()> {
2296        let (result_tx, result_rx) = oneshot::channel();
2297
2298        // Delegate the actual transfer to a dedicated thread pool
2299        tokio::task::spawn_blocking(move || {
2300            let rt = tokio::runtime::Handle::current();
2301            let result = rt.block_on(async move {
2302                let bulk_channel = membership
2303                    .get_peer_channel(node_id, ConnectionType::Bulk)
2304                    .await
2305                    .ok_or(NetworkError::PeerConnectionNotFound(node_id))?;
2306
2307                let data_stream =
2308                    state_machine_handler.load_snapshot_data(metadata.clone()).await?;
2309
2310                BackgroundSnapshotTransfer::<T>::run_push_transfer(
2311                    node_id,
2312                    data_stream,
2313                    bulk_channel,
2314                    config,
2315                )
2316                .await
2317            });
2318
2319            // Send the result without blocking
2320            let _ = result_tx.send(result);
2321        });
2322
2323        // Await and log the transfer result in the background
2324        tokio::spawn(async move {
2325            match result_rx.await {
2326                Ok(Ok(_)) => info!("Snapshot to {} completed", node_id),
2327                Ok(Err(e)) => error!("Snapshot to {} failed: {:?}", node_id, e),
2328                Err(_) => warn!("Snapshot result channel closed unexpectedly"),
2329            }
2330        });
2331
2332        Ok(())
2333    }
2334
2335    /// Processes all pending promotions while respecting the cluster's odd-node constraint
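    ///
    /// A hypothetical sketch of the odd-node batch rule; the real policy is
    /// `calculate_safe_batch_size` (defined elsewhere), so this only models the
    /// parity idea:
    /// ```
    /// fn safe_batch(current_voters: usize, pending: usize) -> usize {
    ///     // Promote the largest batch that keeps the voter count odd.
    ///     (0..=pending).rev().find(|b| (current_voters + b) % 2 == 1).unwrap_or(0)
    /// }
    /// assert_eq!(safe_batch(3, 2), 2); // 3 -> 5 voters, still odd
    /// assert_eq!(safe_batch(3, 1), 0); // 3 -> 4 would be even, so wait
    /// ```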
2336    pub async fn process_pending_promotions(
2337        &mut self,
2338        ctx: &RaftContext<T>,
2339        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2340    ) -> Result<()> {
2341        debug!(
2342            "[Leader {}] 🔄 process_pending_promotions called, pending: {:?}",
2343            self.node_id(),
2344            self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2345        );
2346
2347        // Get promotion configuration from the node config
2348        let config = &ctx.node_config().raft.membership.promotion;
2349
2350        // Step 1: Remove stale entries (older than configured threshold)
2351        let now = Instant::now();
2352        self.pending_promotions.retain(|entry| {
2353            now.duration_since(entry.ready_since) <= config.stale_learner_threshold
2354        });
2355
2356        if self.pending_promotions.is_empty() {
2357            debug!(
2358                "[Leader {}] ❌ pending_promotions is empty after stale cleanup",
2359                self.node_id()
2360            );
2361            return Ok(());
2362        }
2363
2364        // Step 2: Get current voter count (including self as leader)
2365        let membership = ctx.membership();
2366        let current_voters = membership.voters().await.len() + 1; // +1 for self (leader)
2367        debug!(
2368            "[Leader {}] 📊 current_voters: {}, pending: {}",
2369            self.node_id(),
2370            current_voters,
2371            self.pending_promotions.len()
2372        );
2373
2374        // Step 3: Calculate the maximum batch size that preserves an odd total
2375        let max_batch_size =
2376            calculate_safe_batch_size(current_voters, self.pending_promotions.len());
2377        debug!(
2378            "[Leader {}] 🎯 max_batch_size: {}",
2379            self.node_id(),
2380            max_batch_size
2381        );
2382
2383        if max_batch_size == 0 {
2384            // Nothing we can safely promote now
2385            debug!(
2386                "[Leader {}] ⚠️ max_batch_size is 0, cannot promote now",
2387                self.node_id()
2388            );
2389            return Ok(());
2390        }
2391
2392        // Step 4: Extract the batch from the queue (FIFO order)
2393        let promotion_entries = self.drain_batch(max_batch_size);
2394        let promotion_node_ids = promotion_entries.iter().map(|e| e.node_id).collect::<Vec<_>>();
2395
2396        // Step 5: Execute batch promotion
2397        if !promotion_node_ids.is_empty() {
2398            // Log the batch promotion
2399            info!(
2400                "Promoting learner batch of {} nodes: {:?} (total voters: {} -> {})",
2401                promotion_node_ids.len(),
2402                promotion_node_ids,
2403                current_voters,
2404                current_voters + promotion_node_ids.len()
2405            );
2406
2407            // Attempt promotion and restore batch on failure
2408            let result = self.safe_batch_promote(promotion_node_ids.clone(), ctx, role_tx).await;
2409
2410            if let Err(e) = result {
2411                // Restore entries to the front of the queue in reverse order
2412                for entry in promotion_entries.into_iter().rev() {
2413                    self.pending_promotions.push_front(entry);
2414                }
2415                return Err(e);
2416            }
2417
2418            info!(
2419                "Promotion successful. Cluster members: {:?}",
2420                membership.voters().await
2421            );
2422        }
2423
2424        trace!(
2425            ?self.pending_promotions,
2426            "Step 6: Reschedule if any pending promotions remain"
2427        );
2428        // Step 6: Reschedule if any pending promotions remain
2429        if !self.pending_promotions.is_empty() {
2430            debug!(
2431                "[Leader {}] 🔁 Re-sending PromoteReadyLearners for remaining pending: {:?}",
2432                self.node_id(),
2433                self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2434            );
2435            // Important: Re-send the event to trigger next cycle
2436            role_tx
2437                .send(RoleEvent::ReprocessEvent(Box::new(
2438                    RaftEvent::PromoteReadyLearners,
2439                )))
2440                .map_err(|e| {
2441                    let error_str = format!("{e:?}");
2442                    error!("Send PromoteReadyLearners event failed: {}", error_str);
2443                    NetworkError::SingalSendFailed(error_str)
2444                })?;
2445        }
2446
2447        Ok(())
2448    }
2449
2450    /// Removes the first `count` nodes from the pending queue and returns them
2451    pub(super) fn drain_batch(
2452        &mut self,
2453        count: usize,
2454    ) -> Vec<PendingPromotion> {
2455        let mut batch = Vec::with_capacity(count);
2456        for _ in 0..count {
2457            if let Some(entry) = self.pending_promotions.pop_front() {
2458                batch.push(entry);
2459            } else {
2460                break;
2461            }
2462        }
2463        batch
2464    }
2465    /// Merge a batch of requests into unified write metadata for RPC processing.
2466    ///
2467    /// OR-combines wait_for_apply flags: if any request needs apply confirmation, all wait.
2468    /// This is safe because in practice all requests in a batch share the same flag
2469    /// (writes always true, noop/config always false — never mixed).
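    ///
    /// Entries in a batch receive consecutive log indices starting at
    /// `start_idx`; the i-th sender is later resolved by index `start_idx + i`.
    /// A sketch with string stand-ins for the response channels:
    /// ```
    /// let start_idx: u64 = 7;
    /// let senders = ["a", "b", "c"];
    /// let indexed: Vec<(u64, &str)> = senders
    ///     .iter()
    ///     .enumerate()
    ///     .map(|(i, s)| (start_idx + i as u64, *s))
    ///     .collect();
    /// assert_eq!(indexed[2], (9, "c"));
    /// ```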
2470    pub(super) fn merge_batch_to_write_metadata(
2471        batch: impl IntoIterator<Item = RaftRequestWithSignal>,
2472        start_idx: u64,
2473    ) -> (Vec<EntryPayload>, Option<WriteMetadata>) {
2474        let mut all_payloads = Vec::new();
2475        let mut all_senders = Vec::new();
2476        let mut any_wait_for_apply = false;
2477
2478        for mut req in batch {
2479            all_payloads.extend(std::mem::take(&mut req.payloads));
2480            all_senders.extend(req.senders);
2481            any_wait_for_apply |= req.wait_for_apply_event;
2482        }
2483
2484        if all_payloads.is_empty() && all_senders.is_empty() {
2485            return (all_payloads, None);
2486        }
2487
2488        (
2489            all_payloads,
2490            Some((start_idx, all_senders, any_wait_for_apply)),
2491        )
2492    }
2493
2494    /// Core RPC execution and unified result processing.
2495    ///
2496    /// Shared by `process_batch` (write-only) and `unified_write_and_linear_read` (write+read).
2497    /// Handles all 4 result cases: quorum success, quorum failure, higher term, other errors.
2498    async fn execute_and_process_raft_rpc(
2499        &mut self,
2500        payloads: Vec<EntryPayload>,
2501        write_metadata: Option<WriteMetadata>,
2502        read_batch: Option<Vec<LinearizableReadRequest>>,
2503        ctx: &RaftContext<T>,
2504        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2505    ) -> Result<()> {
2506        trace!(
2507            cluster_size = self.cluster_metadata.total_voters,
2508            payload_count = payloads.len(),
2509        );
2510
2511        let result = ctx
2512            .replication_handler()
2513            .handle_raft_request_in_batch(
2514                payloads,
2515                self.state_snapshot(),
2516                self.leader_state_snapshot(),
2517                &self.cluster_metadata,
2518                ctx,
2519            )
2520            .await;
2521
2522        debug!(?result, "execute_and_process_raft_rpc");
2523
2524        match result {
2525            Ok(AppendResults {
2526                commit_quorum_achieved: true,
2527                peer_updates,
2528                learner_progress,
2529            }) => {
2530                self.update_lease_timestamp();
2531                self.update_peer_indexes(&peer_updates);
2532                if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
2533                    error!(?e, "check_learner_progress failed");
2534                }
2535
2536                let new_commit_index = if self.cluster_metadata.single_voter {
2537                    let last_log_index = ctx.raft_log().last_entry_id();
2538                    if last_log_index > self.commit_index() {
2539                        Some(last_log_index)
2540                    } else {
2541                        None
2542                    }
2543                } else {
2544                    self.calculate_new_commit_index(ctx.raft_log(), &peer_updates)
2545                };
2546
2547                if let Some(new_commit_index) = new_commit_index {
2548                    debug!(
2549                        "[Leader-{}] New commit acknowledged: {}",
2550                        self.node_id(),
2551                        new_commit_index
2552                    );
2553                    self.update_commit_index_with_signal(
2554                        Leader as i32,
2555                        self.current_term(),
2556                        new_commit_index,
2557                        role_tx,
2558                    )?;
2559                }
2560
2561                if let Some((start_idx, senders, wait_for_apply)) = write_metadata {
2562                    if wait_for_apply {
2563                        for (i, sender) in senders.into_iter().enumerate() {
2564                            self.pending_requests.insert(start_idx + i as u64, sender);
2565                        }
2566                    } else {
2567                        for sender in senders {
2568                            let _ = sender.send(Ok(ClientResponse::write_success()));
2569                        }
2570                    }
2571                }
2572
2573                if let Some(read_batch) = read_batch {
2574                    let read_index = self.calculate_read_index();
2575                    let last_applied = ctx.state_machine().last_applied().index;
2576                    if last_applied >= read_index {
2577                        // Fast path: SM already caught up, serve immediately
2578                        self.execute_pending_reads(read_batch, ctx);
2579                    } else {
2580                        // Slow path: register for ApplyCompleted callback
2581                        self.pending_reads.entry(read_index).or_default().extend(read_batch);
2582                    }
2583                }
2584
2585                Ok(())
2586            }
2587
2588            Ok(AppendResults {
2589                commit_quorum_achieved: false,
2590                peer_updates,
2591                learner_progress,
2592            }) => {
2593                self.update_peer_indexes(&peer_updates);
2594                if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
2595                    error!(?e, "check_learner_progress failed");
2596                }
2597
2598                let responses_received = peer_updates.len();
2599                let cluster_size = self.cluster_metadata.total_voters;
2600                let error_code = if is_majority(responses_received, cluster_size) {
2601                    ErrorCode::RetryRequired
2602                } else {
2603                    ErrorCode::ProposeFailed
2604                };
2605
2606                if let Some((_, senders, _)) = write_metadata {
2607                    for sender in senders {
2608                        let _ = sender.send(Ok(ClientResponse::client_error(error_code)));
2609                    }
2610                }
2611                if let Some(read_batch) = read_batch {
2612                    let status = tonic::Status::failed_precondition("Quorum not reached");
2613                    for (_, sender) in read_batch {
2614                        let _ = sender.send(Err(status.clone()));
2615                    }
2616                }
2617
2618                Ok(())
2619            }
2620
2621            Err(Error::Consensus(ConsensusError::Replication(ReplicationError::HigherTerm(
2622                higher_term,
2623            )))) => {
2624                warn!("Higher term detected: {}", higher_term);
2625                self.update_current_term(higher_term);
2626                self.send_become_follower_event(None, role_tx)?;
2627
2628                if let Some((_, senders, _)) = write_metadata {
2629                    for sender in senders {
2630                        let _ =
2631                            sender.send(Ok(ClientResponse::client_error(ErrorCode::TermOutdated)));
2632                    }
2633                }
2634                if let Some(read_batch) = read_batch {
2635                    let status = tonic::Status::failed_precondition("Term outdated");
2636                    for (_, sender) in read_batch {
2637                        let _ = sender.send(Err(status.clone()));
2638                    }
2639                }
2640
2641                Err(ReplicationError::HigherTerm(higher_term).into())
2642            }
2643
2644            Err(e) => {
2645                error!("RPC failed: {:?}", e);
2646
2647                if let Some((_, senders, _)) = write_metadata {
2648                    for sender in senders {
2649                        let _ =
2650                            sender.send(Ok(ClientResponse::client_error(ErrorCode::ProposeFailed)));
2651                    }
2652                }
2653                if let Some(read_batch) = read_batch {
2654                    let error_msg = format!("RPC failed: {e}");
2655                    let status = tonic::Status::failed_precondition(error_msg);
2656                    for (_, sender) in read_batch {
2657                        let _ = sender.send(Err(status.clone()));
2658                    }
2659                }
2660
2661                Err(e)
2662            }
2663        }
2664    }
2665
2666    async fn safe_batch_promote(
2667        &mut self,
2668        batch: Vec<u32>,
2669        ctx: &RaftContext<T>,
2670        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2671    ) -> Result<()> {
2672        let change = Change::BatchPromote(BatchPromote {
2673            node_ids: batch.clone(),
2674            new_status: NodeStatus::Active as i32,
2675        });
2676
2677        // Submit batch activation
2678        // Use verify_leadership_persistent for config changes (must eventually succeed)
2679        self.verify_leadership_persistent(vec![EntryPayload::config(change)], ctx, role_tx)
2680            .await?;
2681
2682        Ok(())
2683    }
2684
2685    async fn run_periodic_maintenance(
2686        &mut self,
2687        role_tx: &mpsc::UnboundedSender<RoleEvent>,
2688        ctx: &RaftContext<T>,
2689    ) -> Result<()> {
2690        if let Err(e) = self.conditionally_purge_stale_learners(role_tx, ctx).await {
2691            error!("Stale learner purge failed: {}", e);
2692        }
2693
2694        if let Err(e) = self.conditionally_purge_zombie_nodes(role_tx, ctx).await {
2695            error!("Zombie node purge failed: {}", e);
2696        }
2697
2698        // Regardless of whether a stale node is found, schedule the next
2699        // check after a fixed interval
2700        self.reset_next_membership_maintenance_check(
2701            ctx.node_config().raft.membership.membership_maintenance_interval,
2702        );
2703        Ok(())
2704    }

    /// Periodic check, triggered at most every ~30s in the worst case via
    /// priority-based lazy scheduling; the actual average frequency is
    /// inversely proportional to system load.
    pub async fn conditionally_purge_stale_learners(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        let config = &ctx.node_config.raft.membership.promotion;

        // Optimization: skip the check almost all of the time by consulting the
        // scheduled next-check timestamp
        if self.pending_promotions.is_empty()
            || self.next_membership_maintenance_check > Instant::now()
        {
            trace!("Skipping stale learner check");
            return Ok(());
        }

        let now = Instant::now();
        let queue_len = self.pending_promotions.len();

        // Inspect only the oldest ~1% of the queue (at least 1 entry), capped at 100
        let inspect_count = (queue_len / 100).clamp(1, 100);
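        // e.g. queue_len = 50 -> inspect 1; queue_len = 5_000 -> inspect 50;
        // queue_len = 50_000 -> capped at 100 (illustrative values)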
        let mut stale_entries = Vec::new();

        trace!("Inspecting {} entries", inspect_count);
        for _ in 0..inspect_count {
            if let Some(entry) = self.pending_promotions.pop_front() {
                trace!(
                    "Inspecting entry: {:?} - {:?} - {:?}",
                    entry,
                    now.duration_since(entry.ready_since),
                    &config.stale_learner_threshold
                );
                if now.duration_since(entry.ready_since) > config.stale_learner_threshold {
                    stale_entries.push(entry);
                } else {
                    // Put the non-stale entry back and stop: the queue is ordered
                    // oldest-first, so no later entry can be stale either
                    self.pending_promotions.push_front(entry);
                    break;
                }
            } else {
                break;
            }
        }

        trace!("Stale learner check completed: {:?}", stale_entries);

        // Process collected stale entries
        for entry in stale_entries {
            if let Err(e) = self.handle_stale_learner(entry.node_id, role_tx, ctx).await {
                error!("Failed to handle stale learner: {}", e);
            }
        }

        Ok(())
    }

    /// Remove non-Active zombie nodes that exceed the failure threshold
    async fn conditionally_purge_zombie_nodes(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        // Optimization: take the membership reference once to reduce lock contention
        let membership = ctx.membership();
        let zombie_candidates = membership.get_zombie_candidates().await;
        let mut nodes_to_remove = Vec::new();

        for node_id in zombie_candidates {
            if let Some(status) = membership.get_node_status(node_id).await {
                if status != NodeStatus::Active {
                    nodes_to_remove.push(node_id);
                }
            }
        }

        // Batch removal if we have candidates
        if !nodes_to_remove.is_empty() {
            let change = Change::BatchRemove(BatchRemove {
                node_ids: nodes_to_remove.clone(),
            });

            info!(
                "Proposing batch removal of zombie nodes: {:?}",
                nodes_to_remove
            );

            // Submit a single config change for all nodes
            // Use verify_leadership_persistent for config changes (must eventually succeed)
            match self
                .verify_leadership_persistent(vec![EntryPayload::config(change)], ctx, role_tx)
                .await
            {
                Ok(true) => {
                    info!("Batch removal committed for nodes: {:?}", nodes_to_remove);
                }
                Ok(false) => {
                    warn!("Failed to commit batch removal");
                }
                Err(e) => {
                    error!("Batch removal error: {:?}", e);
                    return Err(e);
                }
            }
        }

        Ok(())
    }

    pub fn reset_next_membership_maintenance_check(
        &mut self,
        membership_maintenance_interval: Duration,
    ) {
        self.next_membership_maintenance_check = Instant::now() + membership_maintenance_interval;
    }

    /// Remove a stalled learner via membership-change consensus
    pub async fn handle_stale_learner(
        &mut self,
        node_id: u32,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        // Stalled learner detected - remove via membership change (requires consensus)
        warn!(
            "Learner {} is stalled, removing from cluster via consensus",
            node_id
        );

        let change = Change::BatchRemove(BatchRemove {
            node_ids: vec![node_id],
        });

        // Submit removal through a membership change entry (requires quorum consensus)
        // Use verify_leadership_persistent for config changes (must eventually succeed)
        match self
            .verify_leadership_persistent(vec![EntryPayload::config(change)], ctx, role_tx)
            .await
        {
            Ok(true) => {
                info!(
                    "Stalled learner {} successfully removed from cluster",
                    node_id
                );
            }
            Ok(false) => {
                warn!("Failed to commit removal of stalled learner {}", node_id);
            }
            Err(e) => {
                error!("Error removing stalled learner {}: {:?}", node_id, e);
                return Err(e);
            }
        }

        Ok(())
    }

    /// Check whether the current lease is still valid for the LeaseRead policy
    pub fn is_lease_valid(
        &self,
        ctx: &RaftContext<T>,
    ) -> bool {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;

        let last_confirmed = self.lease_timestamp.load(Ordering::Acquire);
        let lease_duration = ctx.node_config().raft.read_consistency.lease_duration_ms;

        if now < last_confirmed {
            // Clock moved backwards: system clock issue, treat lease as invalid
            error!("Clock moved backwards: now {now}, last confirmed {last_confirmed}");
            return false;
        }

        // Allow multiple requests within the same millisecond (now == last_confirmed)
        if now == last_confirmed {
            return true;
        }
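        // e.g. with lease_duration_ms = 5_000: a read 4_999 ms after the last
        // confirmation is served locally; at 5_000 ms the lease has expired and
        // leadership must be re-verified (illustrative numbers)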
        (now - last_confirmed) < lease_duration
    }

    /// Update the lease timestamp after successful leadership verification
    fn update_lease_timestamp(&self) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;

        self.lease_timestamp.store(now, Ordering::Release);
    }

    /// Verify leadership with quorum and refresh the lease timestamp on success.
    /// This is a single-shot verification (fast-fail) optimized for read operations.
    ///
    /// Returns `Ok(())` if verification succeeds and the lease is refreshed.
    /// Returns `Err` on any verification failure.
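    ///
    /// Mapping: `Success` -> `Ok(())`; `LeadershipLost` -> `NotLeader`;
    /// `RetryRequired` -> `QuorumNotReached`; other errors are passed through.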
    async fn verify_leadership_and_refresh_lease(
        &mut self,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        match self.verify_internal_quorum(vec![], ctx, role_tx).await {
            Ok(QuorumVerificationResult::Success) => Ok(()),
            Ok(QuorumVerificationResult::LeadershipLost) => Err(ConsensusError::Replication(
                ReplicationError::NotLeader { leader_id: None },
            )
            .into()),
            Ok(QuorumVerificationResult::RetryRequired) => {
                Err(ConsensusError::Replication(ReplicationError::QuorumNotReached).into())
            }
            Err(e) => Err(e),
        }
    }

    /// Unified write + linearizable read: single RPC for both (2*RTT → 1*RTT).
    ///
    /// Safety: write quorum verification also confirms leadership for reads (Raft §6.4).
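    ///
    /// Flow sketch: flush any buffered proposals and drain any buffered
    /// linearizable reads, then issue a single replication round that both
    /// commits the writes and confirms leadership for the reads.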
    pub(super) async fn unified_write_and_linear_read(
        &mut self,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        // Extract write metadata
        let (payloads, write_metadata) = if !self.propose_buffer.is_empty() {
            if let Some(req) = self.propose_buffer.flush() {
                let start_idx = ctx.raft_log().last_entry_id() + 1;
                (
                    req.payloads,
                    Some((start_idx, req.senders, req.wait_for_apply_event)),
                )
            } else {
                (Vec::new(), None)
            }
        } else {
            (Vec::new(), None)
        };

        // Extract read batch
        let read_batch = if !self.linearizable_read_buffer.is_empty() {
            Some(self.linearizable_read_buffer.take_all())
        } else {
            None
        };

        self.execute_and_process_raft_rpc(payloads, write_metadata, read_batch, ctx, role_tx)
            .await
    }

    /// Execute a batch of reads against the state machine and respond to clients.
    /// The caller must ensure the state machine has already applied entries up to
    /// the required read_index.
    fn execute_pending_reads(
        &self,
        read_batch: impl IntoIterator<Item = LinearizableReadRequest>,
        ctx: &RaftContext<T>,
    ) {
        for (req, sender) in read_batch {
            let results = ctx
                .handlers
                .state_machine_handler
                .read_from_state_machine(req.keys)
                .unwrap_or_default();
            let _ = sender.send(Ok(ClientResponse::read_results(results)));
        }
    }

    #[cfg(test)]
    pub(crate) fn test_update_lease_timestamp(&mut self) {
        self.update_lease_timestamp();
    }

    /// Determine the effective read consistency policy from the client request
    /// and server config
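    ///
    /// Resolution order: a client-specified policy wins only when
    /// `allow_client_override` is enabled; in all other cases the server's
    /// `default_policy` applies.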
    pub(super) fn determine_read_policy(
        &self,
        req: &ClientReadRequest,
    ) -> ServerReadConsistencyPolicy {
        use d_engine_proto::client::ReadConsistencyPolicy as ClientReadConsistencyPolicy;

        if req.has_consistency_policy() {
            // Client explicitly specified a policy
            if self.node_config.raft.read_consistency.allow_client_override {
                match req.consistency_policy() {
                    ClientReadConsistencyPolicy::LeaseRead => {
                        ServerReadConsistencyPolicy::LeaseRead
                    }
                    ClientReadConsistencyPolicy::LinearizableRead => {
                        ServerReadConsistencyPolicy::LinearizableRead
                    }
                    ClientReadConsistencyPolicy::EventualConsistency => {
                        ServerReadConsistencyPolicy::EventualConsistency
                    }
                }
            } else {
                // Client override not allowed - use server default
                self.node_config.raft.read_consistency.default_policy.clone()
            }
        } else {
            // No client policy specified - use server default
            self.node_config.raft.read_consistency.default_policy.clone()
        }
    }

    /// Process a single lease-based read request
    pub(super) async fn process_lease_read(
        &mut self,
        req: ClientReadRequest,
        sender: MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        if !self.is_lease_valid(ctx) {
            // Lease expired - verify leadership with quorum to refresh it first
            if let Err(e) = self.verify_leadership_and_refresh_lease(ctx, role_tx).await {
                warn!("[Leader] Lease read: leadership verification failed: {e}");
                if sender
                    .send(Err(tonic::Status::unavailable(format!(
                        "Leadership verification failed: {e}"
                    ))))
                    .is_err()
                {
                    warn!(
                        "[Leader] Lease read: client already disconnected before error response could be sent"
                    );
                }
                return Err(e);
            }
        }

        // Lease valid (or just refreshed) - serve from the local state machine
        let results = ctx
            .handlers
            .state_machine_handler
            .read_from_state_machine(req.keys)
            .unwrap_or_default();
        let response = ClientResponse::read_results(results);
        let _ = sender.send(Ok(response));
        Ok(())
    }

    /// Process a single eventual-consistency read request
    fn process_eventual_read(
        &mut self,
        req: ClientReadRequest,
        sender: MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
        ctx: &RaftContext<T>,
    ) {
        // Eventual consistency: serve immediately without any verification
        let results = ctx
            .handlers
            .state_machine_handler
            .read_from_state_machine(req.keys)
            .unwrap_or_default();
        let response = ClientResponse::read_results(results);
        let _ = sender.send(Ok(response));
    }
}

impl<T: TypeConfig> From<&CandidateState<T>> for LeaderState<T> {
    fn from(candidate: &CandidateState<T>) -> Self {
        let ReplicationConfig {
            rpc_append_entries_clock_in_ms,
            ..
        } = candidate.node_config.raft.replication;

        // Clone shared_state and set self as leader immediately
        let shared_state = candidate.shared_state.clone();
        shared_state.set_current_leader(candidate.node_id());

        // Initialize backpressure metrics (None if disabled)
        let backpressure_metrics = if candidate.node_config.raft.metrics.enable_backpressure {
            Some(Arc::new(BackpressureMetrics::new(
                candidate.node_id(),
                true,
                candidate.node_config.raft.metrics.sample_rate,
            )))
        } else {
            None
        };

        Self {
            shared_state,
            timer: Box::new(ReplicationTimer::new(rpc_append_entries_clock_in_ms)),
            next_index: HashMap::new(),
            match_index: HashMap::new(),
            noop_log_id: None,

            propose_buffer: Box::new(
                ProposeBatchBuffer::new(candidate.node_config.raft.batching.max_batch_size)
                    .with_length_gauge(
                        candidate.node_id(),
                        "propose",
                        candidate.node_config.raft.metrics.enable_batch,
                    ),
            ),

            node_config: candidate.node_config.clone(),

            scheduled_purge_upto: None,
            last_purged_index: candidate.last_purged_index,
            last_learner_check: Instant::now(),
            snapshot_in_progress: AtomicBool::new(false),
            next_membership_maintenance_check: Instant::now(),
            pending_promotions: VecDeque::new(),
            cluster_metadata: ClusterMetadata {
                single_voter: false,
                total_voters: 0,
                replication_targets: vec![],
            },
            lease_timestamp: AtomicU64::new(0),
            linearizable_read_buffer: Box::new(
                BatchBuffer::new(candidate.node_config.raft.batching.max_batch_size)
                    .with_length_gauge(
                        candidate.node_id(),
                        "linearizable",
                        candidate.node_config.raft.metrics.enable_batch,
                    ),
            ),
            lease_read_queue: VecDeque::new(),
            eventual_read_queue: VecDeque::new(),
            pending_requests: HashMap::new(),
            pending_reads: BTreeMap::new(),
            backpressure_metrics,
            _marker: PhantomData,
        }
    }
}

impl<T: TypeConfig> Debug for LeaderState<T> {
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.debug_struct("LeaderState")
            .field("shared_state", &self.shared_state)
            .field("next_index", &self.next_index)
            .field("match_index", &self.match_index)
            .field("noop_log_id", &self.noop_log_id)
            .finish()
    }
}

/// Calculates the maximum number of nodes we can promote while keeping the
/// total voter count odd.
///
/// - `current`: current number of voting nodes
/// - `available`: number of ready learners pending promotion
///
/// Returns the maximum number of nodes to promote (0 if no safe promotion exists)
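///
/// # Examples
///
/// Illustrative parity checks (doctest marked `ignore` because the crate
/// import path is elided in this listing):
///
/// ```ignore
/// assert_eq!(calculate_safe_batch_size(3, 2), 2); // 3 + 2 = 5 (odd): promote both
/// assert_eq!(calculate_safe_batch_size(3, 3), 2); // 3 + 3 = 6 (even): promote 2 -> 5
/// assert_eq!(calculate_safe_batch_size(4, 1), 1); // 4 + 1 = 5 (odd): promote it
/// ```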
pub fn calculate_safe_batch_size(
    current: usize,
    available: usize,
) -> usize {
    if (current + available) % 2 == 1 {
        // Promoting all keeps the total odd
        available
    } else {
        // Only (available - 1) can be promoted to keep the total odd;
        // if available <= 1, no safe promotion exists and we return 0
        available.saturating_sub(1)
    }
}