1use super::LeaderStateSnapshot;
2use super::RaftRole;
3use super::SharedState;
4use super::StateSnapshot;
5use super::buffers::BatchBuffer;
6use super::buffers::ProposeBatchBuffer;
7use super::candidate_state::CandidateState;
8use super::role_state::RaftRoleState;
9use super::role_state::check_and_trigger_snapshot;
10use super::role_state::send_replay_raft_event;
11use crate::AppendResults;
12use crate::BackgroundSnapshotTransfer;
13use crate::ConnectionType;
14use crate::ConsensusError;
15use crate::Error;
16use crate::MaybeCloneOneshot;
17use crate::MaybeCloneOneshotSender;
18use crate::Membership;
19use crate::MembershipError;
20use crate::NetworkError;
21use crate::PeerUpdate;
22use crate::PurgeExecutor;
23use crate::QuorumVerificationResult;
24use crate::RaftContext;
25use crate::RaftEvent;
26use crate::RaftLog;
27use crate::RaftNodeConfig;
28use crate::RaftOneshot;
29use crate::RaftRequestWithSignal;
30use crate::ReadConsistencyPolicy as ServerReadConsistencyPolicy;
31use crate::ReplicationConfig;
32use crate::ReplicationCore;
33use crate::ReplicationError;
34use crate::ReplicationTimer;
35use crate::Result;
36use crate::RoleEvent;
37use crate::SnapshotConfig;
38use crate::StateMachine;
39use crate::StateMachineHandler;
40use crate::StateTransitionError;
41use crate::SystemError;
42use crate::TypeConfig;
43use crate::alias::MOF;
44use crate::alias::ROF;
45use crate::alias::SMHOF;
46use crate::cluster::is_majority;
47use crate::ensure_safe_join;
48use crate::event::ClientCmd;
49use crate::stream::create_production_snapshot_stream;
50use d_engine_proto::client::ClientReadRequest;
51use d_engine_proto::client::ClientResponse;
52use d_engine_proto::common::AddNode;
53use d_engine_proto::common::BatchPromote;
54use d_engine_proto::common::BatchRemove;
55use d_engine_proto::common::EntryPayload;
56use d_engine_proto::common::LogId;
57use d_engine_proto::common::NodeRole::Leader;
58use d_engine_proto::common::NodeStatus;
59use d_engine_proto::common::membership_change::Change;
60use d_engine_proto::error::ErrorCode;
61use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
62use d_engine_proto::server::cluster::JoinRequest;
63use d_engine_proto::server::cluster::JoinResponse;
64use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
65use d_engine_proto::server::cluster::NodeMeta;
66use d_engine_proto::server::election::VoteResponse;
67use d_engine_proto::server::election::VotedFor;
68use d_engine_proto::server::replication::AppendEntriesResponse;
69use d_engine_proto::server::storage::SnapshotChunk;
70use d_engine_proto::server::storage::SnapshotMetadata;
71use nanoid::nanoid;
72use std::collections::BTreeMap;
73use std::collections::HashMap;
74use std::collections::VecDeque;
75use std::fmt::Debug;
76use std::marker::PhantomData;
77use std::sync::Arc;
78use std::sync::atomic::AtomicBool;
79use std::sync::atomic::AtomicU32;
80use std::sync::atomic::AtomicU64;
81use std::sync::atomic::Ordering;
82use std::time::Duration;
83use tokio::sync::mpsc;
84use tokio::sync::oneshot;
85use tokio::time::Instant;
86use tokio::time::sleep;
87use tokio::time::timeout;
88use tonic::Status;
89use tonic::async_trait;
90use tracing::debug;
91use tracing::error;
92use tracing::info;
93use tracing::instrument;
94use tracing::trace;
95use tracing::warn;
96
/// A buffered linearizable read: the client request paired with the oneshot
/// sender used to answer it once its read index becomes safe to serve.
type LinearizableReadRequest = (
    ClientReadRequest,
    MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
);
102
/// Metadata for a batched write.
///
/// Tuple fields: (presumably the log index of the batch — TODO confirm at the
/// use site, the response senders to complete, and whether completion should
/// wait for the apply event rather than just quorum commit).
type WriteMetadata = (
    u64,
    Vec<MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>>,
    bool,
);
109
/// A learner node queued for promotion to voter.
#[derive(Debug, Clone)]
pub struct PendingPromotion {
    /// Id of the learner awaiting promotion.
    pub node_id: u32,
    /// Moment the node was first observed ready — presumably used as a
    /// stability window by `process_pending_promotions`; TODO confirm there.
    pub ready_since: Instant,
}
116
117impl PendingPromotion {
118 pub fn new(
119 node_id: u32,
120 ready_since: Instant,
121 ) -> Self {
122 PendingPromotion {
123 node_id,
124 ready_since,
125 }
126 }
127}
128
/// Cached view of cluster membership, rebuilt by `init_cluster_metadata` and
/// refreshed when membership changes are applied.
#[derive(Debug, Clone)]
pub struct ClusterMetadata {
    /// True when this leader is the only voter (quorum of one).
    pub single_voter: bool,
    /// Total voter count, including this leader itself.
    pub total_voters: usize,
    /// Peers that receive replication traffic — sourced from
    /// `Membership::replication_peers`; confirm whether learners are included.
    pub replication_targets: Vec<NodeMeta>,
}
142
/// Sampled instrumentation for client-request backpressure decisions.
pub struct BackpressureMetrics {
    /// Pre-built metric labels for the write path (`type=write`).
    labels_write: Arc<[(String, String)]>,
    /// Pre-built metric labels for the read path (`type=read`).
    labels_read: Arc<[(String, String)]>,
    /// Master switch; when false every recording call is a no-op.
    enabled: bool,
    /// Monotonic call counter driving gauge sampling (wraps at u32::MAX).
    sample_counter: AtomicU32,
    /// Emit the utilization gauge once every `sample_rate` calls; always >= 1.
    sample_rate: u32,
}
159
160impl BackpressureMetrics {
161 pub fn new(
163 node_id: u32,
164 enabled: bool,
165 sample_rate: u32,
166 ) -> Self {
167 let sample_rate = if sample_rate == 0 { 1 } else { sample_rate };
168 let node_id_str = node_id.to_string();
169 let labels_write = Arc::new([
170 ("node_id".to_string(), node_id_str.clone()),
171 ("type".to_string(), "write".to_string()),
172 ]);
173 let labels_read = Arc::new([
174 ("node_id".to_string(), node_id_str),
175 ("type".to_string(), "read".to_string()),
176 ]);
177
178 Self {
179 labels_write,
180 labels_read,
181 enabled,
182 sample_counter: AtomicU32::new(0),
183 sample_rate,
184 }
185 }
186
187 #[inline]
189 pub fn record_rejection(
190 &self,
191 is_write: bool,
192 ) {
193 if self.enabled {
194 let labels = if is_write {
195 &self.labels_write
196 } else {
197 &self.labels_read
198 };
199 metrics::counter!("backpressure.rejections", labels.as_ref()).increment(1);
200 }
201 }
202
203 #[inline]
205 pub fn record_buffer_utilization(
206 &self,
207 utilization: f64,
208 is_write: bool,
209 ) {
210 if self.enabled {
211 let counter = self.sample_counter.fetch_add(1, Ordering::Relaxed);
212 if counter % self.sample_rate == 0 {
213 let labels = if is_write {
214 &self.labels_write
215 } else {
216 &self.labels_read
217 };
218 metrics::gauge!("backpressure.buffer_utilization", labels.as_ref())
219 .set(utilization);
220 }
221 }
222 }
223}
224
/// Volatile, leader-only Raft state plus project-specific client buffers.
///
/// Built when this node wins an election and discarded when it steps down;
/// nothing here is persisted.
pub struct LeaderState<T: TypeConfig> {
    /// State common to every role (term, commit index, voted_for, leader id).
    pub shared_state: SharedState,

    /// Per-peer index of the next log entry to send (Raft `nextIndex`).
    pub next_index: HashMap<u32, u64>,

    /// Per-peer highest log index known replicated (Raft `matchIndex`).
    pub(super) match_index: HashMap<u32, u64>,

    /// Index of this term's no-op entry once committed; set by
    /// `on_noop_committed`.
    #[doc(hidden)]
    pub noop_log_id: Option<u64>,

    /// Log purge target scheduled after snapshot creation; executed by the
    /// purge executor in the `SnapshotCreated` handler.
    pub scheduled_purge_upto: Option<LogId>,

    /// Highest log id already purged; updated on `LogPurgeCompleted`.
    pub last_purged_index: Option<LogId>,

    /// Guards against overlapping snapshot creation (see
    /// `CreateSnapshotEvent` / `SnapshotCreated` handling).
    pub snapshot_in_progress: AtomicBool,

    /// Buffered client write proposals awaiting the next batch flush.
    pub(super) propose_buffer: Box<ProposeBatchBuffer>,

    /// Drives the heartbeat/replication cadence (see `tick`).
    timer: Box<ReplicationTimer>,

    /// Node-wide configuration handle.
    pub(super) node_config: Arc<RaftNodeConfig>,

    /// Last time learner progress was checked — TODO confirm exact semantics
    /// at the maintenance call site.
    pub last_learner_check: Instant,

    /// Next scheduled membership maintenance pass.
    pub next_membership_maintenance_check: Instant,

    /// Learners deemed ready for promotion to voter, in arrival order.
    pub pending_promotions: VecDeque<PendingPromotion>,

    /// Timestamp backing lease-based reads — confirm unit at the writers.
    pub(super) lease_timestamp: AtomicU64,

    /// Cached membership facts; refreshed on `MembershipApplied`.
    pub(super) cluster_metadata: ClusterMetadata,

    /// Linearizable reads buffered to ride the next quorum round.
    pub(super) linearizable_read_buffer: Box<BatchBuffer<LinearizableReadRequest>>,

    /// Lease-consistency reads awaiting `flush_cmd_buffers`.
    pub(super) lease_read_queue: VecDeque<(
        ClientReadRequest,
        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
    )>,

    /// Eventual-consistency reads awaiting flush (served from local state).
    pub(super) eventual_read_queue: VecDeque<(
        ClientReadRequest,
        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
    )>,

    /// Write responders keyed by log index; completed in the
    /// `ApplyCompleted` handler.
    pub(super) pending_requests:
        HashMap<u64, MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>>,

    /// Linearizable reads keyed by the apply index they must wait for;
    /// BTreeMap so ready entries can be range-scanned in index order.
    pub(super) pending_reads: BTreeMap<u64, VecDeque<LinearizableReadRequest>>,

    /// Optional backpressure instrumentation (None when metrics disabled).
    backpressure_metrics: Option<Arc<BackpressureMetrics>>,

    /// Pins the `TypeConfig` parameter without storing a value of it.
    _marker: PhantomData<T>,
}
377
378#[async_trait]
379impl<T: TypeConfig> RaftRoleState for LeaderState<T> {
380 type T = T;
381
    /// Read access to the role-independent shared state.
    fn shared_state(&self) -> &SharedState {
        &self.shared_state
    }
385
    /// Mutable access to the role-independent shared state.
    fn shared_state_mut(&mut self) -> &mut SharedState {
        &mut self.shared_state
    }
389
390 #[tracing::instrument]
394 fn update_commit_index(
395 &mut self,
396 new_commit_index: u64,
397 ) -> Result<()> {
398 if self.commit_index() < new_commit_index {
399 debug!("update_commit_index to: {:?}", new_commit_index);
400 self.shared_state.commit_index = new_commit_index;
401 } else {
402 warn!(
403 "Illegal operation, might be a bug! I am Leader old_commit_index({}) >= new_commit_index:({})",
404 self.commit_index(),
405 new_commit_index
406 )
407 }
408 Ok(())
409 }
410
    /// Delegates to the shared state's persisted vote record.
    fn voted_for(&self) -> Result<Option<VotedFor>> {
        self.shared_state().voted_for()
    }
415
    /// Delegates the vote update to the shared state; returns whether the
    /// record changed (per the shared-state contract — confirm there).
    fn update_voted_for(
        &mut self,
        voted_for: VotedFor,
    ) -> Result<bool> {
        self.shared_state_mut().update_voted_for(voted_for)
    }
424
425 fn next_index(
426 &self,
427 node_id: u32,
428 ) -> Option<u64> {
429 Some(if let Some(n) = self.next_index.get(&node_id) {
430 *n
431 } else {
432 1
433 })
434 }
435
    /// Records the next log index to replicate to `node_id`, overwriting any
    /// previous value.
    fn update_next_index(
        &mut self,
        node_id: u32,
        new_next_id: u64,
    ) -> Result<()> {
        debug!("update_next_index({}) to {}", node_id, new_next_id);
        self.next_index.insert(node_id, new_next_id);
        Ok(())
    }
445
    /// Records the highest replicated index for `node_id`, overwriting any
    /// previous value (no monotonicity check is done here).
    fn update_match_index(
        &mut self,
        node_id: u32,
        new_match_id: u64,
    ) -> Result<()> {
        self.match_index.insert(node_id, new_match_id);
        Ok(())
    }
454
    /// Highest log index known replicated on `node_id`; `None` for untracked
    /// peers.
    fn match_index(
        &self,
        node_id: u32,
    ) -> Option<u64> {
        self.match_index.get(&node_id).copied()
    }
461
462 fn init_peers_next_index_and_match_index(
463 &mut self,
464 last_entry_id: u64,
465 peer_ids: Vec<u32>,
466 ) -> Result<()> {
467 for peer_id in peer_ids {
468 debug!("init leader state for peer_id: {:?}", peer_id);
469 let new_next_id = last_entry_id + 1;
470 self.update_next_index(peer_id, new_next_id)?;
471 self.update_match_index(peer_id, 0)?;
472 }
473 Ok(())
474 }
475
    /// Records the index of this term's no-op entry once it commits.
    ///
    /// NOTE(review): this reads `last_entry_id()` at commit time and assumes
    /// no later entry was appended after the no-op — confirm callers invoke
    /// this before accepting new proposals.
    fn on_noop_committed(
        &mut self,
        ctx: &RaftContext<Self::T>,
    ) -> Result<()> {
        let noop_index = ctx.raft_log().last_entry_id();
        self.noop_log_id = Some(noop_index);
        debug!("Tracked noop_log_id: {}", noop_index);
        Ok(())
    }
    /// Index of this term's committed no-op entry, if tracked yet.
    fn noop_log_id(&self) -> Result<Option<u64>> {
        Ok(self.noop_log_id)
    }
490
    /// Re-verifies leadership with retries until a definitive answer.
    ///
    /// Returns `Ok(true)` on quorum success and `Ok(false)` on confirmed
    /// leadership loss.
    ///
    /// # Errors
    /// Propagates hard errors from `verify_internal_quorum`, and returns
    /// `NetworkError::GlobalTimeout` once retryable attempts exceed
    /// `verify_leadership_persistent_timeout`.
    async fn verify_leadership_persistent(
        &mut self,
        payloads: Vec<EntryPayload>,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<bool> {
        let initial_delay =
            Duration::from_millis(ctx.node_config.retry.internal_quorum.base_delay_ms);
        let max_delay = Duration::from_millis(ctx.node_config.retry.internal_quorum.max_delay_ms);
        let global_timeout = ctx.node_config.raft.membership.verify_leadership_persistent_timeout;

        let mut current_delay = initial_delay;
        let start_time = Instant::now();

        loop {
            match self.verify_internal_quorum(payloads.clone(), ctx, role_tx).await {
                Ok(QuorumVerificationResult::Success) => {
                    return Ok(true);
                }
                Ok(QuorumVerificationResult::LeadershipLost) => return Ok(false),
                Ok(QuorumVerificationResult::RetryRequired) => {
                    // Only retryable outcomes are subject to the global
                    // timeout; hard errors below propagate immediately.
                    if start_time.elapsed() > global_timeout {
                        return Err(NetworkError::GlobalTimeout(
                            "Leadership verification timed out".to_string(),
                        )
                        .into());
                    }

                    // Exponential backoff (doubling, saturating at
                    // `max_delay`) plus up to 500ms of random jitter to avoid
                    // synchronized retries across nodes.
                    current_delay =
                        current_delay.checked_mul(2).unwrap_or(max_delay).min(max_delay);
                    let jitter = Duration::from_millis(rand::random::<u64>() % 500);
                    sleep(current_delay + jitter).await;
                }
                Err(e) => return Err(e),
            }
        }
    }
547
    /// Builds the cached `ClusterMetadata` from the current membership so hot
    /// paths avoid repeated async membership lookups.
    async fn init_cluster_metadata(
        &mut self,
        membership: &Arc<T::M>,
    ) -> Result<()> {
        let voters = membership.voters().await;
        // +1 counts this leader itself; `voters()` presumably returns peers
        // only — TODO confirm against the Membership trait.
        let total_voters = voters.len() + 1;
        let replication_targets = membership.replication_peers().await;

        let single_voter = total_voters == 1;

        self.cluster_metadata = ClusterMetadata {
            single_voter,
            total_voters,
            replication_targets: replication_targets.clone(),
        };

        debug!(
            "Initialized cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
            single_voter,
            total_voters,
            replication_targets.len()
        );
        Ok(())
    }
608
    /// Runs one quorum round by proposing `payloads` immediately (bypassing
    /// the batch buffer) and classifying the replication outcome.
    ///
    /// # Errors
    /// Fails on response-channel breakage, or when no response arrives within
    /// `general_raft_timeout_duration_in_ms`.
    async fn verify_internal_quorum(
        &mut self,
        payloads: Vec<EntryPayload>,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<QuorumVerificationResult> {
        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();

        self.execute_request_immediately(
            RaftRequestWithSignal {
                id: nanoid!(),
                payloads,
                senders: vec![resp_tx],
                // Quorum acknowledgment is enough for leadership checks; no
                // need to wait for the state machine to apply.
                wait_for_apply_event: false,
            },
            ctx,
            role_tx,
        )
        .await?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        // Nesting: outer Result = timeout, middle = channel, inner = response.
        match timeout(timeout_duration, resp_rx).await {
            Ok(Ok(Ok(response))) => {
                debug!("Leadership check response: {:?}", response);

                Ok(if response.is_write_success() {
                    QuorumVerificationResult::Success
                } else if response.is_retry_required() {
                    QuorumVerificationResult::RetryRequired
                } else {
                    QuorumVerificationResult::LeadershipLost
                })
            }

            // An explicit error status means a peer rejected our leadership.
            Ok(Ok(Err(status))) => {
                warn!("Leadership rejected by follower: {status:?}");
                Ok(QuorumVerificationResult::LeadershipLost)
            }

            // Broken channel: treat as a hard error, not a verdict.
            Ok(Err(e)) => {
                error!("Channel error during leadership check: {:?}", e);
                Err(NetworkError::SingalReceiveFailed(e.to_string()).into())
            }

            Err(_) => {
                warn!("Leadership check timed out after {:?}", timeout_duration);
                Err(NetworkError::Timeout {
                    node_id: self.node_id(),
                    duration: timeout_duration,
                }
                .into())
            }
        }
    }
672
    /// Always true for this role.
    fn is_leader(&self) -> bool {
        true
    }
676
    /// Leader → Leader is a no-op request and therefore rejected.
    fn become_leader(&self) -> Result<RaftRole<T>> {
        warn!("I am leader already");

        Err(StateTransitionError::InvalidTransition.into())
    }
682
    /// Leader → Candidate is illegal in Raft (a leader steps down to
    /// follower first); always an error.
    fn become_candidate(&self) -> Result<RaftRole<T>> {
        error!("Leader can not become Candidate");

        Err(StateTransitionError::InvalidTransition.into())
    }
688
    /// Steps down to Follower — the only legal downgrade for a leader.
    /// Logs via tracing and additionally prints to stdout for operator
    /// visibility.
    fn become_follower(&self) -> Result<RaftRole<T>> {
        info!(
            "Node {} term {} transitioning to Follower",
            self.node_id(),
            self.current_term(),
        );
        println!(
            "[Node {}] Leader → Follower (term {})",
            self.node_id(),
            self.current_term()
        );
        Ok(RaftRole::Follower(Box::new(self.into())))
    }
702
    /// Leader → Learner is illegal; always an error.
    fn become_learner(&self) -> Result<RaftRole<T>> {
        error!("Leader can not become Learner");

        Err(StateTransitionError::InvalidTransition.into())
    }
708
    /// Whether the replication timer has passed its deadline.
    fn is_timer_expired(&self) -> bool {
        self.timer.is_expired()
    }
712
    /// Restarts the replication (heartbeat) countdown.
    fn reset_timer(&mut self) {
        self.timer.reset_replication();
    }
717
    /// Instant at which the event loop should wake this role next.
    fn next_deadline(&self) -> Instant {
        self.timer.next_deadline()
    }
721
    /// Periodic leader tick: refreshes the leader hint, runs maintenance,
    /// and — when the replication deadline has passed — flushes the propose
    /// buffer (presumably an empty flush yields a plain heartbeat; see
    /// `send_heartbeat_or_batch`).
    async fn tick(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        _raft_tx: &mpsc::Sender<RaftEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        let now = Instant::now();
        self.shared_state().set_current_leader(self.node_id());

        // Maintenance failures are logged but never abort the tick.
        if let Err(e) = self.run_periodic_maintenance(role_tx, ctx).await {
            error!("Failed to run periodic maintenance: {}", e);
        }

        // `now` was captured before maintenance ran, so a slow maintenance
        // pass cannot postpone the replication-deadline check.
        if now >= self.timer.replication_deadline() {
            debug!(?now, "tick::reset_replication timer");

            // `flush()` returns None when nothing is buffered.
            let request = self.propose_buffer.flush();
            self.send_heartbeat_or_batch(request, role_tx, ctx).await?;
        }

        Ok(())
    }
751
    /// Fails every buffered client request because this node is stepping
    /// down; clients must retry against the new leader.
    ///
    /// Send errors are deliberately ignored: a dropped receiver just means
    /// the client already gave up.
    fn drain_read_buffer(&mut self) -> Result<()> {
        // 1. Linearizable reads not yet attached to a quorum round.
        let batch = self.linearizable_read_buffer.take_all();
        if !batch.is_empty() {
            warn!(
                "Read batch: draining {} linearizable read requests due to role change",
                batch.len()
            );
            for (_, sender) in batch {
                let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
            }
        }

        // 2. Lease-based reads.
        if !self.lease_read_queue.is_empty() {
            warn!(
                "Read batch: draining {} lease read requests due to role change",
                self.lease_read_queue.len()
            );
            for (_, sender) in self.lease_read_queue.drain(..) {
                let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
            }
        }

        // 3. Eventual-consistency reads.
        if !self.eventual_read_queue.is_empty() {
            warn!(
                "Read batch: draining {} eventual read requests due to role change",
                self.eventual_read_queue.len()
            );
            for (_, sender) in self.eventual_read_queue.drain(..) {
                let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
            }
        }

        // 4. Linearizable reads already waiting on an apply index.
        if !self.pending_reads.is_empty() {
            let count: usize = self.pending_reads.values().map(|b| b.len()).sum();
            warn!(
                "Read batch: draining {} pending linearizable reads due to role change",
                count
            );
            for (_, batch) in std::mem::take(&mut self.pending_reads) {
                for (_, sender) in batch {
                    let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
                }
            }
        }

        // 5. Buffered writes (note the different status: failed_precondition).
        if !self.propose_buffer.is_empty() {
            warn!(
                "Write batch: draining {} pending write requests due to role change",
                self.propose_buffer.len()
            );

            if let Some(batch) = self.propose_buffer.flush() {
                for sender in batch.senders {
                    let _ = sender.send(Err(tonic::Status::failed_precondition("Not leader")));
                }
            }
        }

        Ok(())
    }
817
    /// Enqueues a client command into the appropriate leader-side buffer,
    /// applying backpressure limits before accepting it.
    ///
    /// Writes go to `propose_buffer`; reads are routed by the effective
    /// consistency policy to the linearizable buffer, the lease queue, or the
    /// eventual queue. Rejections are answered immediately with
    /// `resource_exhausted`; nothing is processed here — draining happens in
    /// `flush_cmd_buffers`.
    fn push_client_cmd(
        &mut self,
        cmd: ClientCmd,
        ctx: &RaftContext<Self::T>,
    ) {
        use crate::client_command_to_entry_payloads;

        let backpressure = &ctx.node_config.raft.backpressure;

        match cmd {
            ClientCmd::Propose(req, sender) => {
                let current_pending = self.propose_buffer.len();

                // Report (sampled) utilization before the accept/reject
                // decision so rejected requests still show up in the gauge.
                if let Some(ref metrics) = self.backpressure_metrics {
                    if backpressure.max_pending_writes > 0 {
                        let utilization = (current_pending as f64
                            / backpressure.max_pending_writes as f64)
                            * 100.0;
                        metrics.record_buffer_utilization(utilization, true);
                    }
                }

                if backpressure.should_reject_write(current_pending) {
                    if let Some(ref metrics) = self.backpressure_metrics {
                        metrics.record_rejection(true);
                    }

                    let _ = sender.send(Err(Status::resource_exhausted(
                        "Too many pending write requests",
                    )));
                    return;
                }

                if let Some(cmd) = req.command {
                    // Single-command conversion; the helper's contract is a
                    // 1:1 mapping, hence the expect below.
                    let payload = client_command_to_entry_payloads(vec![cmd])
                        .into_iter()
                        .next()
                        .expect("client_command_to_entry_payloads should return 1 element");

                    self.propose_buffer.push(payload, sender);
                } else {
                    let _ = sender.send(Err(Status::invalid_argument("Command is empty")));
                }
            }
            ClientCmd::Read(req, sender) => {
                let effective_policy = self.determine_read_policy(&req);

                // The three branches below are deliberately parallel: sample
                // utilization, reject if over the read limit, otherwise push
                // onto the policy-specific queue.
                match effective_policy {
                    ServerReadConsistencyPolicy::LinearizableRead => {
                        let current_pending = self.linearizable_read_buffer.len();

                        if let Some(ref metrics) = self.backpressure_metrics {
                            if backpressure.max_pending_reads > 0 {
                                let utilization = (current_pending as f64
                                    / backpressure.max_pending_reads as f64)
                                    * 100.0;
                                metrics.record_buffer_utilization(utilization, false);
                            }
                        }

                        if backpressure.should_reject_read(current_pending) {
                            if let Some(ref metrics) = self.backpressure_metrics {
                                metrics.record_rejection(false);
                            }

                            let _ = sender.send(Err(Status::resource_exhausted(
                                "Too many pending read requests",
                            )));
                            return;
                        }

                        self.linearizable_read_buffer.push((req, sender));
                    }
                    ServerReadConsistencyPolicy::LeaseRead => {
                        let current_pending = self.lease_read_queue.len();

                        if let Some(ref metrics) = self.backpressure_metrics {
                            if backpressure.max_pending_reads > 0 {
                                let utilization = (current_pending as f64
                                    / backpressure.max_pending_reads as f64)
                                    * 100.0;
                                metrics.record_buffer_utilization(utilization, false);
                            }
                        }

                        if backpressure.should_reject_read(current_pending) {
                            if let Some(ref metrics) = self.backpressure_metrics {
                                metrics.record_rejection(false);
                            }

                            let _ = sender.send(Err(Status::resource_exhausted(
                                "Too many pending read requests",
                            )));
                            return;
                        }

                        self.lease_read_queue.push_back((req, sender));
                    }
                    ServerReadConsistencyPolicy::EventualConsistency => {
                        let current_pending = self.eventual_read_queue.len();

                        if let Some(ref metrics) = self.backpressure_metrics {
                            if backpressure.max_pending_reads > 0 {
                                let utilization = (current_pending as f64
                                    / backpressure.max_pending_reads as f64)
                                    * 100.0;
                                metrics.record_buffer_utilization(utilization, false);
                            }
                        }

                        if backpressure.should_reject_read(current_pending) {
                            if let Some(ref metrics) = self.backpressure_metrics {
                                metrics.record_rejection(false);
                            }

                            let _ = sender.send(Err(Status::resource_exhausted(
                                "Too many pending read requests",
                            )));
                            return;
                        }

                        self.eventual_read_queue.push_back((req, sender));
                    }
                }
            }
        }
    }
962
    /// Drains the client-command buffers accumulated since the last flush.
    ///
    /// Ordering matters: writes and linearizable reads first (the unified
    /// path presumably lets reads wait on the same replication round — see
    /// `unified_write_and_linear_read`), then lease reads, then
    /// eventual-consistency reads served locally.
    async fn flush_cmd_buffers(
        &mut self,
        ctx: &RaftContext<Self::T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        let has_writes = !self.propose_buffer.is_empty();
        let has_reads = !self.linearizable_read_buffer.is_empty();

        // Fast path: a pure write batch needs no read-index bookkeeping.
        if has_writes && !has_reads {
            if let Some(request) = self.propose_buffer.flush() {
                self.process_batch(std::iter::once(request), role_tx, ctx).await?;
            }
        }
        // Mixed or read-only batch goes through the unified path.
        else if has_writes || has_reads {
            self.unified_write_and_linear_read(ctx, role_tx).await?;
        }

        // Lease-read failures are logged but do not abort the flush.
        while let Some((req, sender)) = self.lease_read_queue.pop_front() {
            if let Err(e) = self.process_lease_read(req, sender, ctx, role_tx).await {
                error!("process_lease_read failed: {:?}", e);
            }
        }

        // Eventual reads are answered from local state; no quorum involved.
        while let Some((req, sender)) = self.eventual_read_queue.pop_front() {
            self.process_eventual_read(req, sender, ctx);
        }

        Ok(())
    }
1001
1002 async fn handle_raft_event(
1003 &mut self,
1004 raft_event: RaftEvent,
1005 ctx: &RaftContext<T>,
1006 role_tx: mpsc::UnboundedSender<RoleEvent>,
1007 ) -> Result<()> {
1008 let my_id = self.shared_state.node_id;
1009 let my_term = self.current_term();
1010
1011 match raft_event {
1012 RaftEvent::ReceiveVoteRequest(vote_request, sender) => {
1019 debug!(
1020 "handle_raft_event::RaftEvent::ReceiveVoteRequest: {:?}",
1021 &vote_request
1022 );
1023
1024 let my_term = self.current_term();
1025 if my_term < vote_request.term {
1026 self.update_current_term(vote_request.term);
1027 self.send_become_follower_event(None, &role_tx)?;
1029
1030 info!("Leader will not process Vote request, it should let Follower do it.");
1031 send_replay_raft_event(
1032 &role_tx,
1033 RaftEvent::ReceiveVoteRequest(vote_request, sender),
1034 )?;
1035 } else {
1036 let last_log_id =
1037 ctx.raft_log().last_log_id().unwrap_or(LogId { index: 0, term: 0 });
1038 let response = VoteResponse {
1039 term: my_term,
1040 vote_granted: false,
1041 last_log_index: last_log_id.index,
1042 last_log_term: last_log_id.term,
1043 };
1044 sender.send(Ok(response)).map_err(|e| {
1045 let error_str = format!("{e:?}");
1046 error!("Failed to send: {}", error_str);
1047 NetworkError::SingalSendFailed(error_str)
1048 })?;
1049 }
1050 }
1051
1052 RaftEvent::ClusterConf(_metadata_request, sender) => {
1053 let cluster_conf = ctx
1054 .membership()
1055 .retrieve_cluster_membership_config(self.shared_state().current_leader())
1056 .await;
1057 debug!("Leader receive ClusterConf: {:?}", &cluster_conf);
1058 if let Err(e) = sender.send(Ok(cluster_conf)) {
1059 error!(
1061 "Failed to send ClusterConf response (receiver dropped): {:?}",
1062 e
1063 );
1064 }
1065 }
1066
1067 RaftEvent::ClusterConfUpdate(cluste_conf_change_request, sender) => {
1068 let current_conf_version = ctx.membership().get_cluster_conf_version().await;
1069 debug!(%current_conf_version, ?cluste_conf_change_request,
1070 "handle_raft_event::RaftEvent::ClusterConfUpdate",
1071 );
1072
1073 if my_term >= cluste_conf_change_request.term {
1075 let response = ClusterConfUpdateResponse::higher_term(
1076 my_id,
1077 my_term,
1078 current_conf_version,
1079 );
1080
1081 sender.send(Ok(response)).map_err(|e| {
1082 let error_str = format!("{e:?}");
1083 error!("Failed to send: {}", error_str);
1084 NetworkError::SingalSendFailed(error_str)
1085 })?;
1086 } else {
1087 info!(
1089 "my({}) term < request one, now I will step down to Follower",
1090 my_id
1091 );
1092 self.send_become_follower_event(Some(cluste_conf_change_request.id), &role_tx)?;
1094
1095 info!(
1096 "Leader will not process append_entries_request, it should let Follower do it."
1097 );
1098 send_replay_raft_event(
1099 &role_tx,
1100 RaftEvent::ClusterConfUpdate(cluste_conf_change_request, sender),
1101 )?;
1102 }
1103 }
1104
1105 RaftEvent::AppendEntries(append_entries_request, sender) => {
1106 debug!(
1107 "handle_raft_event::RaftEvent::AppendEntries: {:?}",
1108 &append_entries_request
1109 );
1110
1111 if my_term >= append_entries_request.term {
1113 let response = AppendEntriesResponse::higher_term(my_id, my_term);
1114
1115 sender.send(Ok(response)).map_err(|e| {
1116 let error_str = format!("{e:?}");
1117 error!("Failed to send: {}", error_str);
1118 NetworkError::SingalSendFailed(error_str)
1119 })?;
1120 } else {
1121 info!(
1123 "my({}) term < request one, now I will step down to Follower",
1124 my_id
1125 );
1126 self.send_become_follower_event(
1128 Some(append_entries_request.leader_id),
1129 &role_tx,
1130 )?;
1131
1132 info!(
1133 "Leader will not process append_entries_request, it should let Follower do it."
1134 );
1135 send_replay_raft_event(
1136 &role_tx,
1137 RaftEvent::AppendEntries(append_entries_request, sender),
1138 )?;
1139 }
1140 }
1141
1142 RaftEvent::InstallSnapshotChunk(_streaming, sender) => {
1143 sender
1144 .send(Err(Status::permission_denied("Not Follower or Learner. ")))
1145 .map_err(|e| {
1146 let error_str = format!("{e:?}");
1147 error!("Failed to send: {}", error_str);
1148 NetworkError::SingalSendFailed(error_str)
1149 })?;
1150
1151 return Err(ConsensusError::RoleViolation {
1152 current_role: "Leader",
1153 required_role: "Follower or Learner",
1154 context: format!(
1155 "Leader node {} receives RaftEvent::InstallSnapshotChunk",
1156 ctx.node_id
1157 ),
1158 }
1159 .into());
1160 }
1161
1162 RaftEvent::CreateSnapshotEvent => {
1163 if self.snapshot_in_progress.load(std::sync::atomic::Ordering::Acquire) {
1165 info!("Snapshot creation already in progress. Skipping duplicate request.");
1166 return Ok(());
1167 }
1168
1169 self.snapshot_in_progress.store(true, std::sync::atomic::Ordering::Release);
1170 let state_machine_handler = ctx.state_machine_handler().clone();
1171
1172 tokio::spawn(async move {
1174 let result = state_machine_handler.create_snapshot().await;
1175 info!("SnapshotCreated event will be processed in another event thread");
1176 if let Err(e) =
1177 send_replay_raft_event(&role_tx, RaftEvent::SnapshotCreated(result))
1178 {
1179 error!("Failed to send snapshot creation result: {}", e);
1180 }
1181 });
1182 }
1183
1184 RaftEvent::SnapshotCreated(result) => {
1185 self.snapshot_in_progress.store(false, Ordering::SeqCst);
1186
1187 match result {
1188 Err(e) => {
1189 error!(%e, "State machine snapshot creation failed");
1190 }
1191 Ok((
1192 SnapshotMetadata {
1193 last_included: last_included_option,
1194 checksum: _,
1195 },
1196 _final_path,
1197 )) => {
1198 info!("Initiating log purge after snapshot creation");
1199
1200 if let Some(last_included) = last_included_option {
1201 trace!("Phase 1: Schedule log purge if possible");
1205 if self.can_purge_logs(self.last_purged_index, last_included) {
1206 trace!(?last_included, "Phase 1: Scheduling log purge");
1207 self.scheduled_purge_upto(last_included);
1208 }
1209
1210 trace!("Phase 2: Execute scheduled purge task");
1215 debug!(?last_included, "Execute scheduled purge task");
1216 if let Some(scheduled) = self.scheduled_purge_upto {
1217 let purge_executor = ctx.purge_executor();
1218 match purge_executor.execute_purge(scheduled).await {
1219 Ok(_) => {
1220 if let Err(e) = send_replay_raft_event(
1221 &role_tx,
1222 RaftEvent::LogPurgeCompleted(scheduled),
1223 ) {
1224 error!(%e, "Failed to notify purge completion");
1225 }
1226 }
1227 Err(e) => {
1228 error!(?e, ?scheduled, "Log purge execution failed");
1229 }
1230 }
1231 }
1232 }
1233 }
1234 }
1235 }
1236
1237 RaftEvent::LogPurgeCompleted(purged_id) => {
1238 if self.last_purged_index.map_or(true, |current| purged_id.index > current.index) {
1240 debug!(
1241 ?purged_id,
1242 "Updating last purged index after successful execution"
1243 );
1244 self.last_purged_index = Some(purged_id);
1245 } else {
1246 warn!(
1247 ?purged_id,
1248 ?self.last_purged_index,
1249 "Received outdated purge completion, ignoring"
1250 );
1251 }
1252 }
1253
1254 RaftEvent::JoinCluster(join_request, sender) => {
1255 debug!(?join_request, "Leader::RaftEvent::JoinCluster");
1256 self.handle_join_cluster(join_request, sender, ctx, &role_tx).await?;
1257 }
1258
1259 RaftEvent::DiscoverLeader(request, sender) => {
1260 debug!(?request, "Leader::RaftEvent::DiscoverLeader");
1261
1262 if let Some(meta) = ctx.membership().retrieve_node_meta(my_id).await {
1263 let response = LeaderDiscoveryResponse {
1264 leader_id: my_id,
1265 leader_address: meta.address,
1266 term: my_term,
1267 };
1268 sender.send(Ok(response)).map_err(|e| {
1269 let error_str = format!("{e:?}");
1270 error!("Failed to send: {}", error_str);
1271 NetworkError::SingalSendFailed(error_str)
1272 })?;
1273 return Ok(());
1274 } else {
1275 let msg = "Leader can not find its address? It must be a bug.";
1276 error!("{}", msg);
1277 panic!("{}", msg);
1278 }
1279 }
1280 RaftEvent::StreamSnapshot(request, sender) => {
1281 debug!("Leader::RaftEvent::StreamSnapshot");
1282
1283 if let Some(metadata) = ctx.state_machine().snapshot_metadata() {
1285 let (response_tx, response_rx) =
1287 mpsc::channel::<std::result::Result<Arc<SnapshotChunk>, Status>>(32);
1288 let size = 1024 * 1024 * 1024; let response_stream = create_production_snapshot_stream(response_rx, size);
1291 sender.send(Ok(response_stream)).map_err(|e| {
1293 let error_str = format!("{e:?}");
1294 error!("Stream response failed: {}", error_str);
1295 NetworkError::SingalSendFailed(error_str)
1296 })?;
1297
1298 let state_machine_handler = ctx.state_machine_handler().clone();
1300 let config = ctx.node_config.raft.snapshot.clone();
1301 let data_stream =
1303 state_machine_handler.load_snapshot_data(metadata.clone()).await?;
1304
1305 tokio::spawn(async move {
1306 if let Err(e) = BackgroundSnapshotTransfer::<T>::run_pull_transfer(
1307 request,
1308 response_tx,
1309 data_stream,
1310 config,
1311 )
1312 .await
1313 {
1314 error!("StreamSnapshot failed: {:?}", e);
1315 }
1316 });
1317 } else {
1318 warn!("No snapshot available for streaming");
1319 sender.send(Err(Status::not_found("Snapshot not found"))).map_err(|e| {
1320 let error_str = format!("{e:?}");
1321 error!("Stream response failed: {}", error_str);
1322 NetworkError::SingalSendFailed(error_str)
1323 })?;
1324 }
1325 }
1326 RaftEvent::TriggerSnapshotPush { peer_id } => {
1327 if let Some(lastest_snapshot_metadata) = ctx.state_machine().snapshot_metadata() {
1328 Self::trigger_background_snapshot(
1329 peer_id,
1330 lastest_snapshot_metadata,
1331 ctx.state_machine_handler().clone(),
1332 ctx.membership(),
1333 ctx.node_config.raft.snapshot.clone(),
1334 )
1335 .await?;
1336 }
1337 }
1338
1339 RaftEvent::PromoteReadyLearners => {
1340 info!(
1342 "[Leader {}] ⚡ PromoteReadyLearners event received, pending_promotions: {:?}",
1343 self.node_id(),
1344 self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
1345 );
1346 self.process_pending_promotions(ctx, &role_tx).await?;
1347 }
1348
1349 RaftEvent::MembershipApplied => {
1350 let old_replication_targets = self.cluster_metadata.replication_targets.clone();
1352
1353 debug!("Refreshing cluster metadata cache after membership change");
1355 if let Err(e) = self.update_cluster_metadata(&ctx.membership()).await {
1356 warn!("Failed to update cluster metadata: {:?}", e);
1357 }
1358
1359 let newly_added: Vec<u32> = self
1362 .cluster_metadata
1363 .replication_targets
1364 .iter()
1365 .filter(|new_peer| {
1366 !old_replication_targets.iter().any(|old_peer| old_peer.id == new_peer.id)
1367 })
1368 .map(|peer| peer.id)
1369 .collect();
1370
1371 if !newly_added.is_empty() {
1372 debug!(
1373 "Initializing replication state for {} new peer(s): {:?}",
1374 newly_added.len(),
1375 newly_added
1376 );
1377 let last_entry_id = ctx.raft_log().last_entry_id();
1378 if let Err(e) =
1379 self.init_peers_next_index_and_match_index(last_entry_id, newly_added)
1380 {
1381 warn!("Failed to initialize next_index for new peers: {:?}", e);
1382 }
1385 }
1386 }
1387
1388 RaftEvent::ApplyCompleted {
1389 last_index,
1390 results,
1391 } => {
1392 let num_results = results.len();
1393
1394 let responses: Vec<_> = results
1396 .into_iter()
1397 .filter_map(|r| {
1398 self.pending_requests.remove(&r.index).map(|sender| (r, sender))
1399 })
1400 .collect();
1401
1402 for (result, sender) in responses {
1403 let response = if result.succeeded {
1404 ClientResponse::write_success()
1405 } else {
1406 ClientResponse::cas_failure()
1407 };
1408
1409 trace!(
1410 "[Leader-{}] Sending response to client for index {}: succeeded={}",
1411 self.node_id(),
1412 result.index,
1413 result.succeeded
1414 );
1415
1416 if sender.send(Ok(response)).is_err() {
1417 trace!(
1418 "[Leader-{}] Client receiver dropped for index {}",
1419 self.node_id(),
1420 result.index
1421 );
1422 }
1423 }
1424
1425 let reads_to_serve: Vec<_> =
1427 self.pending_reads.range(..=last_index).map(|(k, _)| *k).collect();
1428
1429 for read_index in reads_to_serve {
1430 if let Some(batch) = self.pending_reads.remove(&read_index) {
1431 self.execute_pending_reads(batch, ctx);
1432 }
1433 }
1434
1435 check_and_trigger_snapshot(
1437 last_index,
1438 Leader as i32,
1439 self.current_term(),
1440 ctx,
1441 &role_tx,
1442 )?;
1443
1444 trace!(
1445 "[Leader-{}] TIMING: process_apply_completed({} results)",
1446 self.node_id(),
1447 num_results,
1448 );
1449 }
1450
1451 RaftEvent::FatalError { source, error } => {
1452 error!("[Leader] Fatal error from {}: {}", source, error);
1453 let fatal_status = || tonic::Status::internal(format!("Node fatal error: {error}"));
1454 let pending: Vec<_> = self.pending_requests.drain().collect();
1456 if !pending.is_empty() {
1457 warn!(
1458 "[Leader] FatalError: notifying {} pending write requests",
1459 pending.len()
1460 );
1461 for (_index, sender) in pending {
1462 let _ = sender.send(Err(fatal_status()));
1463 }
1464 }
1465 let lin_buf = self.linearizable_read_buffer.take_all();
1467 if !lin_buf.is_empty() {
1468 warn!(
1469 "[Leader] FatalError: notifying {} buffered linearizable reads",
1470 lin_buf.len()
1471 );
1472 for (_, sender) in lin_buf {
1473 let _ = sender.send(Err(fatal_status()));
1474 }
1475 }
1476 if !self.pending_reads.is_empty() {
1478 let count: usize = self.pending_reads.values().map(|b| b.len()).sum();
1479 warn!(
1480 "[Leader] FatalError: notifying {} pending linearizable reads",
1481 count
1482 );
1483 for (_, batch) in std::mem::take(&mut self.pending_reads) {
1484 for (_, sender) in batch {
1485 let _ = sender.send(Err(fatal_status()));
1486 }
1487 }
1488 }
1489 if !self.lease_read_queue.is_empty() {
1491 warn!(
1492 "[Leader] FatalError: notifying {} queued lease reads",
1493 self.lease_read_queue.len()
1494 );
1495 for (_, sender) in self.lease_read_queue.drain(..) {
1496 let _ = sender.send(Err(fatal_status()));
1497 }
1498 }
1499 if !self.eventual_read_queue.is_empty() {
1501 warn!(
1502 "[Leader] FatalError: notifying {} queued eventual reads",
1503 self.eventual_read_queue.len()
1504 );
1505 for (_, sender) in self.eventual_read_queue.drain(..) {
1506 let _ = sender.send(Err(fatal_status()));
1507 }
1508 }
1509 return Err(crate::Error::Fatal(format!(
1510 "Fatal error from {source}: {error}"
1511 )));
1512 }
1513
1514 RaftEvent::StepDownSelfRemoved => {
1515 warn!(
1518 "[Leader-{}] Removed from cluster membership, stepping down to Follower",
1519 self.node_id()
1520 );
1521 role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
1522 error!(
1523 "[Leader-{}] Failed to send BecomeFollower after self-removal: {:?}",
1524 self.node_id(),
1525 e
1526 );
1527 NetworkError::SingalSendFailed(format!(
1528 "BecomeFollower after self-removal: {e:?}"
1529 ))
1530 })?;
1531 return Ok(());
1532 }
1533 }
1534
1535 Ok(())
1536 }
1537}
1538
1539impl<T: TypeConfig> LeaderState<T> {
1540 pub async fn update_cluster_metadata(
1543 &mut self,
1544 membership: &Arc<T::M>,
1545 ) -> Result<()> {
1546 let voters = membership.voters().await;
1548 let total_voters = voters.len() + 1; let replication_targets = membership.replication_peers().await;
1552
1553 let single_voter = total_voters == 1;
1555
1556 self.cluster_metadata = ClusterMetadata {
1557 single_voter,
1558 total_voters,
1559 replication_targets: replication_targets.clone(),
1560 };
1561
1562 debug!(
1563 "Updated cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
1564 single_voter,
1565 total_voters,
1566 replication_targets.len()
1567 );
1568 Ok(())
1569 }
1570
1571 pub fn state_snapshot(&self) -> StateSnapshot {
1573 StateSnapshot {
1574 current_term: self.current_term(),
1575 voted_for: None,
1576 commit_index: self.commit_index(),
1577 role: Leader as i32,
1578 }
1579 }
1580
1581 #[tracing::instrument]
1583 pub fn leader_state_snapshot(&self) -> LeaderStateSnapshot {
1584 LeaderStateSnapshot {
1585 next_index: self.next_index.clone(),
1586 match_index: self.match_index.clone(),
1587 noop_log_id: self.noop_log_id,
1588 }
1589 }
1590
1591 pub async fn execute_request_immediately(
1597 &mut self,
1598 raft_request_with_signal: RaftRequestWithSignal,
1599 ctx: &RaftContext<T>,
1600 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1601 ) -> Result<()> {
1602 debug!(
1603 "Leader::execute_request_immediately, request_id: {}",
1604 raft_request_with_signal.id
1605 );
1606
1607 let batch = VecDeque::from([raft_request_with_signal]);
1609 self.process_batch(batch, role_tx, ctx).await?;
1610
1611 Ok(())
1612 }
1613
1614 pub async fn process_batch(
1651 &mut self,
1652 batch: impl IntoIterator<Item = RaftRequestWithSignal>,
1653 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1654 ctx: &RaftContext<T>,
1655 ) -> Result<()> {
1656 self.timer.reset_replication();
1657
1658 let start_index = ctx.raft_log().last_entry_id() + 1;
1659 let (payloads, write_metadata) = Self::merge_batch_to_write_metadata(batch, start_index);
1660
1661 if !payloads.is_empty() {
1662 trace!(?payloads, "[Node-{}] process_batch", ctx.node_id);
1663 }
1664
1665 self.execute_and_process_raft_rpc(payloads, write_metadata, None, ctx, role_tx)
1666 .await
1667 }
1668
1669 async fn send_heartbeat_or_batch(
1673 &mut self,
1674 request: Option<RaftRequestWithSignal>,
1675 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1676 ctx: &RaftContext<T>,
1677 ) -> Result<()> {
1678 self.timer.reset_replication();
1679 match request {
1680 Some(req) => {
1681 let start_index = ctx.raft_log().last_entry_id() + 1;
1683 let (payloads, write_metadata) =
1684 Self::merge_batch_to_write_metadata(std::iter::once(req), start_index);
1685 self.execute_and_process_raft_rpc(payloads, write_metadata, None, ctx, role_tx)
1686 .await
1687 }
1688 None => {
1689 self.execute_and_process_raft_rpc(vec![], None, None, ctx, role_tx).await
1691 }
1692 }
1693 }
1694
1695 #[instrument(skip(self))]
1697 fn update_peer_indexes(
1698 &mut self,
1699 peer_updates: &HashMap<u32, PeerUpdate>,
1700 ) {
1701 for (peer_id, update) in peer_updates {
1702 if let Err(e) = self.update_next_index(*peer_id, update.next_index) {
1703 error!("Failed to update next index: {:?}", e);
1704 }
1705 trace!(
1706 "Updated next index for peer {}-{}",
1707 peer_id, update.next_index
1708 );
1709 if let Some(match_index) = update.match_index {
1710 if let Err(e) = self.update_match_index(*peer_id, match_index) {
1711 error!("Failed to update match index: {:?}", e);
1712 }
1713 trace!("Updated match index for peer {}-{}", peer_id, match_index);
1714 }
1715 }
1716 }
1717
1718 pub async fn check_learner_progress(
1719 &mut self,
1720 learner_progress: &HashMap<u32, Option<u64>>,
1721 ctx: &RaftContext<T>,
1722 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1723 ) -> Result<()> {
1724 debug!(?learner_progress, "check_learner_progress");
1725
1726 if !self.should_check_learner_progress(ctx) {
1727 return Ok(());
1728 }
1729
1730 if learner_progress.is_empty() {
1731 return Ok(());
1732 }
1733
1734 let ready_learners = self.find_promotable_learners(learner_progress, ctx).await;
1735 let new_promotions = self.deduplicate_promotions(ready_learners);
1736
1737 if !new_promotions.is_empty() {
1738 self.enqueue_and_notify_promotions(new_promotions, role_tx)?;
1739 }
1740
1741 Ok(())
1742 }
1743
1744 fn should_check_learner_progress(
1746 &mut self,
1747 ctx: &RaftContext<T>,
1748 ) -> bool {
1749 let throttle_interval =
1750 Duration::from_millis(ctx.node_config().raft.learner_check_throttle_ms);
1751 if self.last_learner_check.elapsed() < throttle_interval {
1752 return false;
1753 }
1754 self.last_learner_check = Instant::now();
1755 true
1756 }
1757
1758 async fn find_promotable_learners(
1760 &self,
1761 learner_progress: &HashMap<u32, Option<u64>>,
1762 ctx: &RaftContext<T>,
1763 ) -> Vec<u32> {
1764 let leader_commit = self.commit_index();
1765 let threshold = ctx.node_config().raft.learner_catchup_threshold;
1766 let membership = ctx.membership();
1767
1768 let mut ready_learners = Vec::new();
1769
1770 for (&node_id, &match_index_opt) in learner_progress.iter() {
1771 if !membership.contains_node(node_id).await {
1772 continue;
1773 }
1774
1775 if !self.is_learner_caught_up(match_index_opt, leader_commit, threshold) {
1776 continue;
1777 }
1778
1779 let node_status =
1780 membership.get_node_status(node_id).await.unwrap_or(NodeStatus::ReadOnly);
1781 if !node_status.is_promotable() {
1782 debug!(
1783 ?node_id,
1784 ?node_status,
1785 "Learner caught up but status is not Promotable, skipping"
1786 );
1787 continue;
1788 }
1789
1790 debug!(
1791 ?node_id,
1792 match_index = ?match_index_opt.unwrap_or(0),
1793 ?leader_commit,
1794 gap = leader_commit.saturating_sub(match_index_opt.unwrap_or(0)),
1795 "Learner caught up"
1796 );
1797 ready_learners.push(node_id);
1798 }
1799
1800 ready_learners
1801 }
1802
1803 fn is_learner_caught_up(
1805 &self,
1806 match_index: Option<u64>,
1807 leader_commit: u64,
1808 threshold: u64,
1809 ) -> bool {
1810 let match_index = match_index.unwrap_or(0);
1811 let gap = leader_commit.saturating_sub(match_index);
1812 gap <= threshold
1813 }
1814
1815 fn deduplicate_promotions(
1817 &self,
1818 ready_learners: Vec<u32>,
1819 ) -> Vec<u32> {
1820 let already_pending: std::collections::HashSet<_> =
1821 self.pending_promotions.iter().map(|p| p.node_id).collect();
1822
1823 ready_learners.into_iter().filter(|id| !already_pending.contains(id)).collect()
1824 }
1825
1826 fn enqueue_and_notify_promotions(
1828 &mut self,
1829 new_promotions: Vec<u32>,
1830 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1831 ) -> Result<()> {
1832 info!(
1833 ?new_promotions,
1834 "Learners caught up, adding to pending promotions"
1835 );
1836
1837 for node_id in new_promotions {
1838 self.pending_promotions
1839 .push_back(PendingPromotion::new(node_id, Instant::now()));
1840 }
1841
1842 role_tx
1843 .send(RoleEvent::ReprocessEvent(Box::new(
1844 RaftEvent::PromoteReadyLearners,
1845 )))
1846 .map_err(|e| {
1847 let error_str = format!("{e:?}");
1848 error!("Failed to send PromoteReadyLearners: {}", error_str);
1849 Error::System(SystemError::Network(NetworkError::SingalSendFailed(
1850 error_str,
1851 )))
1852 })?;
1853
1854 Ok(())
1855 }
1856
1857 #[allow(dead_code)]
1858 pub async fn batch_promote_learners(
1859 &mut self,
1860 ready_learners_ids: Vec<u32>,
1861 ctx: &RaftContext<T>,
1862 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1863 ) -> Result<()> {
1864 debug!("1. Determine optimal promotion status based on quorum safety");
1866 let membership = ctx.membership();
1867 let current_voters = membership.voters().await.len();
1868 let new_active_count = current_voters + ready_learners_ids.len();
1870
1871 trace!(
1873 ?current_voters,
1874 ?ready_learners_ids,
1875 "[Node-{}] new_active_count: {}",
1876 self.node_id(),
1877 new_active_count
1878 );
1879 let target_status = if ensure_safe_join(self.node_id(), new_active_count).is_ok() {
1880 trace!(
1881 "Going to update nodes-{:?} status to Active",
1882 ready_learners_ids
1883 );
1884 NodeStatus::Active
1885 } else {
1886 trace!(
1887 "Not enough quorum to promote learners: {:?}",
1888 ready_learners_ids
1889 );
1890 return Ok(());
1891 };
1892
1893 debug!("2. Create configuration change payload");
1895 let config_change = Change::BatchPromote(BatchPromote {
1896 node_ids: ready_learners_ids.clone(),
1897 new_status: target_status as i32,
1898 });
1899
1900 info!(?config_change, "Replicating cluster config");
1901
1902 debug!("3. Submit single config change for all ready learners");
1904 match self
1906 .verify_leadership_persistent(vec![EntryPayload::config(config_change)], ctx, role_tx)
1907 .await
1908 {
1909 Ok(true) => {
1910 info!(
1911 "Batch promotion committed for nodes: {:?}",
1912 ready_learners_ids
1913 );
1914 }
1915 Ok(false) => {
1916 warn!("Failed to commit batch promotion");
1917 }
1918 Err(e) => {
1919 error!("Batch promotion error: {:?}", e);
1920 return Err(e);
1921 }
1922 }
1923
1924 Ok(())
1925 }
1926
1927 #[instrument(skip(self))]
1929 fn calculate_new_commit_index(
1930 &mut self,
1931 raft_log: &Arc<ROF<T>>,
1932 peer_updates: &HashMap<u32, PeerUpdate>,
1933 ) -> Option<u64> {
1934 let old_commit_index = self.commit_index();
1935 let current_term = self.current_term();
1936
1937 let matched_ids: Vec<u64> =
1938 peer_updates.keys().filter_map(|&id| self.match_index(id)).collect();
1939
1940 let new_commit_index =
1941 raft_log.calculate_majority_matched_index(current_term, old_commit_index, matched_ids);
1942
1943 if new_commit_index.is_some() && new_commit_index.unwrap() > old_commit_index {
1944 new_commit_index
1945 } else {
1946 None
1947 }
1948 }
1949
1950 #[allow(dead_code)]
1951 fn if_update_commit_index(
1952 &self,
1953 new_commit_index_option: Option<u64>,
1954 ) -> (bool, u64) {
1955 let current_commit_index = self.commit_index();
1956 if let Some(new_commit_index) = new_commit_index_option {
1957 debug!("Leader::update_commit_index: {:?}", new_commit_index);
1958 if current_commit_index < new_commit_index {
1959 return (true, new_commit_index);
1960 }
1961 }
1962 debug!("Leader::update_commit_index: false");
1963 (false, current_commit_index)
1964 }
1965
1966 #[doc(hidden)]
1976 pub fn calculate_read_index(&self) -> u64 {
1977 let commit_index = self.commit_index();
1978 let noop_index = self.noop_log_id.unwrap_or(0);
1979 std::cmp::max(commit_index, noop_index)
1980 }
1981
1982 #[doc(hidden)]
1987 pub async fn wait_until_applied(
1988 &self,
1989 target_index: u64,
1990 state_machine_handler: &Arc<SMHOF<T>>,
1991 last_applied: u64,
1992 ) -> Result<()> {
1993 if last_applied < target_index {
1994 state_machine_handler.update_pending(target_index);
1995
1996 let timeout_ms = self.node_config.raft.read_consistency.state_machine_sync_timeout_ms;
1997 state_machine_handler
1998 .wait_applied(target_index, std::time::Duration::from_millis(timeout_ms))
1999 .await?;
2000
2001 debug!("wait_until_applied: target_index={} success", target_index);
2002 }
2003 Ok(())
2004 }
2005
2006 #[instrument(skip(self))]
2007 fn scheduled_purge_upto(
2008 &mut self,
2009 received_last_included: LogId,
2010 ) {
2011 if let Some(existing) = self.scheduled_purge_upto {
2012 if existing.index >= received_last_included.index {
2013 warn!(
2014 ?received_last_included,
2015 ?existing,
2016 "Will not update scheduled_purge_upto, received invalid last_included log"
2017 );
2018 return;
2019 }
2020 }
2021 info!(?self.scheduled_purge_upto, ?received_last_included, "Updte scheduled_purge_upto.");
2022 self.scheduled_purge_upto = Some(received_last_included);
2023 }
2024
2025 fn send_become_follower_event(
2026 &self,
2027 new_leader_id: Option<u32>,
2028 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2029 ) -> Result<()> {
2030 info!(
2031 ?new_leader_id,
2032 "Leader is going to step down as Follower..."
2033 );
2034 role_tx.send(RoleEvent::BecomeFollower(new_leader_id)).map_err(|e| {
2035 let error_str = format!("{e:?}");
2036 error!("Failed to send: {}", error_str);
2037 NetworkError::SingalSendFailed(error_str)
2038 })?;
2039
2040 Ok(())
2041 }
2042
2043 #[instrument(skip(self))]
2069 pub fn can_purge_logs(
2070 &self,
2071 last_purge_index: Option<LogId>,
2072 last_included_in_snapshot: LogId,
2073 ) -> bool {
2074 let commit_index = self.commit_index();
2075 debug!(
2076 ?commit_index,
2077 ?last_purge_index,
2078 ?last_included_in_snapshot,
2079 "can_purge_logs"
2080 );
2081
2082 let monotonic_check = last_purge_index
2083 .map(|lid| lid.index < last_included_in_snapshot.index)
2084 .unwrap_or(true);
2085
2086 last_included_in_snapshot.index < commit_index && monotonic_check
2089 }
2090
    /// Handles a `JoinRequest` from a node wanting to enter the cluster.
    ///
    /// Flow: validate (node must not already be a member and must be allowed
    /// to rejoin), replicate an `AddNode` configuration change via
    /// `verify_leadership_persistent`, then answer the requester through
    /// `sender` — either with the committed config (`send_join_success`) or
    /// with an error (`send_join_error`, which also returns `Err` upstream).
    pub async fn handle_join_cluster(
        &mut self,
        join_request: JoinRequest,
        sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        let node_id = join_request.node_id;
        let node_role = join_request.node_role;
        let address = join_request.address;
        let status = join_request.status;
        let membership = ctx.membership();

        debug!("1. Validate join request");
        // Duplicate joins are rejected outright.
        if membership.contains_node(node_id).await {
            let error_msg = format!("Node {node_id} already exists in cluster");
            warn!(%error_msg);
            // send_join_error replies to the requester AND returns Err(...).
            return self.send_join_error(sender, MembershipError::NodeAlreadyExists(node_id)).await;
        }

        debug!("2. Create configuration change payload");
        // A previously removed node may be barred from rejoining in this role.
        if let Err(e) = membership.can_rejoin(node_id, node_role).await {
            let error_msg = format!("Node {node_id} cannot rejoin: {e}",);
            warn!(%error_msg);
            return self
                .send_join_error(sender, MembershipError::JoinClusterError(error_msg))
                .await;
        }

        let config_change = Change::AddNode(AddNode {
            node_id,
            address: address.clone(),
            status,
        });

        debug!("3. Wait for quorum confirmation");
        match self
            .verify_leadership_persistent(vec![EntryPayload::config(config_change)], ctx, role_tx)
            .await
        {
            // Config change committed by a quorum: reply with the new config.
            Ok(true) => {
                debug!("4. Update node status to Syncing");

                debug!(
                    "After updating, the replications peers: {:?}",
                    ctx.membership().replication_peers().await
                );

                debug!("5. Send successful response");
                info!("Join config committed for node {}", node_id);
                self.send_join_success(node_id, &address, sender, ctx).await?;
            }
            // Round finished without committing the change.
            Ok(false) => {
                warn!("Failed to commit join config for node {}", node_id);
                self.send_join_error(sender, MembershipError::CommitTimeout).await?
            }
            Err(e) => {
                error!("Error waiting for commit: {:?}", e);
                self.send_join_error(sender, e).await?
            }
        }
        Ok(())
    }
2163
2164 async fn send_join_success(
2165 &self,
2166 node_id: u32,
2167 address: &str,
2168 sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2169 ctx: &RaftContext<T>,
2170 ) -> Result<()> {
2171 let snapshot_metadata = ctx.state_machine_handler().get_latest_snapshot_metadata();
2173
2174 let response = JoinResponse {
2176 success: true,
2177 error: String::new(),
2178 config: Some(
2179 ctx.membership()
2180 .retrieve_cluster_membership_config(self.shared_state().current_leader())
2181 .await,
2182 ),
2183 config_version: ctx.membership().get_cluster_conf_version().await,
2184 snapshot_metadata,
2185 leader_id: self.node_id(),
2186 };
2187
2188 sender.send(Ok(response)).map_err(|e| {
2189 error!("Failed to send join response: {:?}", e);
2190 NetworkError::SingalSendFailed(format!("{e:?}"))
2191 })?;
2192
2193 info!(
2194 "Node {} ({}) successfully added as learner",
2195 node_id, address
2196 );
2197
2198 crate::utils::cluster_printer::print_leader_accepting_new_node(
2200 self.node_id(),
2201 node_id,
2202 address,
2203 d_engine_proto::common::NodeRole::Learner as i32,
2204 );
2205
2206 Ok(())
2207 }
2208
2209 async fn send_join_error(
2210 &self,
2211 sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2212 error: impl Into<Error>,
2213 ) -> Result<()> {
2214 let error = error.into();
2215 let status = Status::failed_precondition(error.to_string());
2216
2217 sender.send(Err(status)).map_err(|e| {
2218 error!("Failed to send join error: {:?}", e);
2219 NetworkError::SingalSendFailed(format!("{e:?}"))
2220 })?;
2221
2222 Err(error)
2223 }
2224
    /// Test-only constructor: builds a `LeaderState` with empty replication
    /// bookkeeping and buffers sized from `node_config`.
    #[cfg(any(test, feature = "__test_support"))]
    pub fn new(
        node_id: u32,
        node_config: Arc<RaftNodeConfig>,
    ) -> Self {
        // Heartbeat/replication tick interval from the replication config.
        let ReplicationConfig {
            rpc_append_entries_clock_in_ms,
            ..
        } = node_config.raft.replication;

        let batch_size = node_config.raft.batching.max_batch_size;

        let enable_batch = node_config.raft.metrics.enable_batch;
        // Backpressure metrics are optional and config-gated.
        let backpressure_metrics = if node_config.raft.metrics.enable_backpressure {
            Some(Arc::new(BackpressureMetrics::new(
                node_id,
                true,
                node_config.raft.metrics.sample_rate,
            )))
        } else {
            None
        };

        LeaderState {
            // Placeholder metadata; refreshed by update_cluster_metadata.
            cluster_metadata: ClusterMetadata {
                single_voter: false,
                total_voters: 0,
                replication_targets: vec![],
            },
            shared_state: SharedState::new(node_id, None, None),
            timer: Box::new(ReplicationTimer::new(rpc_append_entries_clock_in_ms)),
            next_index: HashMap::new(),
            match_index: HashMap::new(),
            noop_log_id: None,

            // Client write proposals are batched before replication.
            propose_buffer: Box::new(ProposeBatchBuffer::new(batch_size).with_length_gauge(
                node_id,
                "propose",
                enable_batch,
            )),

            node_config,
            scheduled_purge_upto: None,
            last_purged_index: None,
            last_learner_check: Instant::now(),
            snapshot_in_progress: AtomicBool::new(false),
            next_membership_maintenance_check: Instant::now(),
            pending_promotions: VecDeque::new(),
            lease_timestamp: AtomicU64::new(0),
            // Linearizable reads are batched like writes.
            linearizable_read_buffer: Box::new(BatchBuffer::new(batch_size).with_length_gauge(
                node_id,
                "linearizable",
                enable_batch,
            )),
            lease_read_queue: VecDeque::new(),
            eventual_read_queue: VecDeque::new(),
            pending_requests: HashMap::new(),
            pending_reads: BTreeMap::new(),
            backpressure_metrics,
            _marker: PhantomData,
        }
    }
2288
    /// Streams the latest snapshot to `node_id` in the background.
    ///
    /// Spawns a blocking task that opens the peer's bulk channel, loads the
    /// snapshot data stream, and runs the push transfer; a second task logs
    /// the eventual outcome. Returns immediately after spawning both tasks.
    pub async fn trigger_background_snapshot(
        node_id: u32,
        metadata: SnapshotMetadata,
        state_machine_handler: Arc<SMHOF<T>>,
        membership: Arc<MOF<T>>,
        config: SnapshotConfig,
    ) -> Result<()> {
        // Carries the transfer result from the blocking task to the logger task.
        let (result_tx, result_rx) = oneshot::channel();

        tokio::task::spawn_blocking(move || {
            // NOTE(review): Handle::current() + block_on inside spawn_blocking
            // drives the async transfer from a dedicated blocking thread —
            // presumably to keep a long-running transfer off the core runtime
            // workers; confirm this was the intent.
            let rt = tokio::runtime::Handle::current();
            let result = rt.block_on(async move {
                // Snapshot bytes travel over the dedicated bulk connection.
                let bulk_channel = membership
                    .get_peer_channel(node_id, ConnectionType::Bulk)
                    .await
                    .ok_or(NetworkError::PeerConnectionNotFound(node_id))?;

                let data_stream =
                    state_machine_handler.load_snapshot_data(metadata.clone()).await?;

                BackgroundSnapshotTransfer::<T>::run_push_transfer(
                    node_id,
                    data_stream,
                    bulk_channel,
                    config,
                )
                .await
            });

            // Receiver may already be gone; dropping the result is fine then.
            let _ = result_tx.send(result);
        });

        // Fire-and-forget logger for the transfer outcome.
        tokio::spawn(async move {
            match result_rx.await {
                Ok(Ok(_)) => info!("Snapshot to {} completed", node_id),
                Ok(Err(e)) => error!("Snapshot to {} failed: {:?}", node_id, e),
                Err(_) => warn!("Snapshot result channel closed unexpectedly"),
            }
        });

        Ok(())
    }
2334
    /// Drains the pending-promotion queue and promotes as many caught-up
    /// learners as quorum safety allows in one batch.
    ///
    /// Steps: drop stale entries, derive a safe batch size from the current
    /// voter count, promote via `safe_batch_promote` (re-queueing the batch
    /// on failure), and re-emit `PromoteReadyLearners` if entries remain.
    pub async fn process_pending_promotions(
        &mut self,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        debug!(
            "[Leader {}] 🔄 process_pending_promotions called, pending: {:?}",
            self.node_id(),
            self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
        );

        let config = &ctx.node_config().raft.membership.promotion;

        // Entries queued longer than the stale threshold are dropped: the
        // learner may have fallen behind again since it was marked ready.
        let now = Instant::now();
        self.pending_promotions.retain(|entry| {
            now.duration_since(entry.ready_since) <= config.stale_learner_threshold
        });

        if self.pending_promotions.is_empty() {
            debug!(
                "[Leader {}] ❌ pending_promotions is empty after stale cleanup",
                self.node_id()
            );
            return Ok(());
        }

        let membership = ctx.membership();
        // +1 presumably accounts for the leader itself not being included in
        // voters() — the same convention as update_cluster_metadata.
        let current_voters = membership.voters().await.len() + 1;
        debug!(
            "[Leader {}] 📊 current_voters: {}, pending: {}",
            self.node_id(),
            current_voters,
            self.pending_promotions.len()
        );

        // Cap the batch so a promotion round cannot break quorum safety.
        let max_batch_size =
            calculate_safe_batch_size(current_voters, self.pending_promotions.len());
        debug!(
            "[Leader {}] 🎯 max_batch_size: {}",
            self.node_id(),
            max_batch_size
        );

        if max_batch_size == 0 {
            debug!(
                "[Leader {}] ⚠️ max_batch_size is 0, cannot promote now",
                self.node_id()
            );
            return Ok(());
        }

        let promotion_entries = self.drain_batch(max_batch_size);
        let promotion_node_ids = promotion_entries.iter().map(|e| e.node_id).collect::<Vec<_>>();

        if !promotion_node_ids.is_empty() {
            info!(
                "Promoting learner batch of {} nodes: {:?} (total voters: {} -> {})",
                promotion_node_ids.len(),
                promotion_node_ids,
                current_voters,
                current_voters + promotion_node_ids.len()
            );

            let result = self.safe_batch_promote(promotion_node_ids.clone(), ctx, role_tx).await;

            if let Err(e) = result {
                // Promotion failed: push the batch back at the front (reverse
                // order restores the original order) so it is retried later.
                for entry in promotion_entries.into_iter().rev() {
                    self.pending_promotions.push_front(entry);
                }
                return Err(e);
            }

            info!(
                "Promotion successful. Cluster members: {:?}",
                membership.voters().await
            );
        }

        trace!(
            ?self.pending_promotions,
            "Step 6: Reschedule if any pending promotions remain"
        );
        if !self.pending_promotions.is_empty() {
            debug!(
                "[Leader {}] 🔁 Re-sending PromoteReadyLearners for remaining pending: {:?}",
                self.node_id(),
                self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
            );
            role_tx
                .send(RoleEvent::ReprocessEvent(Box::new(
                    RaftEvent::PromoteReadyLearners,
                )))
                .map_err(|e| {
                    let error_str = format!("{e:?}");
                    error!("Send PromoteReadyLearners event failed: {}", error_str);
                    NetworkError::SingalSendFailed(error_str)
                })?;
        }

        Ok(())
    }
2449
2450 pub(super) fn drain_batch(
2452 &mut self,
2453 count: usize,
2454 ) -> Vec<PendingPromotion> {
2455 let mut batch = Vec::with_capacity(count);
2456 for _ in 0..count {
2457 if let Some(entry) = self.pending_promotions.pop_front() {
2458 batch.push(entry);
2459 } else {
2460 break;
2461 }
2462 }
2463 batch
2464 }
2465 pub(super) fn merge_batch_to_write_metadata(
2471 batch: impl IntoIterator<Item = RaftRequestWithSignal>,
2472 start_idx: u64,
2473 ) -> (Vec<EntryPayload>, Option<WriteMetadata>) {
2474 let mut all_payloads = Vec::new();
2475 let mut all_senders = Vec::new();
2476 let mut any_wait_for_apply = false;
2477
2478 for mut req in batch {
2479 all_payloads.extend(std::mem::take(&mut req.payloads));
2480 all_senders.extend(req.senders);
2481 any_wait_for_apply |= req.wait_for_apply_event;
2482 }
2483
2484 if all_payloads.is_empty() && all_senders.is_empty() {
2485 return (all_payloads, None);
2486 }
2487
2488 (
2489 all_payloads,
2490 Some((start_idx, all_senders, any_wait_for_apply)),
2491 )
2492 }
2493
    /// Core replication round: sends `payloads` (possibly empty = heartbeat)
    /// to all peers and dispatches on the outcome.
    ///
    /// - `write_metadata`: client write senders to answer once the round resolves.
    /// - `read_batch`: linearizable reads piggybacking on this round's quorum.
    ///
    /// On quorum success it advances commit/lease state and answers or parks
    /// clients; on quorum failure, higher term, or RPC error it fails the
    /// pending clients with an appropriate error code.
    async fn execute_and_process_raft_rpc(
        &mut self,
        payloads: Vec<EntryPayload>,
        write_metadata: Option<WriteMetadata>,
        read_batch: Option<Vec<LinearizableReadRequest>>,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        trace!(
            cluster_size = self.cluster_metadata.total_voters,
            payload_count = payloads.len(),
        );

        let result = ctx
            .replication_handler()
            .handle_raft_request_in_batch(
                payloads,
                self.state_snapshot(),
                self.leader_state_snapshot(),
                &self.cluster_metadata,
                ctx,
            )
            .await;

        debug!(?result, "execute_and_process_raft_rpc");

        match result {
            // Quorum reached: the round is committed from the leader's view.
            Ok(AppendResults {
                commit_quorum_achieved: true,
                peer_updates,
                learner_progress,
            }) => {
                // A successful quorum round refreshes the leader lease.
                self.update_lease_timestamp();
                self.update_peer_indexes(&peer_updates);
                // Learner promotion checks are best-effort; never fail the round.
                if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
                    error!(?e, "check_learner_progress failed");
                }

                // Single-voter cluster commits its own last entry directly;
                // otherwise derive the commit index from peer match indexes.
                let new_commit_index = if self.cluster_metadata.single_voter {
                    let last_log_index = ctx.raft_log().last_entry_id();
                    if last_log_index > self.commit_index() {
                        Some(last_log_index)
                    } else {
                        None
                    }
                } else {
                    self.calculate_new_commit_index(ctx.raft_log(), &peer_updates)
                };

                if let Some(new_commit_index) = new_commit_index {
                    debug!(
                        "[Leader-{}] New commit acknowledged: {}",
                        self.node_id(),
                        new_commit_index
                    );
                    self.update_commit_index_with_signal(
                        Leader as i32,
                        self.current_term(),
                        new_commit_index,
                        role_tx,
                    )?;
                }

                // Writes: either park until apply (answered by ApplyCompleted,
                // keyed by log index) or acknowledge immediately.
                if let Some((start_idx, senders, wait_for_apply)) = write_metadata {
                    if wait_for_apply {
                        for (i, sender) in senders.into_iter().enumerate() {
                            self.pending_requests.insert(start_idx + i as u64, sender);
                        }
                    } else {
                        for sender in senders {
                            let _ = sender.send(Ok(ClientResponse::write_success()));
                        }
                    }
                }

                // Reads: serve now if the state machine has caught up to the
                // read index, otherwise park until apply reaches it.
                if let Some(read_batch) = read_batch {
                    let read_index = self.calculate_read_index();
                    let last_applied = ctx.state_machine().last_applied().index;
                    if last_applied >= read_index {
                        self.execute_pending_reads(read_batch, ctx);
                    } else {
                        self.pending_reads.entry(read_index).or_default().extend(read_batch);
                    }
                }

                Ok(())
            }

            // Round completed but without commit quorum.
            Ok(AppendResults {
                commit_quorum_achieved: false,
                peer_updates,
                learner_progress,
            }) => {
                self.update_peer_indexes(&peer_updates);
                if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
                    error!(?e, "check_learner_progress failed");
                }

                // A majority responded (but didn't commit) => likely transient,
                // tell the client to retry; otherwise the propose failed.
                let responses_received = peer_updates.len();
                let cluster_size = self.cluster_metadata.total_voters;
                let error_code = if is_majority(responses_received, cluster_size) {
                    ErrorCode::RetryRequired
                } else {
                    ErrorCode::ProposeFailed
                };

                if let Some((_, senders, _)) = write_metadata {
                    for sender in senders {
                        let _ = sender.send(Ok(ClientResponse::client_error(error_code)));
                    }
                }
                if let Some(read_batch) = read_batch {
                    let status = tonic::Status::failed_precondition("Quorum not reached");
                    for (_, sender) in read_batch {
                        let _ = sender.send(Err(status.clone()));
                    }
                }

                Ok(())
            }

            // A peer reported a higher term: fail clients, then step down.
            Err(Error::Consensus(ConsensusError::Replication(ReplicationError::HigherTerm(
                higher_term,
            )))) => {
                warn!("Higher term detected: {}", higher_term);
                self.update_current_term(higher_term);
                self.send_become_follower_event(None, role_tx)?;

                if let Some((_, senders, _)) = write_metadata {
                    for sender in senders {
                        let _ =
                            sender.send(Ok(ClientResponse::client_error(ErrorCode::TermOutdated)));
                    }
                }
                if let Some(read_batch) = read_batch {
                    let status = tonic::Status::failed_precondition("Term outdated");
                    for (_, sender) in read_batch {
                        let _ = sender.send(Err(status.clone()));
                    }
                }

                Err(ReplicationError::HigherTerm(higher_term).into())
            }

            // Any other failure: fail all pending clients and propagate.
            Err(e) => {
                error!("RPC failed: {:?}", e);

                if let Some((_, senders, _)) = write_metadata {
                    for sender in senders {
                        let _ =
                            sender.send(Ok(ClientResponse::client_error(ErrorCode::ProposeFailed)));
                    }
                }
                if let Some(read_batch) = read_batch {
                    let error_msg = format!("RPC failed: {e}");
                    let status = tonic::Status::failed_precondition(error_msg);
                    for (_, sender) in read_batch {
                        let _ = sender.send(Err(status.clone()));
                    }
                }

                Err(e)
            }
        }
    }
2665
2666 async fn safe_batch_promote(
2667 &mut self,
2668 batch: Vec<u32>,
2669 ctx: &RaftContext<T>,
2670 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2671 ) -> Result<()> {
2672 let change = Change::BatchPromote(BatchPromote {
2673 node_ids: batch.clone(),
2674 new_status: NodeStatus::Active as i32,
2675 });
2676
2677 self.verify_leadership_persistent(vec![EntryPayload::config(change)], ctx, role_tx)
2680 .await?;
2681
2682 Ok(())
2683 }
2684
2685 async fn run_periodic_maintenance(
2686 &mut self,
2687 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2688 ctx: &RaftContext<T>,
2689 ) -> Result<()> {
2690 if let Err(e) = self.conditionally_purge_stale_learners(role_tx, ctx).await {
2691 error!("Stale learner purge failed: {}", e);
2692 }
2693
2694 if let Err(e) = self.conditionally_purge_zombie_nodes(role_tx, ctx).await {
2695 error!("Zombie node purge failed: {}", e);
2696 }
2697
2698 self.reset_next_membership_maintenance_check(
2701 ctx.node_config().raft.membership.membership_maintenance_interval,
2702 );
2703 Ok(())
2704 }
2705
2706 pub async fn conditionally_purge_stale_learners(
2710 &mut self,
2711 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2712 ctx: &RaftContext<T>,
2713 ) -> Result<()> {
2714 let config = &ctx.node_config.raft.membership.promotion;
2715
2716 if self.pending_promotions.is_empty()
2718 || self.next_membership_maintenance_check > Instant::now()
2719 {
2720 trace!("Skipping stale learner check");
2721 return Ok(());
2722 }
2723
2724 let now = Instant::now();
2725 let queue_len = self.pending_promotions.len();
2726
2727 let inspect_count = queue_len.min(100).min(1.max(queue_len / 100));
2729 let mut stale_entries = Vec::new();
2730
2731 trace!("Inspecting {} entries", inspect_count);
2732 for _ in 0..inspect_count {
2733 if let Some(entry) = self.pending_promotions.pop_front() {
2734 trace!(
2735 "Inspecting entry: {:?} - {:?} - {:?}",
2736 entry,
2737 now.duration_since(entry.ready_since),
2738 &config.stale_learner_threshold
2739 );
2740 if now.duration_since(entry.ready_since) > config.stale_learner_threshold {
2741 stale_entries.push(entry);
2742 } else {
2743 self.pending_promotions.push_front(entry);
2745 break;
2746 }
2747 } else {
2748 break;
2749 }
2750 }
2751
2752 trace!("Stale learner check completed: {:?}", stale_entries);
2753
2754 for entry in stale_entries {
2756 if let Err(e) = self.handle_stale_learner(entry.node_id, role_tx, ctx).await {
2757 error!("Failed to handle stale learner: {}", e);
2758 }
2759 }
2760
2761 Ok(())
2762 }
2763
2764 async fn conditionally_purge_zombie_nodes(
2766 &mut self,
2767 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2768 ctx: &RaftContext<T>,
2769 ) -> Result<()> {
2770 let membership = ctx.membership();
2772 let zombie_candidates = membership.get_zombie_candidates().await;
2773 let mut nodes_to_remove = Vec::new();
2774
2775 for node_id in zombie_candidates {
2776 if let Some(status) = membership.get_node_status(node_id).await {
2777 if status != NodeStatus::Active {
2778 nodes_to_remove.push(node_id);
2779 }
2780 }
2781 }
2782 if !nodes_to_remove.is_empty() {
2784 let change = Change::BatchRemove(BatchRemove {
2785 node_ids: nodes_to_remove.clone(),
2786 });
2787
2788 info!(
2789 "Proposing batch removal of zombie nodes: {:?}",
2790 nodes_to_remove
2791 );
2792
2793 match self
2796 .verify_leadership_persistent(vec![EntryPayload::config(change)], ctx, role_tx)
2797 .await
2798 {
2799 Ok(true) => {
2800 info!("Batch removal committed for nodes: {:?}", nodes_to_remove);
2801 }
2802 Ok(false) => {
2803 warn!("Failed to commit batch removal");
2804 }
2805 Err(e) => {
2806 error!("Batch removal error: {:?}", e);
2807 return Err(e);
2808 }
2809 }
2810 }
2811
2812 Ok(())
2813 }
2814
    /// Re-arms the membership-maintenance timer: the next stale-learner /
    /// zombie-node check will not run before `now + membership_maintenance_interval`.
    pub fn reset_next_membership_maintenance_check(
        &mut self,
        membership_maintenance_interval: Duration,
    ) {
        self.next_membership_maintenance_check = Instant::now() + membership_maintenance_interval;
    }
2821
2822 pub async fn handle_stale_learner(
2824 &mut self,
2825 node_id: u32,
2826 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2827 ctx: &RaftContext<T>,
2828 ) -> Result<()> {
2829 warn!(
2831 "Learner {} is stalled, removing from cluster via consensus",
2832 node_id
2833 );
2834
2835 let change = Change::BatchRemove(BatchRemove {
2836 node_ids: vec![node_id],
2837 });
2838
2839 match self
2842 .verify_leadership_persistent(vec![EntryPayload::config(change)], ctx, role_tx)
2843 .await
2844 {
2845 Ok(true) => {
2846 info!(
2847 "Stalled learner {} successfully removed from cluster",
2848 node_id
2849 );
2850 }
2851 Ok(false) => {
2852 warn!("Failed to commit removal of stalled learner {}", node_id);
2853 }
2854 Err(e) => {
2855 error!("Error removing stalled learner {}: {:?}", node_id, e);
2856 return Err(e);
2857 }
2858 }
2859
2860 Ok(())
2861 }
2862
2863 pub fn is_lease_valid(
2865 &self,
2866 ctx: &RaftContext<T>,
2867 ) -> bool {
2868 let now = std::time::SystemTime::now()
2869 .duration_since(std::time::UNIX_EPOCH)
2870 .unwrap_or_default()
2871 .as_millis() as u64;
2872
2873 let last_confirmed = self.lease_timestamp.load(std::sync::atomic::Ordering::Acquire);
2874 let lease_duration = ctx.node_config().raft.read_consistency.lease_duration_ms;
2875
2876 if now < last_confirmed {
2877 error!("Clock moved backwards: Now {now}, Last Confirmed {last_confirmed}");
2879 return false;
2880 }
2881
2882 if now == last_confirmed {
2884 return true;
2885 }
2886 (now - last_confirmed) < lease_duration
2887 }
2888
2889 fn update_lease_timestamp(&self) {
2891 let now = std::time::SystemTime::now()
2892 .duration_since(std::time::UNIX_EPOCH)
2893 .unwrap_or_default()
2894 .as_millis() as u64;
2895
2896 self.lease_timestamp.store(now, std::sync::atomic::Ordering::Release);
2897 }
2898
2899 async fn verify_leadership_and_refresh_lease(
2905 &mut self,
2906 ctx: &RaftContext<T>,
2907 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2908 ) -> Result<()> {
2909 match self.verify_internal_quorum(vec![], ctx, role_tx).await {
2910 Ok(QuorumVerificationResult::Success) => Ok(()),
2911 Ok(QuorumVerificationResult::LeadershipLost) => Err(ConsensusError::Replication(
2912 ReplicationError::NotLeader { leader_id: None },
2913 )
2914 .into()),
2915 Ok(QuorumVerificationResult::RetryRequired) => {
2916 Err(ConsensusError::Replication(ReplicationError::QuorumNotReached).into())
2917 }
2918 Err(e) => Err(e),
2919 }
2920 }
2921
2922 pub(super) async fn unified_write_and_linear_read(
2926 &mut self,
2927 ctx: &RaftContext<T>,
2928 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2929 ) -> Result<()> {
2930 let (payloads, write_metadata) = if !self.propose_buffer.is_empty() {
2932 if let Some(req) = self.propose_buffer.flush() {
2933 let start_idx = ctx.raft_log().last_entry_id() + 1;
2934 (
2935 req.payloads,
2936 Some((start_idx, req.senders, req.wait_for_apply_event)),
2937 )
2938 } else {
2939 (Vec::new(), None)
2940 }
2941 } else {
2942 (Vec::new(), None)
2943 };
2944
2945 let read_batch = if !self.linearizable_read_buffer.is_empty() {
2947 Some(self.linearizable_read_buffer.take_all())
2948 } else {
2949 None
2950 };
2951
2952 self.execute_and_process_raft_rpc(payloads, write_metadata, read_batch, ctx, role_tx)
2953 .await
2954 }
2955
2956 fn execute_pending_reads(
2959 &self,
2960 read_batch: impl IntoIterator<Item = LinearizableReadRequest>,
2961 ctx: &RaftContext<T>,
2962 ) {
2963 for (req, sender) in read_batch {
2964 let results = ctx
2965 .handlers
2966 .state_machine_handler
2967 .read_from_state_machine(req.keys)
2968 .unwrap_or_default();
2969 let _ = sender.send(Ok(ClientResponse::read_results(results)));
2970 }
2971 }
2972
    #[cfg(test)]
    /// Test-only shim exposing the private `update_lease_timestamp` to unit tests.
    pub(crate) fn test_update_lease_timestamp(&mut self) {
        self.update_lease_timestamp();
    }
2977
2978 pub(super) fn determine_read_policy(
2980 &self,
2981 req: &ClientReadRequest,
2982 ) -> ServerReadConsistencyPolicy {
2983 use d_engine_proto::client::ReadConsistencyPolicy as ClientReadConsistencyPolicy;
2984
2985 if req.has_consistency_policy() {
2986 if self.node_config.raft.read_consistency.allow_client_override {
2988 match req.consistency_policy() {
2989 ClientReadConsistencyPolicy::LeaseRead => {
2990 ServerReadConsistencyPolicy::LeaseRead
2991 }
2992 ClientReadConsistencyPolicy::LinearizableRead => {
2993 ServerReadConsistencyPolicy::LinearizableRead
2994 }
2995 ClientReadConsistencyPolicy::EventualConsistency => {
2996 ServerReadConsistencyPolicy::EventualConsistency
2997 }
2998 }
2999 } else {
3000 self.node_config.raft.read_consistency.default_policy.clone()
3002 }
3003 } else {
3004 self.node_config.raft.read_consistency.default_policy.clone()
3006 }
3007 }
3008
3009 pub(super) async fn process_lease_read(
3011 &mut self,
3012 req: ClientReadRequest,
3013 sender: MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
3014 ctx: &RaftContext<T>,
3015 role_tx: &mpsc::UnboundedSender<RoleEvent>,
3016 ) -> Result<()> {
3017 if self.is_lease_valid(ctx) {
3018 let results = ctx
3020 .handlers
3021 .state_machine_handler
3022 .read_from_state_machine(req.keys)
3023 .unwrap_or_default();
3024 let response = ClientResponse::read_results(results);
3025 let _ = sender.send(Ok(response));
3026 } else {
3027 if let Err(e) = self.verify_leadership_and_refresh_lease(ctx, role_tx).await {
3029 warn!("[Leader] Lease read: leadership verification failed: {e}");
3030 if sender
3031 .send(Err(tonic::Status::unavailable(format!(
3032 "Leadership verification failed: {e}"
3033 ))))
3034 .is_err()
3035 {
3036 warn!(
3037 "[Leader] Lease read: client already disconnected before error response could be sent"
3038 );
3039 }
3040 return Err(e);
3041 }
3042 let results = ctx
3043 .handlers
3044 .state_machine_handler
3045 .read_from_state_machine(req.keys)
3046 .unwrap_or_default();
3047 let response = ClientResponse::read_results(results);
3048 let _ = sender.send(Ok(response));
3049 }
3050 Ok(())
3051 }
3052
3053 fn process_eventual_read(
3055 &mut self,
3056 req: ClientReadRequest,
3057 sender: MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
3058 ctx: &RaftContext<T>,
3059 ) {
3060 let results = ctx
3062 .handlers
3063 .state_machine_handler
3064 .read_from_state_machine(req.keys)
3065 .unwrap_or_default();
3066 let response = ClientResponse::read_results(results);
3067 let _ = sender.send(Ok(response));
3068 }
3069}
3070
impl<T: TypeConfig> From<&CandidateState<T>> for LeaderState<T> {
    /// Builds a fresh `LeaderState` when a candidate wins an election.
    ///
    /// Carries over the shared state and node configuration from the candidate,
    /// records this node as the current leader, and initializes all
    /// leader-specific bookkeeping (replication indices, batching buffers,
    /// maintenance timers, lease state) to starting values.
    fn from(candidate: &CandidateState<T>) -> Self {
        // Tick interval for the leader's AppendEntries replication timer.
        let ReplicationConfig {
            rpc_append_entries_clock_in_ms,
            ..
        } = candidate.node_config.raft.replication;

        // Reuse the candidate's shared state and mark ourselves as leader
        // (set_current_leader is callable on the non-mut handle).
        let shared_state = candidate.shared_state.clone();
        shared_state.set_current_leader(candidate.node_id());

        // Backpressure metrics are optional, gated by configuration.
        let backpressure_metrics = if candidate.node_config.raft.metrics.enable_backpressure {
            Some(Arc::new(BackpressureMetrics::new(
                candidate.node_id(),
                true,
                candidate.node_config.raft.metrics.sample_rate,
            )))
        } else {
            None
        };

        Self {
            shared_state,
            timer: Box::new(ReplicationTimer::new(rpc_append_entries_clock_in_ms)),
            // Per-peer replication progress starts empty.
            next_index: HashMap::new(),
            match_index: HashMap::new(),
            noop_log_id: None,

            // Write-side batching buffer, optionally instrumented with a length gauge.
            propose_buffer: Box::new(
                ProposeBatchBuffer::new(candidate.node_config.raft.batching.max_batch_size)
                    .with_length_gauge(
                        candidate.node_id(),
                        "propose",
                        candidate.node_config.raft.metrics.enable_batch,
                    ),
            ),

            node_config: candidate.node_config.clone(),

            scheduled_purge_upto: None,
            // Preserve log-purge progress across the role transition.
            last_purged_index: candidate.last_purged_index,
            last_learner_check: Instant::now(),
            snapshot_in_progress: AtomicBool::new(false),
            next_membership_maintenance_check: Instant::now(),
            pending_promotions: VecDeque::new(),
            // Placeholder metadata — presumably refreshed once membership is
            // inspected; TODO confirm where this is populated.
            cluster_metadata: ClusterMetadata {
                single_voter: false,
                total_voters: 0,
                replication_targets: vec![],
            },
            // 0 == "never confirmed": lease reads start invalid until a quorum
            // round calls update_lease_timestamp.
            lease_timestamp: AtomicU64::new(0),
            // Read-side batching buffer for linearizable reads.
            linearizable_read_buffer: Box::new(
                BatchBuffer::new(candidate.node_config.raft.batching.max_batch_size)
                    .with_length_gauge(
                        candidate.node_id(),
                        "linearizable",
                        candidate.node_config.raft.metrics.enable_batch,
                    ),
            ),
            lease_read_queue: VecDeque::new(),
            eventual_read_queue: VecDeque::new(),
            pending_requests: HashMap::new(),
            pending_reads: BTreeMap::new(),
            backpressure_metrics,
            _marker: PhantomData,
        }
    }
}
3140
3141impl<T: TypeConfig> Debug for LeaderState<T> {
3142 fn fmt(
3143 &self,
3144 f: &mut std::fmt::Formatter<'_>,
3145 ) -> std::fmt::Result {
3146 f.debug_struct("LeaderState")
3147 .field("shared_state", &self.shared_state)
3148 .field("next_index", &self.next_index)
3149 .field("match_index", &self.match_index)
3150 .field("noop_log_id", &self.noop_log_id)
3151 .finish()
3152 }
3153}
3154
/// Computes how many of `available` nodes may be batched alongside `current`
/// members while keeping the combined total (`current + available`) odd.
///
/// Returns `available` when the combined total is already odd, otherwise
/// `available - 1` (saturating at zero, so `available == 0` never underflows).
pub fn calculate_safe_batch_size(
    current: usize,
    available: usize,
) -> usize {
    // Compare parities instead of computing `current + available`, which could
    // overflow (panicking in debug builds) for extreme inputs. The sum is odd
    // exactly when the two operands have different parity.
    if (current % 2) != (available % 2) {
        available
    } else {
        available.saturating_sub(1)
    }
}