1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::fmt::Debug;
4use std::marker::PhantomData;
5use std::sync::Arc;
6use std::sync::atomic::AtomicBool;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use std::time::Duration;
10
11use d_engine_proto::client::ClientReadRequest;
12use d_engine_proto::client::ClientResponse;
13use d_engine_proto::client::ReadConsistencyPolicy as ClientReadConsistencyPolicy;
14use d_engine_proto::common::AddNode;
15use d_engine_proto::common::BatchPromote;
16use d_engine_proto::common::BatchRemove;
17use d_engine_proto::common::EntryPayload;
18use d_engine_proto::common::LogId;
19use d_engine_proto::common::NodeRole::Leader;
20use d_engine_proto::common::NodeStatus;
21use d_engine_proto::common::membership_change::Change;
22use d_engine_proto::error::ErrorCode;
23use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
24use d_engine_proto::server::cluster::JoinRequest;
25use d_engine_proto::server::cluster::JoinResponse;
26use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
27use d_engine_proto::server::cluster::NodeMeta;
28use d_engine_proto::server::election::VoteResponse;
29use d_engine_proto::server::election::VotedFor;
30use d_engine_proto::server::replication::AppendEntriesResponse;
31use d_engine_proto::server::storage::PurgeLogRequest;
32use d_engine_proto::server::storage::PurgeLogResponse;
33use d_engine_proto::server::storage::SnapshotChunk;
34use d_engine_proto::server::storage::SnapshotMetadata;
35use nanoid::nanoid;
36use tokio::sync::mpsc;
37use tokio::sync::oneshot;
38use tokio::time::Instant;
39use tokio::time::sleep;
40use tokio::time::timeout;
41use tonic::Status;
42use tonic::async_trait;
43use tracing::debug;
44use tracing::error;
45use tracing::info;
46use tracing::instrument;
47use tracing::trace;
48use tracing::warn;
49
50use super::LeaderStateSnapshot;
51use super::RaftRole;
52use super::SharedState;
53use super::StateSnapshot;
54use super::candidate_state::CandidateState;
55use super::role_state::RaftRoleState;
56use crate::AppendResults;
57use crate::BackgroundSnapshotTransfer;
58use crate::BatchBuffer;
59use crate::ConnectionType;
60use crate::ConsensusError;
61use crate::Error;
62use crate::MaybeCloneOneshot;
63use crate::MaybeCloneOneshotSender;
64use crate::Membership;
65use crate::MembershipError;
66use crate::NetworkError;
67use crate::PeerUpdate;
68use crate::PurgeExecutor;
69use crate::QuorumVerificationResult;
70use crate::RaftContext;
71use crate::RaftEvent;
72use crate::RaftLog;
73use crate::RaftNodeConfig;
74use crate::RaftOneshot;
75use crate::RaftRequestWithSignal;
76use crate::ReadConsistencyPolicy as ServerReadConsistencyPolicy;
77use crate::ReplicationConfig;
78use crate::ReplicationCore;
79use crate::ReplicationError;
80use crate::ReplicationTimer;
81use crate::Result;
82use crate::RoleEvent;
83use crate::SnapshotConfig;
84use crate::StateMachine;
85use crate::StateMachineHandler;
86use crate::StateTransitionError;
87use crate::SystemError;
88use crate::Transport;
89use crate::TypeConfig;
90use crate::alias::MOF;
91use crate::alias::ROF;
92use crate::alias::SMHOF;
93use crate::client_command_to_entry_payloads;
94use crate::cluster::is_majority;
95use crate::ensure_safe_join;
96use crate::scoped_timer::ScopedTimer;
97use crate::stream::create_production_snapshot_stream;
98use crate::utils::cluster::error;
99
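/// A learner that has caught up with the leader's log and is queued for
/// promotion to voter, together with the instant it became ready.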
100#[derive(Debug, Clone)]
102pub struct PendingPromotion {
103 pub node_id: u32,
104 pub ready_since: Instant,
105}
106
107impl PendingPromotion {
108 pub fn new(
109 node_id: u32,
110 ready_since: Instant,
111 ) -> Self {
112 PendingPromotion {
113 node_id,
114 ready_since,
115 }
116 }
117}
118
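/// Cached view of the cluster used on the replication hot path: whether the
/// leader is the only voter, the total voter count, and the peers that
/// receive replicated entries.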
119#[derive(Debug, Clone)]
124pub struct ClusterMetadata {
125 pub single_voter: bool,
127 pub total_voters: usize,
129 pub replication_targets: Vec<NodeMeta>,
131}
132
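/// Volatile per-term state held while this node acts as Raft leader:
/// replication progress, batching and replication timers, snapshot and purge
/// bookkeeping, learner-promotion queues, and the linearizable-read buffer.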
133pub struct LeaderState<T: TypeConfig> {
141 pub shared_state: SharedState,
144
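    /// For each peer, the index of the next log entry to send (volatile leader state).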
145 pub next_index: HashMap<u32, u64>,
150
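    /// For each peer, the highest log index known to be replicated on that peer.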
151 pub(super) match_index: HashMap<u32, u64>,
156
157 #[doc(hidden)]
160 pub noop_log_id: Option<u64>,
161
162 pub scheduled_purge_upto: Option<LogId>,
172
173 pub last_purged_index: Option<LogId>,
184
185 pub peer_purge_progress: HashMap<u32, u64>,
191
192 pub snapshot_in_progress: AtomicBool,
194
195 batch_buffer: Box<BatchBuffer<RaftRequestWithSignal>>,
202
203 timer: Box<ReplicationTimer>,
211
212 pub(super) node_config: Arc<RaftNodeConfig>,
220
221 pub last_learner_check: Instant,
223
224 pub next_membership_maintenance_check: Instant,
238
239 pub pending_promotions: VecDeque<PendingPromotion>,
241
242 pub(super) lease_timestamp: AtomicU64,
245
246 pub(super) cluster_metadata: ClusterMetadata,
250
251 pub(super) read_buffer: Vec<(
253 ClientReadRequest,
254 MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
255 )>,
256
257 pub(super) read_buffer_start_time: Option<Instant>,
259
260 _marker: PhantomData<T>,
263}
264
265#[async_trait]
266impl<T: TypeConfig> RaftRoleState for LeaderState<T> {
267 type T = T;
268
269 fn shared_state(&self) -> &SharedState {
270 &self.shared_state
271 }
272
273 fn shared_state_mut(&mut self) -> &mut SharedState {
274 &mut self.shared_state
275 }
276
277 #[tracing::instrument]
281 fn update_commit_index(
282 &mut self,
283 new_commit_index: u64,
284 ) -> Result<()> {
285 if self.commit_index() < new_commit_index {
286 debug!("update_commit_index to: {:?}", new_commit_index);
287 self.shared_state.commit_index = new_commit_index;
288 } else {
289 warn!(
290 "Illegal operation, might be a bug! I am Leader old_commit_index({}) >= new_commit_index:({})",
291 self.commit_index(),
292 new_commit_index
293 )
294 }
295 Ok(())
296 }
297
298 fn voted_for(&self) -> Result<Option<VotedFor>> {
300 self.shared_state().voted_for()
301 }
302
303 fn update_voted_for(
306 &mut self,
307 voted_for: VotedFor,
308 ) -> Result<bool> {
309 self.shared_state_mut().update_voted_for(voted_for)
310 }
311
312 fn next_index(
313 &self,
314 node_id: u32,
315 ) -> Option<u64> {
        Some(self.next_index.get(&node_id).copied().unwrap_or(1))
321 }
322
323 fn update_next_index(
324 &mut self,
325 node_id: u32,
326 new_next_id: u64,
327 ) -> Result<()> {
328 debug!("update_next_index({}) to {}", node_id, new_next_id);
329 self.next_index.insert(node_id, new_next_id);
330 Ok(())
331 }
332
333 fn update_match_index(
334 &mut self,
335 node_id: u32,
336 new_match_id: u64,
337 ) -> Result<()> {
338 self.match_index.insert(node_id, new_match_id);
339 Ok(())
340 }
341
342 fn match_index(
343 &self,
344 node_id: u32,
345 ) -> Option<u64> {
346 self.match_index.get(&node_id).copied()
347 }
348
349 fn init_peers_next_index_and_match_index(
350 &mut self,
351 last_entry_id: u64,
352 peer_ids: Vec<u32>,
353 ) -> Result<()> {
354 for peer_id in peer_ids {
355 debug!("init leader state for peer_id: {:?}", peer_id);
356 let new_next_id = last_entry_id + 1;
357 self.update_next_index(peer_id, new_next_id)?;
358 self.update_match_index(peer_id, 0)?;
359 }
360 Ok(())
361 }
362
363 fn on_noop_committed(
366 &mut self,
367 ctx: &RaftContext<Self::T>,
368 ) -> Result<()> {
369 let noop_index = ctx.raft_log().last_entry_id();
370 self.noop_log_id = Some(noop_index);
371 debug!("Tracked noop_log_id: {}", noop_index);
372 Ok(())
373 }
374 fn noop_log_id(&self) -> Result<Option<u64>> {
375 Ok(self.noop_log_id)
376 }
377
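    /// Repeatedly verifies leadership via `verify_internal_quorum`, retrying with
    /// exponential backoff plus jitter until quorum is confirmed, leadership is lost,
    /// or the configured global timeout elapses. Refreshes the lease timestamp on success.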
378 async fn verify_leadership_persistent(
397 &mut self,
398 payloads: Vec<EntryPayload>,
399 bypass_queue: bool,
400 ctx: &RaftContext<T>,
401 role_tx: &mpsc::UnboundedSender<RoleEvent>,
402 ) -> Result<bool> {
403 let initial_delay =
404 Duration::from_millis(ctx.node_config.retry.internal_quorum.base_delay_ms);
405 let max_delay = Duration::from_millis(ctx.node_config.retry.internal_quorum.max_delay_ms);
406 let global_timeout = ctx.node_config.raft.membership.verify_leadership_persistent_timeout;
407
408 let mut current_delay = initial_delay;
409 let start_time = Instant::now();
410
411 loop {
412 match self.verify_internal_quorum(payloads.clone(), bypass_queue, ctx, role_tx).await {
413 Ok(QuorumVerificationResult::Success) => {
414 self.update_lease_timestamp();
418 return Ok(true);
419 }
420 Ok(QuorumVerificationResult::LeadershipLost) => return Ok(false),
421 Ok(QuorumVerificationResult::RetryRequired) => {
422 if start_time.elapsed() > global_timeout {
424 return Err(NetworkError::GlobalTimeout(
425 "Leadership verification timed out".to_string(),
426 )
427 .into());
428 }
429
430 current_delay =
431 current_delay.checked_mul(2).unwrap_or(max_delay).min(max_delay);
432 let jitter = Duration::from_millis(rand::random::<u64>() % 500);
433 sleep(current_delay + jitter).await;
434 }
435 Err(e) => return Err(e),
436 }
437 }
438 }
439
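    /// Builds the cached [`ClusterMetadata`] (voter count, single-voter flag, replication
    /// targets) from the current membership; expected to be called after winning an election.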
440 async fn init_cluster_metadata(
473 &mut self,
474 membership: &Arc<T::M>,
475 ) -> Result<()> {
476 let voters = membership.voters().await;
        // +1 counts the leader itself in addition to the voting peers returned by voters().
        let total_voters = voters.len() + 1;
        let replication_targets = membership.replication_peers().await;
482
483 let single_voter = total_voters == 1;
485
486 self.cluster_metadata = ClusterMetadata {
487 single_voter,
488 total_voters,
489 replication_targets: replication_targets.clone(),
490 };
491
492 debug!(
493 "Initialized cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
494 single_voter,
495 total_voters,
496 replication_targets.len()
497 );
498 Ok(())
499 }
500
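    /// Submits the given payloads through the normal write path and interprets the response
    /// as a quorum check: write success, retry required, or leadership lost.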
501 async fn verify_internal_quorum(
502 &mut self,
503 payloads: Vec<EntryPayload>,
504 bypass_queue: bool,
505 ctx: &RaftContext<T>,
506 role_tx: &mpsc::UnboundedSender<RoleEvent>,
507 ) -> Result<QuorumVerificationResult> {
508 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
509
510 self.process_raft_request(
511 RaftRequestWithSignal {
512 id: nanoid!(),
513 payloads,
514 sender: resp_tx,
515 },
516 ctx,
517 bypass_queue,
518 role_tx,
519 )
520 .await?;
521
522 let timeout_duration =
524 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
525 match timeout(timeout_duration, resp_rx).await {
526 Ok(Ok(Ok(response))) => {
528 debug!("Leadership check response: {:?}", response);
529
530 Ok(if response.is_write_success() {
532 QuorumVerificationResult::Success
533 } else if response.is_retry_required() {
534 QuorumVerificationResult::RetryRequired
536 } else {
537 QuorumVerificationResult::LeadershipLost
539 })
540 }
541
542 Ok(Ok(Err(status))) => {
544 warn!("Leadership rejected by follower: {status:?}");
545 Ok(QuorumVerificationResult::LeadershipLost)
546 }
547
548 Ok(Err(e)) => {
550 error!("Channel error during leadership check: {:?}", e);
551 Err(NetworkError::SingalReceiveFailed(e.to_string()).into())
552 }
553
554 Err(_) => {
556 warn!("Leadership check timed out after {:?}", timeout_duration);
557 Err(NetworkError::Timeout {
558 node_id: self.node_id(),
559 duration: timeout_duration,
560 }
561 .into())
562 }
563 }
564 }
565
566 fn is_leader(&self) -> bool {
567 true
568 }
569
570 fn become_leader(&self) -> Result<RaftRole<T>> {
571 warn!("I am leader already");
572
573 Err(StateTransitionError::InvalidTransition.into())
574 }
575
576 fn become_candidate(&self) -> Result<RaftRole<T>> {
577 error!("Leader can not become Candidate");
578
579 Err(StateTransitionError::InvalidTransition.into())
580 }
581
582 fn become_follower(&self) -> Result<RaftRole<T>> {
583 info!(
584 "Node {} term {} transitioning to Follower",
585 self.node_id(),
586 self.current_term(),
587 );
588 println!(
589 "[Node {}] Leader โ Follower (term {})",
590 self.node_id(),
591 self.current_term()
592 );
593 Ok(RaftRole::Follower(Box::new(self.into())))
594 }
595
596 fn become_learner(&self) -> Result<RaftRole<T>> {
597 error!("Leader can not become Learner");
598
599 Err(StateTransitionError::InvalidTransition.into())
600 }
601
602 fn is_timer_expired(&self) -> bool {
603 self.timer.is_expired()
604 }
605
606 fn reset_timer(&mut self) {
608 self.timer.reset_batch();
609 self.timer.reset_replication();
610 }
611
612 fn next_deadline(&self) -> Instant {
613 self.timer.next_deadline()
614 }
615
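    /// Periodic leader work: runs membership maintenance, flushes the client batch buffer
    /// when its deadline or size threshold is reached, and triggers a replication round
    /// when the replication deadline expires.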
616 async fn tick(
618 &mut self,
619 role_tx: &mpsc::UnboundedSender<RoleEvent>,
620 _raft_tx: &mpsc::Sender<RaftEvent>,
621 ctx: &RaftContext<T>,
622 ) -> Result<()> {
623 let now = Instant::now();
624 self.shared_state().set_current_leader(self.node_id());
626
627 if let Err(e) = self.run_periodic_maintenance(role_tx, ctx).await {
629 error!("Failed to run periodic maintenance: {}", e);
630 }
631
632 if now >= self.timer.batch_deadline() {
634 self.timer.reset_batch();
635
636 if self.batch_buffer.should_flush() {
637 debug!(?now, "tick::reset_batch batch timer");
638 self.timer.reset_replication();
639
640 let batch = self.batch_buffer.take();
643 self.process_batch(batch, role_tx, ctx).await?;
644 }
645 }
646
647 if now >= self.timer.replication_deadline() {
650 debug!(?now, "tick::reset_replication timer");
651 self.timer.reset_replication();
652
653 let batch = self.batch_buffer.take();
655 self.process_batch(batch, role_tx, ctx).await?;
656 }
657
658 Ok(())
659 }
660
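    /// Fails all buffered linearizable reads with `Unavailable` when the leader steps down.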
661 fn drain_read_buffer(&mut self) -> Result<()> {
662 if !self.read_buffer.is_empty() {
663 warn!(
664 "Read batch: draining {} requests due to role change",
665 self.read_buffer.len()
666 );
667 for (_, sender) in std::mem::take(&mut self.read_buffer) {
668 let _ = sender.send(Err(tonic::Status::unavailable("Leader stepped down")));
669 }
670 self.read_buffer_start_time = None;
671 }
672
673 Ok(())
674 }
675
676 async fn handle_raft_event(
677 &mut self,
678 raft_event: RaftEvent,
679 ctx: &RaftContext<T>,
680 role_tx: mpsc::UnboundedSender<RoleEvent>,
681 ) -> Result<()> {
682 let my_id = self.shared_state.node_id;
683 let my_term = self.current_term();
684
685 match raft_event {
686 RaftEvent::ReceiveVoteRequest(vote_request, sender) => {
693 debug!(
694 "handle_raft_event::RaftEvent::ReceiveVoteRequest: {:?}",
695 &vote_request
696 );
697
698 let my_term = self.current_term();
699 if my_term < vote_request.term {
700 self.update_current_term(vote_request.term);
701 self.send_become_follower_event(None, &role_tx)?;
703
704 info!("Leader will not process Vote request, it should let Follower do it.");
705 send_replay_raft_event(
706 &role_tx,
707 RaftEvent::ReceiveVoteRequest(vote_request, sender),
708 )?;
709 } else {
710 let last_log_id =
711 ctx.raft_log().last_log_id().unwrap_or(LogId { index: 0, term: 0 });
712 let response = VoteResponse {
713 term: my_term,
714 vote_granted: false,
715 last_log_index: last_log_id.index,
716 last_log_term: last_log_id.term,
717 };
718 sender.send(Ok(response)).map_err(|e| {
719 let error_str = format!("{e:?}");
720 error!("Failed to send: {}", error_str);
721 NetworkError::SingalSendFailed(error_str)
722 })?;
723 }
724 }
725
726 RaftEvent::ClusterConf(_metadata_request, sender) => {
727 let cluster_conf = ctx
728 .membership()
729 .retrieve_cluster_membership_config(self.shared_state().current_leader())
730 .await;
731 debug!("Leader receive ClusterConf: {:?}", &cluster_conf);
732
733 sender.send(Ok(cluster_conf)).map_err(|e| {
734 let error_str = format!("{e:?}");
735 error!("Failed to send: {}", error_str);
736 NetworkError::SingalSendFailed(error_str)
737 })?;
738 }
739
            RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender) => {
741 let current_conf_version = ctx.membership().get_cluster_conf_version().await;
                debug!(%current_conf_version, ?cluster_conf_change_request,
743 "handle_raft_event::RaftEvent::ClusterConfUpdate",
744 );
745
                if my_term >= cluster_conf_change_request.term {
748 let response = ClusterConfUpdateResponse::higher_term(
749 my_id,
750 my_term,
751 current_conf_version,
752 );
753
754 sender.send(Ok(response)).map_err(|e| {
755 let error_str = format!("{e:?}");
756 error!("Failed to send: {}", error_str);
757 NetworkError::SingalSendFailed(error_str)
758 })?;
759 } else {
                    info!(
                        "Node {}: my term is lower than the request's term, stepping down to Follower",
                        my_id
                    );
                    self.send_become_follower_event(Some(cluster_conf_change_request.id), &role_tx)?;

                    info!(
                        "Leader will not process the ClusterConfUpdate request; the Follower role will handle it."
                    );
                    send_replay_raft_event(
                        &role_tx,
                        RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender),
                    )?;
775 }
776 }
777
778 RaftEvent::AppendEntries(append_entries_request, sender) => {
779 debug!(
780 "handle_raft_event::RaftEvent::AppendEntries: {:?}",
781 &append_entries_request
782 );
783
784 if my_term >= append_entries_request.term {
786 let response = AppendEntriesResponse::higher_term(my_id, my_term);
787
788 sender.send(Ok(response)).map_err(|e| {
789 let error_str = format!("{e:?}");
790 error!("Failed to send: {}", error_str);
791 NetworkError::SingalSendFailed(error_str)
792 })?;
793 } else {
                    info!(
                        "Node {}: my term is lower than the request's term, stepping down to Follower",
                        my_id
                    );
799 self.send_become_follower_event(
801 Some(append_entries_request.leader_id),
802 &role_tx,
803 )?;
804
805 info!(
806 "Leader will not process append_entries_request, it should let Follower do it."
807 );
808 send_replay_raft_event(
809 &role_tx,
810 RaftEvent::AppendEntries(append_entries_request, sender),
811 )?;
812 }
813 }
814
815 RaftEvent::ClientPropose(client_write_request, sender) => {
816 if let Err(e) = self
817 .process_raft_request(
818 RaftRequestWithSignal {
819 id: nanoid!(),
820 payloads: client_command_to_entry_payloads(
821 client_write_request.commands,
822 ),
823 sender,
824 },
825 ctx,
826 false,
827 &role_tx,
828 )
829 .await
830 {
831 error("Leader::process_raft_request", &e);
832 return Err(e);
833 }
834 }
835
836 RaftEvent::ClientReadRequest(client_read_request, sender) => {
837 let _timer = ScopedTimer::new("leader_linear_read");
838 debug!(
839 "Leader::ClientReadRequest client_read_request:{:?}",
840 &client_read_request
841 );
842
843 let keys = client_read_request.keys.clone();
844 let response: std::result::Result<ClientResponse, tonic::Status> = {
845 let read_operation =
846 || -> std::result::Result<ClientResponse, tonic::Status> {
847 let results = ctx
848 .handlers
849 .state_machine_handler
850 .read_from_state_machine(keys)
851 .unwrap_or_default();
852 debug!("handle_client_read results: {:?}", results);
853 Ok(ClientResponse::read_results(results))
854 };
855
856 let effective_policy = if client_read_request.has_consistency_policy() {
858 if ctx.node_config().raft.read_consistency.allow_client_override {
860 match client_read_request.consistency_policy() {
861 ClientReadConsistencyPolicy::LeaseRead => {
862 ServerReadConsistencyPolicy::LeaseRead
863 }
864 ClientReadConsistencyPolicy::LinearizableRead => {
865 ServerReadConsistencyPolicy::LinearizableRead
866 }
867 ClientReadConsistencyPolicy::EventualConsistency => {
868 ServerReadConsistencyPolicy::EventualConsistency
869 }
870 }
871 } else {
872 ctx.node_config().raft.read_consistency.default_policy.clone()
874 }
875 } else {
876 ctx.node_config().raft.read_consistency.default_policy.clone()
878 };
879
880 match effective_policy {
882 ServerReadConsistencyPolicy::LinearizableRead => {
883 self.read_buffer.push((client_read_request, sender));
885
886 if self.read_buffer.len() == 1 {
888 self.read_buffer_start_time = Some(Instant::now());
889
                            let role_tx = role_tx.clone();
                            let timeout_ms = self
892 .node_config
893 .raft
894 .read_consistency
895 .read_batching
896 .time_threshold_ms;
897
898 debug!(
899 "Read batch: first request, spawning {}ms timeout",
900 timeout_ms
901 );
902
903 tokio::spawn(async move {
904 tokio::time::sleep(Duration::from_millis(timeout_ms)).await;
905 debug!(
906 "Read batch: timeout expired, sending flush signal via ReprocessEvent"
907 );
908 let _ = role_tx.send(RoleEvent::ReprocessEvent(Box::new(
910 RaftEvent::FlushReadBuffer,
911 )));
912 });
913 }
914
915 let size_threshold =
917 self.node_config.raft.read_consistency.read_batching.size_threshold;
918 if self.read_buffer.len() >= size_threshold {
919 debug!(
920 "Read batch: size threshold reached ({}), flushing",
921 size_threshold
922 );
923 self.process_linearizable_read_batch(ctx, &role_tx).await?;
924 }
925 return Ok(());
927 }
928 ServerReadConsistencyPolicy::LeaseRead => {
929 if !self.is_lease_valid(ctx) {
931 self.verify_leadership_and_refresh_lease(ctx, &role_tx).await?;
933 }
934 read_operation()
936 }
937 ServerReadConsistencyPolicy::EventualConsistency => {
938 debug!("EventualConsistency: serving local read without verification");
941 read_operation()
942 }
943 }
944 };
945
946 debug!(
947 "Leader::ClientReadRequest is going to response: {:?}",
948 &response
949 );
950 sender.send(response).map_err(|e| {
951 let error_str = format!("{e:?}");
952 error!("Failed to send: {}", error_str);
953 NetworkError::SingalSendFailed(error_str)
954 })?;
955 }
956
957 RaftEvent::FlushReadBuffer => {
958 if !self.read_buffer.is_empty() {
959 debug!(
960 "Read batch: flushing {} pending reads (timeout)",
961 self.read_buffer.len()
962 );
963 self.process_linearizable_read_batch(ctx, &role_tx).await?;
964 } else {
965 debug!("Read batch: flush signal received but buffer empty (already flushed)");
966 }
967 }
968
969 RaftEvent::InstallSnapshotChunk(_streaming, sender) => {
970 sender
                    .send(Err(Status::permission_denied("Not a Follower or Learner.")))
972 .map_err(|e| {
973 let error_str = format!("{e:?}");
974 error!("Failed to send: {}", error_str);
975 NetworkError::SingalSendFailed(error_str)
976 })?;
977
978 return Err(ConsensusError::RoleViolation {
979 current_role: "Leader",
980 required_role: "Follower or Learner",
981 context: format!(
982 "Leader node {} receives RaftEvent::InstallSnapshotChunk",
983 ctx.node_id
984 ),
985 }
986 .into());
987 }
988
            RaftEvent::RaftLogCleanUp(_purge_log_request, sender) => {
990 sender
991 .send(Err(Status::permission_denied(
992 "Leader should not receive RaftLogCleanUp event.",
993 )))
994 .map_err(|e| {
995 let error_str = format!("{e:?}");
996 error!("Failed to send: {}", error_str);
997 NetworkError::SingalSendFailed(error_str)
998 })?;
999
1000 return Err(ConsensusError::RoleViolation {
1001 current_role: "Leader",
                    required_role: "Non-Leader",
1003 context: format!(
1004 "Leader node {} receives RaftEvent::RaftLogCleanUp",
1005 ctx.node_id
1006 ),
1007 }
1008 .into());
1009 }
1010
1011 RaftEvent::CreateSnapshotEvent => {
1012 if self.snapshot_in_progress.load(std::sync::atomic::Ordering::Acquire) {
1014 info!("Snapshot creation already in progress. Skipping duplicate request.");
1015 return Ok(());
1016 }
1017
1018 self.snapshot_in_progress.store(true, std::sync::atomic::Ordering::Release);
1019 let state_machine_handler = ctx.state_machine_handler().clone();
1020
1021 tokio::spawn(async move {
1023 let result = state_machine_handler.create_snapshot().await;
1024 info!("SnapshotCreated event will be processed in another event thread");
1025 if let Err(e) =
1026 send_replay_raft_event(&role_tx, RaftEvent::SnapshotCreated(result))
1027 {
1028 error!("Failed to send snapshot creation result: {}", e);
1029 }
1030 });
1031 }
1032
1033 RaftEvent::SnapshotCreated(result) => {
1034 self.snapshot_in_progress.store(false, Ordering::SeqCst);
1035 let my_id = self.shared_state.node_id;
1036 let my_term = self.current_term();
1037
1038 match result {
1039 Err(e) => {
1040 error!(%e, "State machine snapshot creation failed");
1041 }
1042 Ok((
1043 SnapshotMetadata {
1044 last_included: last_included_option,
1045 checksum,
1046 },
1047 _final_path,
1048 )) => {
1049 info!("Initiating log purge after snapshot creation");
1050
1051 if let Some(last_included) = last_included_option {
1052 trace!("Phase 1: Schedule log purge if possible");
1056 if self.can_purge_logs(self.last_purged_index, last_included) {
1057 trace!(?last_included, "Phase 1: Scheduling log purge");
1058 self.scheduled_purge_upto(last_included);
1059 }
1060
1061 if !self.cluster_metadata.replication_targets.is_empty() {
1069 trace!("Phase 2: Send Purge request to replication targets");
1070
1071 if self.cluster_metadata.total_voters == 0 {
1076 error!(
1077 "BUG: cluster_metadata not initialized! Leader must call init_cluster_metadata() after election"
1078 );
1079 return Err(
1080 MembershipError::ClusterMetadataNotInitialized.into()
1081 );
1082 }
1083
1084 let membership = ctx.membership();
1085 let transport = ctx.transport();
1086 match transport
1087 .send_purge_requests(
1088 PurgeLogRequest {
1089 term: my_term,
1090 leader_id: my_id,
1091 last_included: Some(last_included),
1092 snapshot_checksum: checksum.clone(),
1093 leader_commit: self.commit_index(),
1094 },
1095 &self.node_config.retry,
1096 membership,
1097 )
1098 .await
1099 {
1100 Ok(result) => {
1101 info!(?result, "receive PurgeLogResult");
1102 let should_step_down =
1106 self.peer_purge_progress(result, &role_tx)?;
1107 if should_step_down {
                                            return Ok(());
                                        }
1110 }
1111 Err(e) => {
1112 error!(?e, "RaftEvent::CreateSnapshotEvent");
1113 return Err(e);
1114 }
1115 }
1116 } else {
1117 debug!(
1118 "Standalone node (leader={}): no replication targets, skip PurgeRequest",
1119 my_id
1120 );
1121 }
1122
1123 trace!("Phase 3: Execute scheduled purge task");
1127 debug!(?last_included, "Execute scheduled purge task");
1128 if let Some(scheduled) = self.scheduled_purge_upto {
1129 let purge_executor = ctx.purge_executor();
1130 match purge_executor.execute_purge(scheduled).await {
1131 Ok(_) => {
1132 if let Err(e) = send_replay_raft_event(
1133 &role_tx,
1134 RaftEvent::LogPurgeCompleted(scheduled),
1135 ) {
1136 error!(%e, "Failed to notify purge completion");
1137 }
1138 }
1139 Err(e) => {
1140 error!(?e, ?scheduled, "Log purge execution failed");
1141 }
1142 }
1143 }
1144 }
1145 }
1146 }
1147 }
1148
1149 RaftEvent::LogPurgeCompleted(purged_id) => {
1150 if self.last_purged_index.map_or(true, |current| purged_id.index > current.index) {
1152 debug!(
1153 ?purged_id,
1154 "Updating last purged index after successful execution"
1155 );
1156 self.last_purged_index = Some(purged_id);
1157 } else {
1158 warn!(
1159 ?purged_id,
1160 ?self.last_purged_index,
1161 "Received outdated purge completion, ignoring"
1162 );
1163 }
1164 }
1165
1166 RaftEvent::JoinCluster(join_request, sender) => {
1167 debug!(?join_request, "Leader::RaftEvent::JoinCluster");
1168 self.handle_join_cluster(join_request, sender, ctx, &role_tx).await?;
1169 }
1170
1171 RaftEvent::DiscoverLeader(request, sender) => {
1172 debug!(?request, "Leader::RaftEvent::DiscoverLeader");
1173
1174 if let Some(meta) = ctx.membership().retrieve_node_meta(my_id).await {
1175 let response = LeaderDiscoveryResponse {
1176 leader_id: my_id,
1177 leader_address: meta.address,
1178 term: my_term,
1179 };
1180 sender.send(Ok(response)).map_err(|e| {
1181 let error_str = format!("{e:?}");
1182 error!("Failed to send: {}", error_str);
1183 NetworkError::SingalSendFailed(error_str)
1184 })?;
1185 return Ok(());
1186 } else {
1187 let msg = "Leader can not find its address? It must be a bug.";
1188 error!("{}", msg);
1189 panic!("{}", msg);
1190 }
1191 }
1192 RaftEvent::StreamSnapshot(request, sender) => {
1193 debug!("Leader::RaftEvent::StreamSnapshot");
1194
1195 if let Some(metadata) = ctx.state_machine().snapshot_metadata() {
1197 let (response_tx, response_rx) =
1199 mpsc::channel::<std::result::Result<Arc<SnapshotChunk>, Status>>(32);
                    let size = 1024 * 1024 * 1024; // 1 GiB
                    let response_stream = create_production_snapshot_stream(response_rx, size);
1203 sender.send(Ok(response_stream)).map_err(|e| {
1205 let error_str = format!("{e:?}");
1206 error!("Stream response failed: {}", error_str);
1207 NetworkError::SingalSendFailed(error_str)
1208 })?;
1209
1210 let state_machine_handler = ctx.state_machine_handler().clone();
1212 let config = ctx.node_config.raft.snapshot.clone();
1213 let data_stream =
1215 state_machine_handler.load_snapshot_data(metadata.clone()).await?;
1216
1217 tokio::spawn(async move {
1218 if let Err(e) = BackgroundSnapshotTransfer::<T>::run_pull_transfer(
1219 request,
1220 response_tx,
1221 data_stream,
1222 config,
1223 )
1224 .await
1225 {
1226 error!("StreamSnapshot failed: {:?}", e);
1227 }
1228 });
1229 } else {
1230 warn!("No snapshot available for streaming");
1231 sender.send(Err(Status::not_found("Snapshot not found"))).map_err(|e| {
1232 let error_str = format!("{e:?}");
1233 error!("Stream response failed: {}", error_str);
1234 NetworkError::SingalSendFailed(error_str)
1235 })?;
1236 }
1237 }
1238 RaftEvent::TriggerSnapshotPush { peer_id } => {
                if let Some(latest_snapshot_metadata) = ctx.state_machine().snapshot_metadata() {
                    Self::trigger_background_snapshot(
                        peer_id,
                        latest_snapshot_metadata,
1243 ctx.state_machine_handler().clone(),
1244 ctx.membership(),
1245 ctx.node_config.raft.snapshot.clone(),
1246 )
1247 .await?;
1248 }
1249 }
1250
1251 RaftEvent::PromoteReadyLearners => {
1252 info!(
1254 "[Leader {}] โก PromoteReadyLearners event received, pending_promotions: {:?}",
1255 self.node_id(),
1256 self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
1257 );
1258 self.process_pending_promotions(ctx, &role_tx).await?;
1259 }
1260
1261 RaftEvent::MembershipApplied => {
1262 let old_replication_targets = self.cluster_metadata.replication_targets.clone();
1264
1265 debug!("Refreshing cluster metadata cache after membership change");
1267 if let Err(e) = self.update_cluster_metadata(&ctx.membership()).await {
1268 warn!("Failed to update cluster metadata: {:?}", e);
1269 }
1270
1271 let newly_added: Vec<u32> = self
1274 .cluster_metadata
1275 .replication_targets
1276 .iter()
1277 .filter(|new_peer| {
1278 !old_replication_targets.iter().any(|old_peer| old_peer.id == new_peer.id)
1279 })
1280 .map(|peer| peer.id)
1281 .collect();
1282
1283 if !newly_added.is_empty() {
1284 debug!(
1285 "Initializing replication state for {} new peer(s): {:?}",
1286 newly_added.len(),
1287 newly_added
1288 );
1289 let last_entry_id = ctx.raft_log().last_entry_id();
1290 if let Err(e) =
1291 self.init_peers_next_index_and_match_index(last_entry_id, newly_added)
1292 {
1293 warn!("Failed to initialize next_index for new peers: {:?}", e);
1294 }
1297 }
1298 }
1299
1300 RaftEvent::StepDownSelfRemoved => {
1301 warn!(
1304 "[Leader-{}] Removed from cluster membership, stepping down to Follower",
1305 self.node_id()
1306 );
1307 role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
1308 error!(
1309 "[Leader-{}] Failed to send BecomeFollower after self-removal: {:?}",
1310 self.node_id(),
1311 e
1312 );
1313 NetworkError::SingalSendFailed(format!(
1314 "BecomeFollower after self-removal: {e:?}"
1315 ))
1316 })?;
1317 return Ok(());
1318 }
1319 }
1320
1321 Ok(())
1322 }
1323}
1324
1325impl<T: TypeConfig> LeaderState<T> {
1326 pub async fn update_cluster_metadata(
1329 &mut self,
1330 membership: &Arc<T::M>,
1331 ) -> Result<()> {
1332 let voters = membership.voters().await;
        let total_voters = voters.len() + 1; // +1 for the leader itself
        let replication_targets = membership.replication_peers().await;
1338
1339 let single_voter = total_voters == 1;
1341
1342 self.cluster_metadata = ClusterMetadata {
1343 single_voter,
1344 total_voters,
1345 replication_targets: replication_targets.clone(),
1346 };
1347
1348 debug!(
1349 "Updated cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
1350 single_voter,
1351 total_voters,
1352 replication_targets.len()
1353 );
1354 Ok(())
1355 }
1356
1357 pub fn state_snapshot(&self) -> StateSnapshot {
1359 StateSnapshot {
1360 current_term: self.current_term(),
1361 voted_for: None,
1362 commit_index: self.commit_index(),
1363 role: Leader as i32,
1364 }
1365 }
1366
1367 #[tracing::instrument]
1369 pub fn leader_state_snapshot(&self) -> LeaderStateSnapshot {
1370 LeaderStateSnapshot {
1371 next_index: self.next_index.clone(),
1372 match_index: self.match_index.clone(),
1373 noop_log_id: self.noop_log_id,
1374 }
1375 }
1376
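    /// Pushes a client request into the batch buffer and processes the accumulated batch
    /// immediately when `execute_now` is set or the buffer's push indicates a flush is due.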
1377 pub async fn process_raft_request(
1381 &mut self,
1382 raft_request_with_signal: RaftRequestWithSignal,
1383 ctx: &RaftContext<T>,
1384 execute_now: bool,
1385 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1386 ) -> Result<()> {
1387 debug!(
1388 "Leader::process_raft_request, request_id: {}",
1389 raft_request_with_signal.id
1390 );
1391
1392 let push_result = self.batch_buffer.push(raft_request_with_signal);
1393 if execute_now || push_result.is_some() {
1395 let batch = self.batch_buffer.take();
1396
1397 trace!(
1398 "replication_handler.handle_raft_request_in_batch: batch size:{:?}",
1399 batch.len()
1400 );
1401
1402 self.process_batch(batch, role_tx, ctx).await?;
1403 }
1404
1405 Ok(())
1406 }
1407
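    /// Replicates a batch of client requests, updates peer indexes and the commit index from
    /// the append results, and answers every request in the batch (success, retry-required,
    /// or propose-failed). Steps down if a higher term is observed.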
1408 pub async fn process_batch(
1445 &mut self,
1446 batch: VecDeque<RaftRequestWithSignal>,
1447 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1448 ctx: &RaftContext<T>,
1449 ) -> Result<()> {
1450 let entry_payloads: Vec<EntryPayload> =
1452 batch.iter().flat_map(|req| &req.payloads).cloned().collect();
1453 if !entry_payloads.is_empty() {
1454 trace!(?entry_payloads, "[Node-{} process_batch..", ctx.node_id);
1455 }
1456
1457 let cluster_size = self.cluster_metadata.total_voters;
1459 trace!(%cluster_size);
1460
1461 let result = ctx
1462 .replication_handler()
1463 .handle_raft_request_in_batch(
1464 entry_payloads,
1465 self.state_snapshot(),
1466 self.leader_state_snapshot(),
1467 &self.cluster_metadata,
1468 ctx,
1469 )
1470 .await;
1471 debug!(?result, "replication_handler::handle_raft_request_in_batch");
1472
1473 match result {
1475 Ok(AppendResults {
1477 commit_quorum_achieved: true,
1478 peer_updates,
1479 learner_progress,
1480 }) => {
1481 self.update_peer_indexes(&peer_updates);
1483
1484 if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
1486 error!(?e, "check_learner_progress failed");
1487 };
1488
1489 let new_commit_index = if self.cluster_metadata.single_voter {
1493 let last_log_index = ctx.raft_log().last_entry_id();
1494 if last_log_index > self.commit_index() {
1495 Some(last_log_index)
1496 } else {
1497 None
1498 }
1499 } else {
1500 self.calculate_new_commit_index(ctx.raft_log(), &peer_updates)
1501 };
1502
1503 if let Some(new_commit_index) = new_commit_index {
1504 debug!(
1505 "[Leader-{}] New commit been acknowledged: {}",
1506 self.node_id(),
1507 new_commit_index
1508 );
1509 self.update_commit_index_with_signal(
1510 Leader as i32,
1511 self.current_term(),
1512 new_commit_index,
1513 role_tx,
1514 )?;
1515 }
1516
1517 for request in batch {
1519 let _ = request.sender.send(Ok(ClientResponse::write_success()));
1520 }
1521 }
1522
1523 Ok(AppendResults {
1525 commit_quorum_achieved: false,
1526 peer_updates,
1527 learner_progress,
1528 }) => {
1529 self.update_peer_indexes(&peer_updates);
1531
1532 if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
1534 error!(?e, "check_learner_progress failed");
1535 };
1536
1537 let responses_received = peer_updates.len();
1539 let error_code = if is_majority(responses_received, cluster_size) {
1540 ErrorCode::RetryRequired
1541 } else {
1542 ErrorCode::ProposeFailed
1543 };
1544
1545 for request in batch {
1547 let _ = request.sender.send(Ok(ClientResponse::client_error(error_code)));
1548 }
1549 }
1550
1551 Err(Error::Consensus(ConsensusError::Replication(ReplicationError::HigherTerm(
1553 higher_term,
1554 )))) => {
1555 warn!("Higher term detected: {}", higher_term);
1556 self.update_current_term(higher_term);
1557 self.send_become_follower_event(None, role_tx)?;
1558
1559 for request in batch {
1561 let _ = request
1562 .sender
1563 .send(Ok(ClientResponse::client_error(ErrorCode::TermOutdated)));
1564 }
1565
1566 return Err(ReplicationError::HigherTerm(higher_term).into());
1567 }
1568
1569 Err(e) => {
1571 error!("Batch processing failed: {:?}", e);
1572
1573 for request in batch {
1575 let _ = request
1576 .sender
1577 .send(Ok(ClientResponse::client_error(ErrorCode::ProposeFailed)));
1578 }
1579
1580 return Err(e);
1581 }
1582 }
1583
1584 Ok(())
1585 }
1586
1587 #[instrument(skip(self))]
1589 fn update_peer_indexes(
1590 &mut self,
1591 peer_updates: &HashMap<u32, PeerUpdate>,
1592 ) {
1593 for (peer_id, update) in peer_updates {
1594 if let Err(e) = self.update_next_index(*peer_id, update.next_index) {
1595 error!("Failed to update next index: {:?}", e);
1596 }
1597 trace!(
1598 "Updated next index for peer {}-{}",
1599 peer_id, update.next_index
1600 );
1601 if let Some(match_index) = update.match_index {
1602 if let Err(e) = self.update_match_index(*peer_id, match_index) {
1603 error!("Failed to update match index: {:?}", e);
1604 }
1605 trace!("Updated match index for peer {}-{}", peer_id, match_index);
1606 }
1607 }
1608 }
1609
1610 pub async fn check_learner_progress(
1611 &mut self,
1612 learner_progress: &HashMap<u32, Option<u64>>,
1613 ctx: &RaftContext<T>,
1614 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1615 ) -> Result<()> {
1616 debug!(?learner_progress, "check_learner_progress");
1617
1618 if !self.should_check_learner_progress(ctx) {
1619 return Ok(());
1620 }
1621
1622 if learner_progress.is_empty() {
1623 return Ok(());
1624 }
1625
1626 let ready_learners = self.find_promotable_learners(learner_progress, ctx).await;
1627 let new_promotions = self.deduplicate_promotions(ready_learners);
1628
1629 if !new_promotions.is_empty() {
1630 self.enqueue_and_notify_promotions(new_promotions, role_tx)?;
1631 }
1632
1633 Ok(())
1634 }
1635
1636 fn should_check_learner_progress(
1638 &mut self,
1639 ctx: &RaftContext<T>,
1640 ) -> bool {
1641 let throttle_interval =
1642 Duration::from_millis(ctx.node_config().raft.learner_check_throttle_ms);
1643 if self.last_learner_check.elapsed() < throttle_interval {
1644 return false;
1645 }
1646 self.last_learner_check = Instant::now();
1647 true
1648 }
1649
1650 async fn find_promotable_learners(
1652 &self,
1653 learner_progress: &HashMap<u32, Option<u64>>,
1654 ctx: &RaftContext<T>,
1655 ) -> Vec<u32> {
1656 let leader_commit = self.commit_index();
1657 let threshold = ctx.node_config().raft.learner_catchup_threshold;
1658 let membership = ctx.membership();
1659
1660 let mut ready_learners = Vec::new();
1661
1662 for (&node_id, &match_index_opt) in learner_progress.iter() {
1663 if !membership.contains_node(node_id).await {
1664 continue;
1665 }
1666
1667 if !self.is_learner_caught_up(match_index_opt, leader_commit, threshold) {
1668 continue;
1669 }
1670
1671 let node_status =
1672 membership.get_node_status(node_id).await.unwrap_or(NodeStatus::ReadOnly);
1673 if !node_status.is_promotable() {
1674 debug!(
1675 ?node_id,
1676 ?node_status,
1677 "Learner caught up but status is not Promotable, skipping"
1678 );
1679 continue;
1680 }
1681
1682 debug!(
1683 ?node_id,
1684 match_index = ?match_index_opt.unwrap_or(0),
1685 ?leader_commit,
1686 gap = leader_commit.saturating_sub(match_index_opt.unwrap_or(0)),
1687 "Learner caught up"
1688 );
1689 ready_learners.push(node_id);
1690 }
1691
1692 ready_learners
1693 }
1694
1695 fn is_learner_caught_up(
1697 &self,
1698 match_index: Option<u64>,
1699 leader_commit: u64,
1700 threshold: u64,
1701 ) -> bool {
1702 let match_index = match_index.unwrap_or(0);
1703 let gap = leader_commit.saturating_sub(match_index);
1704 gap <= threshold
1705 }
1706
1707 fn deduplicate_promotions(
1709 &self,
1710 ready_learners: Vec<u32>,
1711 ) -> Vec<u32> {
1712 let already_pending: std::collections::HashSet<_> =
1713 self.pending_promotions.iter().map(|p| p.node_id).collect();
1714
1715 ready_learners.into_iter().filter(|id| !already_pending.contains(id)).collect()
1716 }
1717
1718 fn enqueue_and_notify_promotions(
1720 &mut self,
1721 new_promotions: Vec<u32>,
1722 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1723 ) -> Result<()> {
1724 info!(
1725 ?new_promotions,
1726 "Learners caught up, adding to pending promotions"
1727 );
1728
1729 for node_id in new_promotions {
1730 self.pending_promotions
1731 .push_back(PendingPromotion::new(node_id, Instant::now()));
1732 }
1733
1734 role_tx
1735 .send(RoleEvent::ReprocessEvent(Box::new(
1736 RaftEvent::PromoteReadyLearners,
1737 )))
1738 .map_err(|e| {
1739 let error_str = format!("{e:?}");
1740 error!("Failed to send PromoteReadyLearners: {}", error_str);
1741 Error::System(SystemError::Network(NetworkError::SingalSendFailed(
1742 error_str,
1743 )))
1744 })?;
1745
1746 Ok(())
1747 }
1748
1749 #[allow(dead_code)]
1750 pub async fn batch_promote_learners(
1751 &mut self,
1752 ready_learners_ids: Vec<u32>,
1753 ctx: &RaftContext<T>,
1754 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1755 ) -> Result<()> {
1756 debug!("1. Determine optimal promotion status based on quorum safety");
1758 let membership = ctx.membership();
1759 let current_voters = membership.voters().await.len();
1760 let new_active_count = current_voters + ready_learners_ids.len();
1762
1763 trace!(
1765 ?current_voters,
1766 ?ready_learners_ids,
1767 "[Node-{}] new_active_count: {}",
1768 self.node_id(),
1769 new_active_count
1770 );
1771 let target_status = if ensure_safe_join(self.node_id(), new_active_count).is_ok() {
1772 trace!(
1773 "Going to update nodes-{:?} status to Active",
1774 ready_learners_ids
1775 );
1776 NodeStatus::Active
1777 } else {
1778 trace!(
1779 "Not enough quorum to promote learners: {:?}",
1780 ready_learners_ids
1781 );
1782 return Ok(());
1783 };
1784
1785 debug!("2. Create configuration change payload");
1787 let config_change = Change::BatchPromote(BatchPromote {
1788 node_ids: ready_learners_ids.clone(),
1789 new_status: target_status as i32,
1790 });
1791
1792 info!(?config_change, "Replicating cluster config");
1793
1794 debug!("3. Submit single config change for all ready learners");
1796 match self
1798 .verify_leadership_persistent(
1799 vec![EntryPayload::config(config_change)],
1800 true,
1801 ctx,
1802 role_tx,
1803 )
1804 .await
1805 {
1806 Ok(true) => {
1807 info!(
1808 "Batch promotion committed for nodes: {:?}",
1809 ready_learners_ids
1810 );
1811 }
1812 Ok(false) => {
1813 warn!("Failed to commit batch promotion");
1814 }
1815 Err(e) => {
1816 error!("Batch promotion error: {:?}", e);
1817 return Err(e);
1818 }
1819 }
1820
1821 Ok(())
1822 }
1823
1824 #[instrument(skip(self))]
1826 fn calculate_new_commit_index(
1827 &mut self,
1828 raft_log: &Arc<ROF<T>>,
1829 peer_updates: &HashMap<u32, PeerUpdate>,
1830 ) -> Option<u64> {
1831 let old_commit_index = self.commit_index();
1832 let current_term = self.current_term();
1833
1834 let matched_ids: Vec<u64> =
1835 peer_updates.keys().filter_map(|&id| self.match_index(id)).collect();
1836
1837 let new_commit_index =
1838 raft_log.calculate_majority_matched_index(current_term, old_commit_index, matched_ids);
1839
        // Only report a commit index that actually advances past the current one.
        new_commit_index.filter(|&idx| idx > old_commit_index)
1845 }
1846
1847 #[allow(dead_code)]
1848 fn if_update_commit_index(
1849 &self,
1850 new_commit_index_option: Option<u64>,
1851 ) -> (bool, u64) {
1852 let current_commit_index = self.commit_index();
1853 if let Some(new_commit_index) = new_commit_index_option {
1854 debug!("Leader::update_commit_index: {:?}", new_commit_index);
1855 if current_commit_index < new_commit_index {
1856 return (true, new_commit_index);
1857 }
1858 }
1859 debug!("Leader::update_commit_index: false");
1860 (false, current_commit_index)
1861 }
1862
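    /// The read index for linearizable reads: the maximum of the commit index and the
    /// leader's no-op entry index.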
1863 #[doc(hidden)]
1873 pub fn calculate_read_index(&self) -> u64 {
1874 let commit_index = self.commit_index();
1875 let noop_index = self.noop_log_id.unwrap_or(0);
1876 std::cmp::max(commit_index, noop_index)
1877 }
1878
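    /// Waits until the state machine has applied entries up to `target_index`, bounded by
    /// the configured state-machine sync timeout.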
1879 #[doc(hidden)]
1884 pub async fn wait_until_applied(
1885 &self,
1886 target_index: u64,
1887 state_machine_handler: &Arc<SMHOF<T>>,
1888 last_applied: u64,
1889 ) -> Result<()> {
1890 if last_applied < target_index {
1891 state_machine_handler.update_pending(target_index);
1892
1893 let timeout_ms = self.node_config.raft.read_consistency.state_machine_sync_timeout_ms;
1894 state_machine_handler
1895 .wait_applied(target_index, std::time::Duration::from_millis(timeout_ms))
1896 .await?;
1897
1898 debug!("wait_until_applied: target_index={} success", target_index);
1899 }
1900 Ok(())
1901 }
1902
1903 #[instrument(skip(self))]
1904 fn scheduled_purge_upto(
1905 &mut self,
1906 received_last_included: LogId,
1907 ) {
1908 if let Some(existing) = self.scheduled_purge_upto {
1909 if existing.index >= received_last_included.index {
1910 warn!(
1911 ?received_last_included,
1912 ?existing,
1913 "Will not update scheduled_purge_upto, received invalid last_included log"
1914 );
1915 return;
1916 }
1917 }
        info!(?self.scheduled_purge_upto, ?received_last_included, "Update scheduled_purge_upto.");
1919 self.scheduled_purge_upto = Some(received_last_included);
1920 }
1921
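    /// Records peers' purge progress from their `PurgeLogResponse`s. Returns `true` if a
    /// higher term was observed and the leader must step down.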
1922 pub(crate) fn peer_purge_progress(
1940 &mut self,
1941 responses: Vec<Result<PurgeLogResponse>>,
1942 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1943 ) -> Result<bool> {
1944 if responses.is_empty() {
1945 return Ok(false);
1946 }
1947 for r in responses.iter().flatten() {
1948 if r.term > self.current_term() {
1951 self.update_current_term(r.term);
1952 self.send_become_follower_event(None, role_tx)?;
1953 return Ok(true); }
1955
1956 if let Some(last_purged) = r.last_purged {
1957 self.peer_purge_progress
1958 .entry(r.node_id)
1959 .and_modify(|v| *v = last_purged.index)
1960 .or_insert(last_purged.index);
1961 }
1962 }
1963
1964 Ok(false)
1965 }
1966
1967 fn send_become_follower_event(
1968 &self,
1969 new_leader_id: Option<u32>,
1970 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1971 ) -> Result<()> {
1972 info!(
1973 ?new_leader_id,
1974 "Leader is going to step down as Follower..."
1975 );
1976 role_tx.send(RoleEvent::BecomeFollower(new_leader_id)).map_err(|e| {
1977 let error_str = format!("{e:?}");
1978 error!("Failed to send: {}", error_str);
1979 NetworkError::SingalSendFailed(error_str)
1980 })?;
1981
1982 Ok(())
1983 }
1984
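    /// A local purge may proceed only if the snapshot boundary is below the commit index,
    /// advances monotonically past the last purge, and every tracked peer has already
    /// purged at least that far.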
1985 #[instrument(skip(self))]
2016 pub fn can_purge_logs(
2017 &self,
2018 last_purge_index: Option<LogId>,
2019 last_included_in_snapshot: LogId,
2020 ) -> bool {
2021 let commit_index = self.commit_index();
        debug!(?self.peer_purge_progress, ?commit_index, ?last_purge_index, ?last_included_in_snapshot, "can_purge_logs");
2024 let monotonic_check = last_purge_index
2025 .map(|lid| lid.index < last_included_in_snapshot.index)
2026 .unwrap_or(true);
2027
2028 last_included_in_snapshot.index < commit_index
2029 && monotonic_check
2030 && self.peer_purge_progress.values().all(|&v| v >= last_included_in_snapshot.index)
2031 }
2032
2033 pub async fn handle_join_cluster(
2034 &mut self,
2035 join_request: JoinRequest,
2036 sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2037 ctx: &RaftContext<T>,
2038 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2039 ) -> Result<()> {
2040 let node_id = join_request.node_id;
2041 let node_role = join_request.node_role;
2042 let address = join_request.address;
2043 let status = join_request.status;
2044 let membership = ctx.membership();
2045
2046 debug!("1. Validate join request");
2048 if membership.contains_node(node_id).await {
2049 let error_msg = format!("Node {node_id} already exists in cluster");
2050 warn!(%error_msg);
2051 return self.send_join_error(sender, MembershipError::NodeAlreadyExists(node_id)).await;
2052 }
2053
2054 debug!("2. Create configuration change payload");
2056 if let Err(e) = membership.can_rejoin(node_id, node_role).await {
2057 let error_msg = format!("Node {node_id} cannot rejoin: {e}",);
2058 warn!(%error_msg);
2059 return self
2060 .send_join_error(sender, MembershipError::JoinClusterError(error_msg))
2061 .await;
2062 }
2063
2064 let config_change = Change::AddNode(AddNode {
2065 node_id,
2066 address: address.clone(),
2067 status,
2068 });
2069
2070 debug!("3. Wait for quorum confirmation");
2072 match self
2074 .verify_leadership_persistent(
2075 vec![EntryPayload::config(config_change)],
2076 true,
2077 ctx,
2078 role_tx,
2079 )
2080 .await
2081 {
2082 Ok(true) => {
2083 debug!("4. Update node status to Syncing");
2085
2086 debug!(
2087 "After updating, the replications peers: {:?}",
2088 ctx.membership().replication_peers().await
2089 );
2090
2091 debug!("5. Send successful response");
2096 info!("Join config committed for node {}", node_id);
2097 self.send_join_success(node_id, &address, sender, ctx).await?;
2098 }
2099 Ok(false) => {
2100 warn!("Failed to commit join config for node {}", node_id);
2101 self.send_join_error(sender, MembershipError::CommitTimeout).await?
2102 }
2103 Err(e) => {
2104 error!("Error waiting for commit: {:?}", e);
2105 self.send_join_error(sender, e).await?
2106 }
2107 }
2108 Ok(())
2109 }
2110
2111 async fn send_join_success(
2112 &self,
2113 node_id: u32,
2114 address: &str,
2115 sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2116 ctx: &RaftContext<T>,
2117 ) -> Result<()> {
2118 let snapshot_metadata = ctx.state_machine_handler().get_latest_snapshot_metadata();
2120
2121 let response = JoinResponse {
2123 success: true,
2124 error: String::new(),
2125 config: Some(
2126 ctx.membership()
2127 .retrieve_cluster_membership_config(self.shared_state().current_leader())
2128 .await,
2129 ),
2130 config_version: ctx.membership().get_cluster_conf_version().await,
2131 snapshot_metadata,
2132 leader_id: self.node_id(),
2133 };
2134
2135 sender.send(Ok(response)).map_err(|e| {
2136 error!("Failed to send join response: {:?}", e);
2137 NetworkError::SingalSendFailed(format!("{e:?}"))
2138 })?;
2139
2140 info!(
2141 "Node {} ({}) successfully added as learner",
2142 node_id, address
2143 );
2144
2145 crate::utils::cluster_printer::print_leader_accepting_new_node(
2147 self.node_id(),
2148 node_id,
2149 address,
2150 d_engine_proto::common::NodeRole::Learner as i32,
2151 );
2152
2153 Ok(())
2154 }
2155
2156 async fn send_join_error(
2157 &self,
2158 sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2159 error: impl Into<Error>,
2160 ) -> Result<()> {
2161 let error = error.into();
2162 let status = Status::failed_precondition(error.to_string());
2163
2164 sender.send(Err(status)).map_err(|e| {
2165 error!("Failed to send join error: {:?}", e);
2166 NetworkError::SingalSendFailed(format!("{e:?}"))
2167 })?;
2168
2169 Err(error)
2170 }
2171
2172 #[cfg(any(test, feature = "__test_support"))]
2173 pub fn new(
2174 node_id: u32,
2175 node_config: Arc<RaftNodeConfig>,
2176 ) -> Self {
2177 let ReplicationConfig {
2178 rpc_append_entries_in_batch_threshold,
2179 rpc_append_entries_batch_process_delay_in_ms,
2180 rpc_append_entries_clock_in_ms,
2181 ..
2182 } = node_config.raft.replication;
2183
2184 LeaderState {
2185 cluster_metadata: ClusterMetadata {
2186 single_voter: false,
2187 total_voters: 0,
2188 replication_targets: vec![],
2189 },
2190 shared_state: SharedState::new(node_id, None, None),
2191 timer: Box::new(ReplicationTimer::new(
2192 rpc_append_entries_clock_in_ms,
2193 rpc_append_entries_batch_process_delay_in_ms,
2194 )),
2195 next_index: HashMap::new(),
2196 match_index: HashMap::new(),
2197 noop_log_id: None,
2198
2199 batch_buffer: Box::new(BatchBuffer::new(
2200 rpc_append_entries_in_batch_threshold,
2201 Duration::from_millis(rpc_append_entries_batch_process_delay_in_ms),
2202 )),
2203
2204 node_config,
2205 scheduled_purge_upto: None,
            last_purged_index: None,
            last_learner_check: Instant::now(),
2208 peer_purge_progress: HashMap::new(),
2209 snapshot_in_progress: AtomicBool::new(false),
2210 next_membership_maintenance_check: Instant::now(),
2211 pending_promotions: VecDeque::new(),
2212 lease_timestamp: AtomicU64::new(0),
2213 read_buffer: Vec::new(),
2214 read_buffer_start_time: None,
2215 _marker: PhantomData,
2216 }
2217 }
2218
2219 pub async fn trigger_background_snapshot(
2220 node_id: u32,
2221 metadata: SnapshotMetadata,
2222 state_machine_handler: Arc<SMHOF<T>>,
2223 membership: Arc<MOF<T>>,
2224 config: SnapshotConfig,
2225 ) -> Result<()> {
2226 let (result_tx, result_rx) = oneshot::channel();
2227
2228 tokio::task::spawn_blocking(move || {
2230 let rt = tokio::runtime::Handle::current();
2231 let result = rt.block_on(async move {
2232 let bulk_channel = membership
2233 .get_peer_channel(node_id, ConnectionType::Bulk)
2234 .await
2235 .ok_or(NetworkError::PeerConnectionNotFound(node_id))?;
2236
2237 let data_stream =
2238 state_machine_handler.load_snapshot_data(metadata.clone()).await?;
2239
2240 BackgroundSnapshotTransfer::<T>::run_push_transfer(
2241 node_id,
2242 data_stream,
2243 bulk_channel,
2244 config,
2245 )
2246 .await
2247 });
2248
2249 let _ = result_tx.send(result);
2251 });
2252
2253 tokio::spawn(async move {
2255 match result_rx.await {
2256 Ok(Ok(_)) => info!("Snapshot to {} completed", node_id),
2257 Ok(Err(e)) => error!("Snapshot to {} failed: {:?}", node_id, e),
2258 Err(_) => warn!("Snapshot result channel closed unexpectedly"),
2259 }
2260 });
2261
2262 Ok(())
2263 }
2264
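    /// Promotes queued learners in quorum-safe batches: drops stale entries, computes a safe
    /// batch size, proposes a `BatchPromote` config change, and re-queues the batch on failure.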
2265 pub async fn process_pending_promotions(
2267 &mut self,
2268 ctx: &RaftContext<T>,
2269 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2270 ) -> Result<()> {
2271 debug!(
2272 "[Leader {}] ๐ process_pending_promotions called, pending: {:?}",
2273 self.node_id(),
2274 self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2275 );
2276
2277 let config = &ctx.node_config().raft.membership.promotion;
2279
2280 let now = Instant::now();
2282 self.pending_promotions.retain(|entry| {
2283 now.duration_since(entry.ready_since) <= config.stale_learner_threshold
2284 });
2285
2286 if self.pending_promotions.is_empty() {
2287 debug!(
2288 "[Leader {}] โ pending_promotions is empty after stale cleanup",
2289 self.node_id()
2290 );
2291 return Ok(());
2292 }
2293
2294 let membership = ctx.membership();
        let current_voters = membership.voters().await.len() + 1; // +1 for the leader itself
        debug!(
            "[Leader {}] current_voters: {}, pending: {}",
2299 self.node_id(),
2300 current_voters,
2301 self.pending_promotions.len()
2302 );
2303
2304 let max_batch_size =
2306 calculate_safe_batch_size(current_voters, self.pending_promotions.len());
2307 debug!(
2308 "[Leader {}] ๐ฏ max_batch_size: {}",
2309 self.node_id(),
2310 max_batch_size
2311 );
2312
2313 if max_batch_size == 0 {
2314 debug!(
2316 "[Leader {}] โ ๏ธ max_batch_size is 0, cannot promote now",
2317 self.node_id()
2318 );
2319 return Ok(());
2320 }
2321
2322 let promotion_entries = self.drain_batch(max_batch_size);
2324 let promotion_node_ids = promotion_entries.iter().map(|e| e.node_id).collect::<Vec<_>>();
2325
2326 if !promotion_node_ids.is_empty() {
2328 info!(
2330 "Promoting learner batch of {} nodes: {:?} (total voters: {} -> {})",
2331 promotion_node_ids.len(),
2332 promotion_node_ids,
2333 current_voters,
2334 current_voters + promotion_node_ids.len()
2335 );
2336
2337 let result = self.safe_batch_promote(promotion_node_ids.clone(), ctx, role_tx).await;
2339
2340 if let Err(e) = result {
2341 for entry in promotion_entries.into_iter().rev() {
2343 self.pending_promotions.push_front(entry);
2344 }
2345 return Err(e);
2346 }
2347
2348 info!(
2349 "Promotion successful. Cluster members: {:?}",
2350 membership.voters().await
2351 );
2352 }
2353
2354 trace!(
2355 ?self.pending_promotions,
2356 "Step 6: Reschedule if any pending promotions remain"
2357 );
2358 if !self.pending_promotions.is_empty() {
2360 debug!(
2361 "[Leader {}] ๐ Re-sending PromoteReadyLearners for remaining pending: {:?}",
2362 self.node_id(),
2363 self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2364 );
2365 role_tx
2367 .send(RoleEvent::ReprocessEvent(Box::new(
2368 RaftEvent::PromoteReadyLearners,
2369 )))
2370 .map_err(|e| {
2371 let error_str = format!("{e:?}");
2372 error!("Send PromoteReadyLearners event failed: {}", error_str);
2373 NetworkError::SingalSendFailed(error_str)
2374 })?;
2375 }
2376
2377 Ok(())
2378 }
2379
2380 pub(super) fn drain_batch(
2382 &mut self,
2383 count: usize,
2384 ) -> Vec<PendingPromotion> {
2385 let mut batch = Vec::with_capacity(count);
2386 for _ in 0..count {
2387 if let Some(entry) = self.pending_promotions.pop_front() {
2388 batch.push(entry);
2389 } else {
2390 break;
2391 }
2392 }
2393 batch
2394 }
2395
2396 async fn safe_batch_promote(
2397 &mut self,
2398 batch: Vec<u32>,
2399 ctx: &RaftContext<T>,
2400 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2401 ) -> Result<()> {
2402 let change = Change::BatchPromote(BatchPromote {
2403 node_ids: batch.clone(),
2404 new_status: NodeStatus::Active as i32,
2405 });
2406
2407 self.verify_leadership_persistent(vec![EntryPayload::config(change)], true, ctx, role_tx)
2410 .await?;
2411
2412 Ok(())
2413 }
2414
    async fn run_periodic_maintenance(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        if let Err(e) = self.conditionally_purge_stale_learners(role_tx, ctx).await {
            error!("Stale learner purge failed: {}", e);
        }

        if let Err(e) = self.conditionally_purge_zombie_nodes(role_tx, ctx).await {
            error!("Zombie node purge failed: {}", e);
        }

        self.reset_next_membership_maintenance_check(
            ctx.node_config().raft.membership.membership_maintenance_interval,
        );
        Ok(())
    }

    pub async fn conditionally_purge_stale_learners(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        let config = &ctx.node_config.raft.membership.promotion;

        if self.pending_promotions.is_empty()
            || self.next_membership_maintenance_check > Instant::now()
        {
            trace!("Skipping stale learner check");
            return Ok(());
        }

        let now = Instant::now();
        let queue_len = self.pending_promotions.len();

        // Amortized scan: inspect roughly 1% of the queue per pass, clamped to [1, 100].
        let inspect_count = queue_len.min(100).min((queue_len / 100).max(1));
        let mut stale_entries = Vec::new();

        trace!("Inspecting {} entries", inspect_count);
        for _ in 0..inspect_count {
            if let Some(entry) = self.pending_promotions.pop_front() {
                trace!(
                    "Inspecting entry: {:?} - {:?} - {:?}",
                    entry,
                    now.duration_since(entry.ready_since),
                    &config.stale_learner_threshold
                );
                if now.duration_since(entry.ready_since) > config.stale_learner_threshold {
                    stale_entries.push(entry);
                } else {
                    // First non-stale entry: put it back and stop scanning.
                    self.pending_promotions.push_front(entry);
                    break;
                }
            } else {
                break;
            }
        }

        trace!("Stale learner check completed: {:?}", stale_entries);

        for entry in stale_entries {
            if let Err(e) = self.handle_stale_learner(entry.node_id, role_tx, ctx).await {
                error!("Failed to handle stale learner: {}", e);
            }
        }

        Ok(())
    }

    async fn conditionally_purge_zombie_nodes(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        let membership = ctx.membership();
        let zombie_candidates = membership.get_zombie_candidates().await;
        let mut nodes_to_remove = Vec::new();

        for node_id in zombie_candidates {
            if let Some(status) = membership.get_node_status(node_id).await {
                if status != NodeStatus::Active {
                    nodes_to_remove.push(node_id);
                }
            }
        }

        if !nodes_to_remove.is_empty() {
            let change = Change::BatchRemove(BatchRemove {
                node_ids: nodes_to_remove.clone(),
            });

            info!(
                "Proposing batch removal of zombie nodes: {:?}",
                nodes_to_remove
            );

            match self
                .verify_leadership_persistent(
                    vec![EntryPayload::config(change)],
                    false,
                    ctx,
                    role_tx,
                )
                .await
            {
                Ok(true) => {
                    info!("Batch removal committed for nodes: {:?}", nodes_to_remove);
                }
                Ok(false) => {
                    warn!("Failed to commit batch removal");
                }
                Err(e) => {
                    error!("Batch removal error: {:?}", e);
                    return Err(e);
                }
            }
        }

        Ok(())
    }

    pub fn reset_next_membership_maintenance_check(
        &mut self,
        membership_maintenance_interval: Duration,
    ) {
        self.next_membership_maintenance_check = Instant::now() + membership_maintenance_interval;
    }

    pub async fn handle_stale_learner(
        &mut self,
        node_id: u32,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        warn!(
            "Learner {} is stalled, removing from cluster via consensus",
            node_id
        );

        let change = Change::BatchRemove(BatchRemove {
            node_ids: vec![node_id],
        });

        match self
            .verify_leadership_persistent(vec![EntryPayload::config(change)], true, ctx, role_tx)
            .await
        {
            Ok(true) => {
                info!(
                    "Stalled learner {} successfully removed from cluster",
                    node_id
                );
            }
            Ok(false) => {
                warn!("Failed to commit removal of stalled learner {}", node_id);
            }
            Err(e) => {
                error!("Error removing stalled learner {}: {:?}", node_id, e);
                return Err(e);
            }
        }

        Ok(())
    }

    pub fn is_lease_valid(
        &self,
        ctx: &RaftContext<T>,
    ) -> bool {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;

        let last_confirmed = self.lease_timestamp.load(std::sync::atomic::Ordering::Acquire);
        let lease_duration = ctx.node_config().raft.read_consistency.lease_duration_ms;

        if now < last_confirmed {
            error!("Clock moved backwards: Now {now}, Last Confirmed {last_confirmed}");
            return false;
        }

        if now == last_confirmed {
            return true;
        }

        (now - last_confirmed) < lease_duration
    }

    fn update_lease_timestamp(&self) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;

        self.lease_timestamp.store(now, std::sync::atomic::Ordering::Release);
    }

    async fn verify_leadership_and_refresh_lease(
        &mut self,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        match self.verify_internal_quorum(vec![], true, ctx, role_tx).await {
            Ok(QuorumVerificationResult::Success) => {
                self.update_lease_timestamp();
                Ok(())
            }
            Ok(QuorumVerificationResult::LeadershipLost) => Err(ConsensusError::Replication(
                ReplicationError::NotLeader { leader_id: None },
            )
            .into()),
            Ok(QuorumVerificationResult::RetryRequired) => {
                Err(ConsensusError::Replication(ReplicationError::QuorumNotReached).into())
            }
            Err(e) => Err(e),
        }
    }

    pub(super) async fn process_linearizable_read_batch(
        &mut self,
        ctx: &RaftContext<T>,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
    ) -> Result<()> {
        let batch = std::mem::take(&mut self.read_buffer);
        self.read_buffer_start_time = None;

        if batch.is_empty() {
            return Ok(());
        }

        let batch_size = batch.len();
        info!("Read batch: flushing {} requests", batch_size);

        if let Err(e) = self.verify_leadership_and_refresh_lease(ctx, role_tx).await {
            error!(
                "Read batch: verification failed - draining {} requests: {}",
                batch_size, e
            );
            let status = tonic::Status::failed_precondition(format!(
                "Leadership verification failed: {e:?}"
            ));
            for (_, sender) in batch {
                let _ = sender.send(Err(status.clone()));
            }
            return Ok(());
        }

        // Leadership confirmed: make sure the state machine has caught up to the
        // read index before serving any of the buffered reads.
        let last_applied = ctx.state_machine().last_applied().index;
        let read_index = self.calculate_read_index();

        if last_applied < read_index {
            ctx.handlers.state_machine_handler.update_pending(read_index);
            let timeout_ms = self.node_config.raft.read_consistency.state_machine_sync_timeout_ms;
            ctx.handlers
                .state_machine_handler
                .wait_applied(read_index, Duration::from_millis(timeout_ms))
                .await
                .map_err(|e| {
                    error!("Read batch: wait_applied failed: {}", e);
                    e
                })?;
        }

        for (req, sender) in batch {
            let results = ctx
                .handlers
                .state_machine_handler
                .read_from_state_machine(req.keys)
                .unwrap_or_default();
            let _ = sender.send(Ok(ClientResponse::read_results(results)));
        }

        Ok(())
    }

    #[cfg(test)]
    pub(crate) fn test_update_lease_timestamp(&self) {
        self.update_lease_timestamp();
    }
}

impl<T: TypeConfig> From<&CandidateState<T>> for LeaderState<T> {
    fn from(candidate: &CandidateState<T>) -> Self {
        let ReplicationConfig {
            rpc_append_entries_in_batch_threshold,
            rpc_append_entries_batch_process_delay_in_ms,
            rpc_append_entries_clock_in_ms,
            ..
        } = candidate.node_config.raft.replication;

        let shared_state = candidate.shared_state.clone();
        shared_state.set_current_leader(candidate.node_id());

        Self {
            shared_state,
            timer: Box::new(ReplicationTimer::new(
                rpc_append_entries_clock_in_ms,
                rpc_append_entries_batch_process_delay_in_ms,
            )),
            next_index: HashMap::new(),
            match_index: HashMap::new(),
            noop_log_id: None,

            batch_buffer: Box::new(BatchBuffer::new(
                rpc_append_entries_in_batch_threshold,
                Duration::from_millis(rpc_append_entries_batch_process_delay_in_ms),
            )),

            node_config: candidate.node_config.clone(),

            scheduled_purge_upto: None,
            last_purged_index: candidate.last_purged_index,
            last_learner_check: Instant::now(),
            snapshot_in_progress: AtomicBool::new(false),
            peer_purge_progress: HashMap::new(),
            next_membership_maintenance_check: Instant::now(),
            pending_promotions: VecDeque::new(),
            cluster_metadata: ClusterMetadata {
                single_voter: false,
                total_voters: 0,
                replication_targets: vec![],
            },
            lease_timestamp: AtomicU64::new(0),
            read_buffer: Vec::new(),
            read_buffer_start_time: None,
            _marker: PhantomData,
        }
    }
}

impl<T: TypeConfig> Debug for LeaderState<T> {
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.debug_struct("LeaderState")
            .field("shared_state", &self.shared_state)
            .field("next_index", &self.next_index)
            .field("match_index", &self.match_index)
            .field("noop_log_id", &self.noop_log_id)
            .finish()
    }
}

/// Returns how many pending learners may be promoted in one batch so that the
/// resulting voter count (`current` voters plus the promoted learners) stays odd.
/// Promotes the whole batch when the combined total is already odd; otherwise
/// holds one learner back for a later round.
pub fn calculate_safe_batch_size(
    current: usize,
    available: usize,
) -> usize {
    if (current + available) % 2 == 1 {
        available
    } else {
        available.saturating_sub(1)
    }
}

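// A minimal illustrative check of the batch-size rule above: it is not part of the
// original test suite, the module name and sample values are ours, and it only
// assumes the free function `calculate_safe_batch_size` defined in this file.
#[cfg(test)]
mod calculate_safe_batch_size_checks {
    use super::calculate_safe_batch_size;

    #[test]
    fn keeps_total_voter_count_odd() {
        // 3 voters + 2 ready learners = 5 (odd): promote the whole batch.
        assert_eq!(calculate_safe_batch_size(3, 2), 2);
        // 3 voters + 3 ready learners = 6 (even): hold one learner back.
        assert_eq!(calculate_safe_batch_size(3, 3), 2);
        // 3 voters + 1 ready learner = 4 (even): wait for another learner.
        assert_eq!(calculate_safe_batch_size(3, 1), 0);
        // Nothing available: never underflows.
        assert_eq!(calculate_safe_batch_size(4, 0), 0);
    }
}
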
pub(super) fn send_replay_raft_event(
    role_tx: &mpsc::UnboundedSender<RoleEvent>,
    raft_event: RaftEvent,
) -> Result<()> {
    role_tx.send(RoleEvent::ReprocessEvent(Box::new(raft_event))).map_err(|e| {
        let error_str = format!("{e:?}");
        error!("Failed to send reprocess event: {}", error_str);
        NetworkError::SingalSendFailed(error_str).into()
    })
}
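
// A small illustrative sketch of how the helper above forwards an event; not part of
// the original test suite. It assumes `RaftEvent::PromoteReadyLearners` is a unit
// variant, that `RoleEvent::ReprocessEvent` wraps a boxed event as used earlier in
// this file, and that tokio's `UnboundedReceiver::try_recv` is available.
#[cfg(test)]
mod send_replay_raft_event_checks {
    use super::*;

    #[test]
    fn wraps_event_in_reprocess_and_forwards_it() {
        let (tx, mut rx) = mpsc::unbounded_channel();

        send_replay_raft_event(&tx, RaftEvent::PromoteReadyLearners)
            .expect("sending on an open channel should succeed");

        match rx.try_recv() {
            Ok(RoleEvent::ReprocessEvent(event)) => {
                assert!(matches!(*event, RaftEvent::PromoteReadyLearners));
            }
            _ => panic!("expected a ReprocessEvent wrapping PromoteReadyLearners"),
        }
    }
}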