1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::fmt::Debug;
4use std::marker::PhantomData;
5use std::sync::Arc;
6use std::sync::atomic::AtomicBool;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use std::time::Duration;
10
11use d_engine_proto::client::ClientResponse;
12use d_engine_proto::client::ReadConsistencyPolicy as ClientReadConsistencyPolicy;
13use d_engine_proto::common::AddNode;
14use d_engine_proto::common::BatchPromote;
15use d_engine_proto::common::BatchRemove;
16use d_engine_proto::common::EntryPayload;
17use d_engine_proto::common::LogId;
18use d_engine_proto::common::NodeRole::Leader;
19use d_engine_proto::common::NodeStatus;
20use d_engine_proto::common::membership_change::Change;
21use d_engine_proto::error::ErrorCode;
22use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
23use d_engine_proto::server::cluster::JoinRequest;
24use d_engine_proto::server::cluster::JoinResponse;
25use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
26use d_engine_proto::server::cluster::NodeMeta;
27use d_engine_proto::server::election::VoteResponse;
28use d_engine_proto::server::election::VotedFor;
29use d_engine_proto::server::replication::AppendEntriesResponse;
30use d_engine_proto::server::storage::PurgeLogRequest;
31use d_engine_proto::server::storage::PurgeLogResponse;
32use d_engine_proto::server::storage::SnapshotChunk;
33use d_engine_proto::server::storage::SnapshotMetadata;
34use nanoid::nanoid;
35use tokio::sync::mpsc;
36use tokio::sync::oneshot;
37use tokio::time::Instant;
38use tokio::time::sleep;
39use tokio::time::timeout;
40use tonic::Status;
41use tonic::async_trait;
42use tracing::debug;
43use tracing::error;
44use tracing::info;
45use tracing::instrument;
46use tracing::trace;
47use tracing::warn;
48
49use super::LeaderStateSnapshot;
50use super::RaftRole;
51use super::SharedState;
52use super::StateSnapshot;
53use super::candidate_state::CandidateState;
54use super::role_state::RaftRoleState;
55use crate::AppendResults;
56use crate::BackgroundSnapshotTransfer;
57use crate::BatchBuffer;
58use crate::ConnectionType;
59use crate::ConsensusError;
60use crate::Error;
61use crate::MaybeCloneOneshot;
62use crate::MaybeCloneOneshotSender;
63use crate::Membership;
64use crate::MembershipError;
65use crate::NetworkError;
66use crate::PeerUpdate;
67use crate::PurgeExecutor;
68use crate::QuorumVerificationResult;
69use crate::RaftContext;
70use crate::RaftEvent;
71use crate::RaftLog;
72use crate::RaftNodeConfig;
73use crate::RaftOneshot;
74use crate::RaftRequestWithSignal;
75use crate::ReadConsistencyPolicy as ServerReadConsistencyPolicy;
76use crate::ReplicationConfig;
77use crate::ReplicationCore;
78use crate::ReplicationError;
79use crate::ReplicationTimer;
80use crate::Result;
81use crate::RoleEvent;
82use crate::SnapshotConfig;
83use crate::StateMachine;
84use crate::StateMachineHandler;
85use crate::StateTransitionError;
86use crate::SystemError;
87use crate::Transport;
88use crate::TypeConfig;
89use crate::alias::MOF;
90use crate::alias::ROF;
91use crate::alias::SMHOF;
92use crate::client_command_to_entry_payloads;
93use crate::cluster::is_majority;
94use crate::ensure_safe_join;
95use crate::scoped_timer::ScopedTimer;
96use crate::stream::create_production_snapshot_stream;
97use crate::utils::cluster::error;
98
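/// A learner that has caught up with the leader's log and is queued for
/// promotion to a voting member, together with the time it became ready.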
99#[derive(Debug, Clone)]
101pub struct PendingPromotion {
102 pub node_id: u32,
103 pub ready_since: Instant,
104}
105
106impl PendingPromotion {
107 pub fn new(
108 node_id: u32,
109 ready_since: Instant,
110 ) -> Self {
111 PendingPromotion {
112 node_id,
113 ready_since,
114 }
115 }
116}
117
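/// Cached view of the current cluster membership used on the replication path:
/// whether this leader is the only voter, the total number of voters
/// (including the leader itself), and the peers that receive replicated entries.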
118#[derive(Debug, Clone)]
123pub struct ClusterMetadata {
124 pub single_voter: bool,
126 pub total_voters: usize,
128 pub replication_targets: Vec<NodeMeta>,
130}
131
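/// Role state held while this node acts as the Raft Leader: per-peer replication
/// progress, batching and replication timers, log-purge bookkeeping, learner
/// promotion tracking, and a cached view of the cluster membership.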
pub struct LeaderState<T: TypeConfig> {
    /// State shared across all roles (term, voted_for, commit index, current leader).
    pub shared_state: SharedState,

    /// For each peer, the index of the next log entry to send to that peer.
    pub next_index: HashMap<u32, u64>,

    /// For each peer, the highest log index known to be replicated on that peer.
    pub(super) match_index: HashMap<u32, u64>,

    /// Log index of the Leader's no-op entry, if one has been recorded.
    pub(super) noop_log_id: Option<u64>,

    /// Log id up to which a local purge has been scheduled after snapshot creation.
    pub scheduled_purge_upto: Option<LogId>,

    /// Log id of the last purge that actually completed locally.
    pub last_purged_index: Option<LogId>,

    /// For each peer, the last purged log index it has reported back.
    pub peer_purge_progress: HashMap<u32, u64>,

    /// Guards against starting a second snapshot while one is already being created.
    pub snapshot_in_progress: AtomicBool,

    /// Buffers client proposals so they can be replicated in batches.
    batch_buffer: Box<BatchBuffer<RaftRequestWithSignal>>,

    /// Tracks the batch-flush and replication deadlines.
    timer: Box<ReplicationTimer>,

    /// Node-level configuration.
    pub(super) node_config: Arc<RaftNodeConfig>,

    /// Last time learner catch-up progress was checked (used for throttling).
    pub last_learner_check: Instant,

    /// Next time periodic membership maintenance should run.
    pub next_membership_maintenance_check: Instant,

    /// Learners that have caught up and are waiting to be promoted to voters.
    pub pending_promotions: VecDeque<PendingPromotion>,

    /// Timestamp of the last successful leadership confirmation, used by lease reads.
    pub(super) lease_timestamp: AtomicU64,

    /// Cached view of the current cluster membership.
    pub(super) cluster_metadata: ClusterMetadata,

    _marker: PhantomData<T>,
}
253
254#[async_trait]
255impl<T: TypeConfig> RaftRoleState for LeaderState<T> {
256 type T = T;
257
258 fn shared_state(&self) -> &SharedState {
259 &self.shared_state
260 }
261
262 fn shared_state_mut(&mut self) -> &mut SharedState {
263 &mut self.shared_state
264 }
265
266 #[tracing::instrument]
270 fn update_commit_index(
271 &mut self,
272 new_commit_index: u64,
273 ) -> Result<()> {
274 if self.commit_index() < new_commit_index {
275 debug!("update_commit_index to: {:?}", new_commit_index);
276 self.shared_state.commit_index = new_commit_index;
277 } else {
            warn!(
                "Ignoring commit index update: current commit_index ({}) >= new_commit_index ({}); this may indicate a bug",
                self.commit_index(),
                new_commit_index
            )
283 }
284 Ok(())
285 }
286
287 fn voted_for(&self) -> Result<Option<VotedFor>> {
289 self.shared_state().voted_for()
290 }
291
292 fn update_voted_for(
295 &mut self,
296 voted_for: VotedFor,
297 ) -> Result<bool> {
298 self.shared_state_mut().update_voted_for(voted_for)
299 }
300
301 fn next_index(
302 &self,
303 node_id: u32,
304 ) -> Option<u64> {
        Some(self.next_index.get(&node_id).copied().unwrap_or(1))
310 }
311
312 fn update_next_index(
313 &mut self,
314 node_id: u32,
315 new_next_id: u64,
316 ) -> Result<()> {
317 debug!("update_next_index({}) to {}", node_id, new_next_id);
318 self.next_index.insert(node_id, new_next_id);
319 Ok(())
320 }
321
322 fn update_match_index(
323 &mut self,
324 node_id: u32,
325 new_match_id: u64,
326 ) -> Result<()> {
327 self.match_index.insert(node_id, new_match_id);
328 Ok(())
329 }
330
331 fn match_index(
332 &self,
333 node_id: u32,
334 ) -> Option<u64> {
335 self.match_index.get(&node_id).copied()
336 }
337
338 fn init_peers_next_index_and_match_index(
339 &mut self,
340 last_entry_id: u64,
341 peer_ids: Vec<u32>,
342 ) -> Result<()> {
343 for peer_id in peer_ids {
344 debug!("init leader state for peer_id: {:?}", peer_id);
345 let new_next_id = last_entry_id + 1;
346 self.update_next_index(peer_id, new_next_id)?;
347 self.update_match_index(peer_id, 0)?;
348 }
349 Ok(())
350 }
351 fn noop_log_id(&self) -> Result<Option<u64>> {
352 Ok(self.noop_log_id)
353 }
354
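    /// Verifies that this node still holds leadership by driving `verify_internal_quorum`
    /// with exponential backoff (plus jitter) until it succeeds, leadership is lost, or the
    /// configured `verify_leadership_persistent_timeout` elapses.
    ///
    /// Returns `Ok(true)` when quorum confirms leadership, `Ok(false)` when leadership is
    /// lost, and an error on timeout or transport failure.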
355 async fn verify_leadership_persistent(
374 &mut self,
375 payloads: Vec<EntryPayload>,
376 bypass_queue: bool,
377 ctx: &RaftContext<T>,
378 role_tx: &mpsc::UnboundedSender<RoleEvent>,
379 ) -> Result<bool> {
380 let initial_delay =
381 Duration::from_millis(ctx.node_config.retry.internal_quorum.base_delay_ms);
382 let max_delay = Duration::from_millis(ctx.node_config.retry.internal_quorum.max_delay_ms);
383 let global_timeout = ctx.node_config.raft.membership.verify_leadership_persistent_timeout;
384
385 let mut current_delay = initial_delay;
386 let start_time = Instant::now();
387
388 loop {
389 match self.verify_internal_quorum(payloads.clone(), bypass_queue, ctx, role_tx).await {
390 Ok(QuorumVerificationResult::Success) => return Ok(true),
391 Ok(QuorumVerificationResult::LeadershipLost) => return Ok(false),
392 Ok(QuorumVerificationResult::RetryRequired) => {
393 if start_time.elapsed() > global_timeout {
395 return Err(NetworkError::GlobalTimeout(
396 "Leadership verification timed out".to_string(),
397 )
398 .into());
399 }
400
401 current_delay =
402 current_delay.checked_mul(2).unwrap_or(max_delay).min(max_delay);
403 let jitter = Duration::from_millis(rand::random::<u64>() % 500);
404 sleep(current_delay + jitter).await;
405 }
406 Err(e) => return Err(e),
407 }
408 }
409 }
410
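    /// Same quorum check as `verify_leadership_persistent`, but bounded by the retry policy's
    /// `max_retries` instead of a global timeout. Backs off exponentially with jitter between
    /// attempts and returns an error once the retry budget is exhausted.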
411 async fn verify_leadership_limited_retry(
434 &mut self,
435 payloads: Vec<EntryPayload>,
436 bypass_queue: bool,
437 ctx: &RaftContext<T>,
438 role_tx: &mpsc::UnboundedSender<RoleEvent>,
439 ) -> Result<bool> {
440 let retry_policy = ctx.node_config.retry.internal_quorum;
441 let max_retries = retry_policy.max_retries;
442 let initial_delay =
443 Duration::from_millis(ctx.node_config.retry.internal_quorum.base_delay_ms);
444 let max_delay = Duration::from_millis(ctx.node_config.retry.internal_quorum.max_delay_ms);
445
446 let mut current_delay = initial_delay;
447 let mut attempts = 0;
448
449 loop {
450 match self.verify_internal_quorum(payloads.clone(), bypass_queue, ctx, role_tx).await {
451 Ok(QuorumVerificationResult::Success) => return Ok(true),
452 Ok(QuorumVerificationResult::LeadershipLost) => return Ok(false),
453 Ok(QuorumVerificationResult::RetryRequired) => {
454 debug!(%attempts, "verify_internal_quorum");
455 if attempts >= max_retries {
456 return Err(NetworkError::TaskBackoffFailed(
457 "Max retries exceeded".to_string(),
458 )
459 .into());
460 }
461
462 current_delay =
463 current_delay.checked_mul(2).unwrap_or(max_delay).min(max_delay);
464 let jitter = Duration::from_millis(rand::random::<u64>() % 500);
465 sleep(current_delay + jitter).await;
466
467 attempts += 1;
468 }
469 Err(e) => return Err(e),
470 }
471 }
472 }
473
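    /// Builds the initial `ClusterMetadata` cache from the current membership:
    /// the voter count (including this leader) and the set of replication targets.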
474 async fn init_cluster_metadata(
507 &mut self,
508 membership: &Arc<T::M>,
509 ) -> Result<()> {
510 let voters = membership.voters().await;
        let total_voters = voters.len() + 1; // +1 counts this leader in addition to its voting peers
        let replication_targets = membership.replication_peers().await;
516
517 let single_voter = total_voters == 1;
519
520 self.cluster_metadata = ClusterMetadata {
521 single_voter,
522 total_voters,
523 replication_targets: replication_targets.clone(),
524 };
525
526 debug!(
527 "Initialized cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
528 single_voter,
529 total_voters,
530 replication_targets.len()
531 );
532 Ok(())
533 }
534
535 async fn verify_internal_quorum(
536 &mut self,
537 payloads: Vec<EntryPayload>,
538 bypass_queue: bool,
539 ctx: &RaftContext<T>,
540 role_tx: &mpsc::UnboundedSender<RoleEvent>,
541 ) -> Result<QuorumVerificationResult> {
542 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
543
544 self.process_raft_request(
545 RaftRequestWithSignal {
546 id: nanoid!(),
547 payloads,
548 sender: resp_tx,
549 },
550 ctx,
551 bypass_queue,
552 role_tx,
553 )
554 .await?;
555
556 let timeout_duration =
558 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
559 match timeout(timeout_duration, resp_rx).await {
560 Ok(Ok(Ok(response))) => {
562 debug!("Leadership check response: {:?}", response);
563
564 Ok(if response.is_write_success() {
566 QuorumVerificationResult::Success
567 } else if response.is_retry_required() {
568 QuorumVerificationResult::RetryRequired
570 } else {
571 QuorumVerificationResult::LeadershipLost
573 })
574 }
575
576 Ok(Ok(Err(status))) => {
578 warn!("Leadership rejected by follower: {status:?}");
579 Ok(QuorumVerificationResult::LeadershipLost)
580 }
581
582 Ok(Err(e)) => {
584 error!("Channel error during leadership check: {:?}", e);
585 Err(NetworkError::SingalReceiveFailed(e.to_string()).into())
586 }
587
588 Err(_) => {
590 warn!("Leadership check timed out after {:?}", timeout_duration);
591 Err(NetworkError::Timeout {
592 node_id: self.node_id(),
593 duration: timeout_duration,
594 }
595 .into())
596 }
597 }
598 }
599
600 fn is_leader(&self) -> bool {
601 true
602 }
603
604 fn become_leader(&self) -> Result<RaftRole<T>> {
605 warn!("I am leader already");
606
607 Err(StateTransitionError::InvalidTransition.into())
608 }
609
610 fn become_candidate(&self) -> Result<RaftRole<T>> {
611 error!("Leader can not become Candidate");
612
613 Err(StateTransitionError::InvalidTransition.into())
614 }
615
616 fn become_follower(&self) -> Result<RaftRole<T>> {
617 info!(
618 "Node {} term {} transitioning to Follower",
619 self.node_id(),
620 self.current_term(),
621 );
622 println!(
623 "[Node {}] Leader โ Follower (term {})",
624 self.node_id(),
625 self.current_term()
626 );
627 Ok(RaftRole::Follower(Box::new(self.into())))
628 }
629
630 fn become_learner(&self) -> Result<RaftRole<T>> {
631 error!("Leader can not become Learner");
632
633 Err(StateTransitionError::InvalidTransition.into())
634 }
635
636 fn is_timer_expired(&self) -> bool {
637 self.timer.is_expired()
638 }
639
640 fn reset_timer(&mut self) {
642 self.timer.reset_batch();
643 self.timer.reset_replication();
644 }
645
646 fn next_deadline(&self) -> Instant {
647 self.timer.next_deadline()
648 }
649
650 async fn tick(
652 &mut self,
653 role_tx: &mpsc::UnboundedSender<RoleEvent>,
654 _raft_tx: &mpsc::Sender<RaftEvent>,
655 ctx: &RaftContext<T>,
656 ) -> Result<()> {
657 let now = Instant::now();
658 self.shared_state().set_current_leader(self.node_id());
660
661 if let Err(e) = self.run_periodic_maintenance(role_tx, ctx).await {
663 error!("Failed to run periodic maintenance: {}", e);
664 }
665
666 if now >= self.timer.batch_deadline() {
668 self.timer.reset_batch();
669
670 if self.batch_buffer.should_flush() {
671 debug!(?now, "tick::reset_batch batch timer");
672 self.timer.reset_replication();
673
674 let batch = self.batch_buffer.take();
677 self.process_batch(batch, role_tx, ctx).await?;
678 }
679 }
680
681 if now >= self.timer.replication_deadline() {
684 debug!(?now, "tick::reset_replication timer");
685 self.timer.reset_replication();
686
687 let batch = self.batch_buffer.take();
689 self.process_batch(batch, role_tx, ctx).await?;
690 }
691
692 Ok(())
693 }
694
695 async fn handle_raft_event(
696 &mut self,
697 raft_event: RaftEvent,
698 ctx: &RaftContext<T>,
699 role_tx: mpsc::UnboundedSender<RoleEvent>,
700 ) -> Result<()> {
701 let state_machine = ctx.state_machine();
702 let last_applied_index = state_machine.last_applied().index;
703 let my_id = self.shared_state.node_id;
704 let my_term = self.current_term();
705
706 match raft_event {
707 RaftEvent::ReceiveVoteRequest(vote_request, sender) => {
714 debug!(
715 "handle_raft_event::RaftEvent::ReceiveVoteRequest: {:?}",
716 &vote_request
717 );
718
719 let my_term = self.current_term();
720 if my_term < vote_request.term {
721 self.update_current_term(vote_request.term);
722 self.send_become_follower_event(None, &role_tx)?;
724
725 info!("Leader will not process Vote request, it should let Follower do it.");
726 send_replay_raft_event(
727 &role_tx,
728 RaftEvent::ReceiveVoteRequest(vote_request, sender),
729 )?;
730 } else {
731 let last_log_id =
732 ctx.raft_log().last_log_id().unwrap_or(LogId { index: 0, term: 0 });
733 let response = VoteResponse {
734 term: my_term,
735 vote_granted: false,
736 last_log_index: last_log_id.index,
737 last_log_term: last_log_id.term,
738 };
739 sender.send(Ok(response)).map_err(|e| {
740 let error_str = format!("{e:?}");
741 error!("Failed to send: {}", error_str);
742 NetworkError::SingalSendFailed(error_str)
743 })?;
744 }
745 }
746
747 RaftEvent::ClusterConf(_metadata_request, sender) => {
748 let cluster_conf = ctx
749 .membership()
750 .retrieve_cluster_membership_config(self.shared_state().current_leader())
751 .await;
752 debug!("Leader receive ClusterConf: {:?}", &cluster_conf);
753
754 sender.send(Ok(cluster_conf)).map_err(|e| {
755 let error_str = format!("{e:?}");
756 error!("Failed to send: {}", error_str);
757 NetworkError::SingalSendFailed(error_str)
758 })?;
759 }
760
            RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender) => {
                let current_conf_version = ctx.membership().get_cluster_conf_version().await;
                debug!(%current_conf_version, ?cluster_conf_change_request,
                    "handle_raft_event::RaftEvent::ClusterConfUpdate",
                );

                if my_term >= cluster_conf_change_request.term {
                    let response = ClusterConfUpdateResponse::higher_term(
                        my_id,
                        my_term,
                        current_conf_version,
                    );

                    sender.send(Ok(response)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                } else {
                    info!(
                        "my({}) term is lower than the request term; stepping down to Follower",
                        my_id
                    );
                    self.send_become_follower_event(Some(cluster_conf_change_request.id), &role_tx)?;

                    info!(
                        "Leader will not process the cluster config update request; deferring it to the Follower role."
                    );
                    send_replay_raft_event(
                        &role_tx,
                        RaftEvent::ClusterConfUpdate(cluster_conf_change_request, sender),
                    )?;
                }
            }
798
799 RaftEvent::AppendEntries(append_entries_request, sender) => {
800 debug!(
801 "handle_raft_event::RaftEvent::AppendEntries: {:?}",
802 &append_entries_request
803 );
804
805 if my_term >= append_entries_request.term {
807 let response = AppendEntriesResponse::higher_term(my_id, my_term);
808
809 sender.send(Ok(response)).map_err(|e| {
810 let error_str = format!("{e:?}");
811 error!("Failed to send: {}", error_str);
812 NetworkError::SingalSendFailed(error_str)
813 })?;
814 } else {
                    info!(
                        "my({}) term is lower than the request term; stepping down to Follower",
                        my_id
                    );
820 self.send_become_follower_event(
822 Some(append_entries_request.leader_id),
823 &role_tx,
824 )?;
825
                    info!(
                        "Leader will not process the append_entries request; deferring it to the Follower role."
                    );
829 send_replay_raft_event(
830 &role_tx,
831 RaftEvent::AppendEntries(append_entries_request, sender),
832 )?;
833 }
834 }
835
836 RaftEvent::ClientPropose(client_write_request, sender) => {
837 if let Err(e) = self
838 .process_raft_request(
839 RaftRequestWithSignal {
840 id: nanoid!(),
841 payloads: client_command_to_entry_payloads(
842 client_write_request.commands,
843 ),
844 sender,
845 },
846 ctx,
847 false,
848 &role_tx,
849 )
850 .await
851 {
852 error("Leader::process_raft_request", &e);
853 return Err(e);
854 }
855 }
856
857 RaftEvent::ClientReadRequest(client_read_request, sender) => {
858 let _timer = ScopedTimer::new("leader_linear_read");
859 debug!(
860 "Leader::ClientReadRequest client_read_request:{:?}",
861 &client_read_request
862 );
863
864 let keys = client_read_request.keys.clone();
865 let response: std::result::Result<ClientResponse, tonic::Status> = {
866 let read_operation =
867 || -> std::result::Result<ClientResponse, tonic::Status> {
868 let results = ctx
869 .handlers
870 .state_machine_handler
871 .read_from_state_machine(keys)
872 .unwrap_or_default();
873 debug!("handle_client_read results: {:?}", results);
874 Ok(ClientResponse::read_results(results))
875 };
876
877 let effective_policy = if client_read_request.has_consistency_policy() {
879 if ctx.node_config().raft.read_consistency.allow_client_override {
881 match client_read_request.consistency_policy() {
882 ClientReadConsistencyPolicy::LeaseRead => {
883 ServerReadConsistencyPolicy::LeaseRead
884 }
885 ClientReadConsistencyPolicy::LinearizableRead => {
886 ServerReadConsistencyPolicy::LinearizableRead
887 }
888 ClientReadConsistencyPolicy::EventualConsistency => {
889 ServerReadConsistencyPolicy::EventualConsistency
890 }
891 }
892 } else {
893 ctx.node_config().raft.read_consistency.default_policy.clone()
895 }
896 } else {
897 ctx.node_config().raft.read_consistency.default_policy.clone()
899 };
900
901 match effective_policy {
903 ServerReadConsistencyPolicy::LinearizableRead => {
904 if !self
905 .verify_leadership_limited_retry(vec![], true, ctx, &role_tx)
906 .await
907 .unwrap_or(false)
908 {
909 warn!("enforce_quorum_consensus failed for linear read request");
910
911 Err(tonic::Status::failed_precondition(
912 "enforce_quorum_consensus failed".to_string(),
913 ))
914 } else if let Err(e) = self
915 .ensure_state_machine_upto_commit_index(
916 &ctx.handlers.state_machine_handler,
917 last_applied_index,
918 )
919 .await
920 {
921 warn!(
922 "ensure_state_machine_upto_commit_index failed for linear read request"
923 );
924 Err(tonic::Status::failed_precondition(format!(
925 "ensure_state_machine_upto_commit_index failed: {e:?}"
926 )))
927 } else {
928 read_operation()
929 }
930 }
931 ServerReadConsistencyPolicy::LeaseRead => {
932 if self.is_lease_valid(ctx) {
934 read_operation()
936 } else {
937 if !self
939 .verify_leadership_limited_retry(vec![], true, ctx, &role_tx)
940 .await
941 .unwrap_or(false)
942 {
943 warn!("LeaseRead policy: lease renewal failed");
944 Err(tonic::Status::failed_precondition(
945 "LeaseRead policy: lease renewal failed".to_string(),
946 ))
947 } else {
948 self.update_lease_timestamp();
950 read_operation()
951 }
952 }
953 }
954 ServerReadConsistencyPolicy::EventualConsistency => {
955 debug!("EventualConsistency: serving local read without verification");
958 read_operation()
959 }
960 }
961 };
962
963 debug!(
964 "Leader::ClientReadRequest is going to response: {:?}",
965 &response
966 );
967 sender.send(response).map_err(|e| {
968 let error_str = format!("{e:?}");
969 error!("Failed to send: {}", error_str);
970 NetworkError::SingalSendFailed(error_str)
971 })?;
972 }
973
974 RaftEvent::InstallSnapshotChunk(_streaming, sender) => {
975 sender
                    .send(Err(Status::permission_denied("Not a Follower or Learner")))
977 .map_err(|e| {
978 let error_str = format!("{e:?}");
979 error!("Failed to send: {}", error_str);
980 NetworkError::SingalSendFailed(error_str)
981 })?;
982
983 return Err(ConsensusError::RoleViolation {
984 current_role: "Leader",
985 required_role: "Follower or Learner",
986 context: format!(
987 "Leader node {} receives RaftEvent::InstallSnapshotChunk",
988 ctx.node_id
989 ),
990 }
991 .into());
992 }
993
            RaftEvent::RaftLogCleanUp(_purge_log_request, sender) => {
995 sender
996 .send(Err(Status::permission_denied(
997 "Leader should not receive RaftLogCleanUp event.",
998 )))
999 .map_err(|e| {
1000 let error_str = format!("{e:?}");
1001 error!("Failed to send: {}", error_str);
1002 NetworkError::SingalSendFailed(error_str)
1003 })?;
1004
1005 return Err(ConsensusError::RoleViolation {
1006 current_role: "Leader",
                    required_role: "Non-Leader",
1008 context: format!(
1009 "Leader node {} receives RaftEvent::RaftLogCleanUp",
1010 ctx.node_id
1011 ),
1012 }
1013 .into());
1014 }
1015
1016 RaftEvent::CreateSnapshotEvent => {
1017 if self.snapshot_in_progress.load(std::sync::atomic::Ordering::Acquire) {
1019 info!("Snapshot creation already in progress. Skipping duplicate request.");
1020 return Ok(());
1021 }
1022
1023 self.snapshot_in_progress.store(true, std::sync::atomic::Ordering::Release);
1024 let state_machine_handler = ctx.state_machine_handler().clone();
1025
1026 tokio::spawn(async move {
1028 let result = state_machine_handler.create_snapshot().await;
1029 info!("SnapshotCreated event will be processed in another event thread");
1030 if let Err(e) =
1031 send_replay_raft_event(&role_tx, RaftEvent::SnapshotCreated(result))
1032 {
1033 error!("Failed to send snapshot creation result: {}", e);
1034 }
1035 });
1036 }
1037
1038 RaftEvent::SnapshotCreated(result) => {
1039 self.snapshot_in_progress.store(false, Ordering::SeqCst);
1040 let my_id = self.shared_state.node_id;
1041 let my_term = self.current_term();
1042
1043 match result {
1044 Err(e) => {
1045 error!(%e, "State machine snapshot creation failed");
1046 }
1047 Ok((
1048 SnapshotMetadata {
1049 last_included: last_included_option,
1050 checksum,
1051 },
1052 _final_path,
1053 )) => {
1054 info!("Initiating log purge after snapshot creation");
1055
1056 if let Some(last_included) = last_included_option {
1057 trace!("Phase 1: Schedule log purge if possible");
1061 if self.can_purge_logs(self.last_purged_index, last_included) {
1062 trace!(?last_included, "Phase 1: Scheduling log purge");
1063 self.scheduled_purge_upto(last_included);
1064 }
1065
1066 trace!("Phase 2.1: Pre-Checks before sending Purge request");
1070 let membership = ctx.membership();
1071 let members = membership.voters().await;
1072 if members.is_empty() {
1073 warn!("no peer found for leader({})", my_id);
1074 return Err(MembershipError::NoPeersAvailable.into());
1075 }
1076
1077 trace!("Phase 2.2: Send Purge request to the other nodes");
1081 let transport = ctx.transport();
1082 match transport
1083 .send_purge_requests(
1084 PurgeLogRequest {
1085 term: my_term,
1086 leader_id: my_id,
1087 last_included: Some(last_included),
1088 snapshot_checksum: checksum.clone(),
1089 leader_commit: self.commit_index(),
1090 },
1091 &self.node_config.retry,
1092 membership,
1093 )
1094 .await
1095 {
1096 Ok(result) => {
1097 info!(?result, "receive PurgeLogResult");
1098
1099 self.peer_purge_progress(result, &role_tx)?;
1100 }
1101 Err(e) => {
1102 error!(?e, "RaftEvent::CreateSnapshotEvent");
1103 return Err(e);
1104 }
1105 }
1106
1107 trace!("Phase 3: Execute scheduled purge task");
1111 debug!(?last_included, "Execute scheduled purge task");
1112 if let Some(scheduled) = self.scheduled_purge_upto {
1113 let purge_executor = ctx.purge_executor();
1114 match purge_executor.execute_purge(scheduled).await {
1117 Ok(_) => {
1118 if let Err(e) = send_replay_raft_event(
1119 &role_tx,
1120 RaftEvent::LogPurgeCompleted(scheduled),
1121 ) {
1122 error!(%e, "Failed to notify purge completion");
1123 }
1124 }
1125 Err(e) => {
1126 error!(?e, ?scheduled, "Log purge execution failed");
1127 }
1128 }
1129 }
1130 }
1131 }
1132 }
1133 }
1134
1135 RaftEvent::LogPurgeCompleted(purged_id) => {
1136 if self.last_purged_index.map_or(true, |current| purged_id.index > current.index) {
1138 debug!(
1139 ?purged_id,
1140 "Updating last purged index after successful execution"
1141 );
1142 self.last_purged_index = Some(purged_id);
1143 } else {
1144 warn!(
1145 ?purged_id,
1146 ?self.last_purged_index,
1147 "Received outdated purge completion, ignoring"
1148 );
1149 }
1150 }
1151
1152 RaftEvent::JoinCluster(join_request, sender) => {
1153 debug!(?join_request, "Leader::RaftEvent::JoinCluster");
1154 self.handle_join_cluster(join_request, sender, ctx, &role_tx).await?;
1155 }
1156
1157 RaftEvent::DiscoverLeader(request, sender) => {
1158 debug!(?request, "Leader::RaftEvent::DiscoverLeader");
1159
1160 if let Some(meta) = ctx.membership().retrieve_node_meta(my_id).await {
1161 let response = LeaderDiscoveryResponse {
1162 leader_id: my_id,
1163 leader_address: meta.address,
1164 term: my_term,
1165 };
1166 sender.send(Ok(response)).map_err(|e| {
1167 let error_str = format!("{e:?}");
1168 error!("Failed to send: {}", error_str);
1169 NetworkError::SingalSendFailed(error_str)
1170 })?;
1171 return Ok(());
1172 } else {
                    let msg = "Leader cannot find its own address in the membership config; this must be a bug.";
1174 error!("{}", msg);
1175 panic!("{}", msg);
1176 }
1177 }
1178 RaftEvent::StreamSnapshot(request, sender) => {
1179 debug!("Leader::RaftEvent::StreamSnapshot");
1180
1181 if let Some(metadata) = ctx.state_machine().snapshot_metadata() {
1183 let (response_tx, response_rx) =
1185 mpsc::channel::<std::result::Result<Arc<SnapshotChunk>, Status>>(32);
                    let size = 1024 * 1024 * 1024; // 1 GiB
                    let response_stream = create_production_snapshot_stream(response_rx, size);
1189 sender.send(Ok(response_stream)).map_err(|e| {
1191 let error_str = format!("{e:?}");
1192 error!("Stream response failed: {}", error_str);
1193 NetworkError::SingalSendFailed(error_str)
1194 })?;
1195
1196 let state_machine_handler = ctx.state_machine_handler().clone();
1198 let config = ctx.node_config.raft.snapshot.clone();
1199 let data_stream =
1201 state_machine_handler.load_snapshot_data(metadata.clone()).await?;
1202
1203 tokio::spawn(async move {
1204 if let Err(e) = BackgroundSnapshotTransfer::<T>::run_pull_transfer(
1205 request,
1206 response_tx,
1207 data_stream,
1208 config,
1209 )
1210 .await
1211 {
1212 error!("StreamSnapshot failed: {:?}", e);
1213 }
1214 });
1215 } else {
1216 warn!("No snapshot available for streaming");
1217 sender.send(Err(Status::not_found("Snapshot not found"))).map_err(|e| {
1218 let error_str = format!("{e:?}");
1219 error!("Stream response failed: {}", error_str);
1220 NetworkError::SingalSendFailed(error_str)
1221 })?;
1222 }
1223 }
1224 RaftEvent::TriggerSnapshotPush { peer_id } => {
                if let Some(latest_snapshot_metadata) = ctx.state_machine().snapshot_metadata() {
                    Self::trigger_background_snapshot(
                        peer_id,
                        latest_snapshot_metadata,
1229 ctx.state_machine_handler().clone(),
1230 ctx.membership(),
1231 ctx.node_config.raft.snapshot.clone(),
1232 )
1233 .await?;
1234 }
1235 }
1236
1237 RaftEvent::PromoteReadyLearners => {
1238 info!(
1240 "[Leader {}] โก PromoteReadyLearners event received, pending_promotions: {:?}",
1241 self.node_id(),
1242 self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
1243 );
1244 self.process_pending_promotions(ctx, &role_tx).await?;
1245 }
1246
1247 RaftEvent::MembershipApplied => {
1248 let old_replication_targets = self.cluster_metadata.replication_targets.clone();
1250
1251 debug!("Refreshing cluster metadata cache after membership change");
1253 if let Err(e) = self.update_cluster_metadata(&ctx.membership()).await {
1254 warn!("Failed to update cluster metadata: {:?}", e);
1255 }
1256
1257 let newly_added: Vec<u32> = self
1260 .cluster_metadata
1261 .replication_targets
1262 .iter()
1263 .filter(|new_peer| {
1264 !old_replication_targets.iter().any(|old_peer| old_peer.id == new_peer.id)
1265 })
1266 .map(|peer| peer.id)
1267 .collect();
1268
1269 if !newly_added.is_empty() {
1270 debug!(
1271 "Initializing replication state for {} new peer(s): {:?}",
1272 newly_added.len(),
1273 newly_added
1274 );
1275 let last_entry_id = ctx.raft_log().last_entry_id();
1276 if let Err(e) =
1277 self.init_peers_next_index_and_match_index(last_entry_id, newly_added)
1278 {
1279 warn!("Failed to initialize next_index for new peers: {:?}", e);
1280 }
1283 }
1284 }
1285
1286 RaftEvent::StepDownSelfRemoved => {
1287 warn!(
1290 "[Leader-{}] Removed from cluster membership, stepping down to Follower",
1291 self.node_id()
1292 );
1293 role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
1294 error!(
1295 "[Leader-{}] Failed to send BecomeFollower after self-removal: {:?}",
1296 self.node_id(),
1297 e
1298 );
1299 NetworkError::SingalSendFailed(format!(
1300 "BecomeFollower after self-removal: {e:?}"
1301 ))
1302 })?;
1303 return Ok(());
1304 }
1305 }
1306
1307 Ok(())
1308 }
1309}
1310
1311impl<T: TypeConfig> LeaderState<T> {
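    /// Refreshes the cached `ClusterMetadata` after a membership change, mirroring
    /// `init_cluster_metadata`.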
1312 pub async fn update_cluster_metadata(
1315 &mut self,
1316 membership: &Arc<T::M>,
1317 ) -> Result<()> {
1318 let voters = membership.voters().await;
        let total_voters = voters.len() + 1; // +1 counts this leader in addition to its voting peers
        let replication_targets = membership.replication_peers().await;
1324
1325 let single_voter = total_voters == 1;
1327
1328 self.cluster_metadata = ClusterMetadata {
1329 single_voter,
1330 total_voters,
1331 replication_targets: replication_targets.clone(),
1332 };
1333
1334 debug!(
1335 "Updated cluster metadata: single_voter={}, total_voters={}, replication_targets={}",
1336 single_voter,
1337 total_voters,
1338 replication_targets.len()
1339 );
1340 Ok(())
1341 }
1342
1343 pub fn state_snapshot(&self) -> StateSnapshot {
1345 StateSnapshot {
1346 current_term: self.current_term(),
1347 voted_for: None,
1348 commit_index: self.commit_index(),
1349 role: Leader as i32,
1350 }
1351 }
1352
1353 #[tracing::instrument]
1355 pub fn leader_state_snapshot(&self) -> LeaderStateSnapshot {
1356 LeaderStateSnapshot {
1357 next_index: self.next_index.clone(),
1358 match_index: self.match_index.clone(),
1359 noop_log_id: self.noop_log_id,
1360 }
1361 }
1362
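    /// Queues a client request into the batch buffer. The buffered batch is flushed and
    /// replicated immediately when `execute_now` is set or when `push` signals that the
    /// buffer should be flushed.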
1363 pub async fn process_raft_request(
1367 &mut self,
1368 raft_request_with_signal: RaftRequestWithSignal,
1369 ctx: &RaftContext<T>,
1370 execute_now: bool,
1371 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1372 ) -> Result<()> {
1373 debug!(
1374 "Leader::process_raft_request, request_id: {}",
1375 raft_request_with_signal.id
1376 );
1377
1378 let push_result = self.batch_buffer.push(raft_request_with_signal);
1379 if execute_now || push_result.is_some() {
1381 let batch = self.batch_buffer.take();
1382
1383 trace!(
1384 "replication_handler.handle_raft_request_in_batch: batch size:{:?}",
1385 batch.len()
1386 );
1387
1388 self.process_batch(batch, role_tx, ctx).await?;
1389 }
1390
1391 Ok(())
1392 }
1393
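    /// Replicates a batch of buffered client requests: hands the combined payloads to the
    /// replication handler, updates per-peer indexes and learner progress, advances the commit
    /// index when quorum is reached, and answers every request in the batch with success, a
    /// retryable error, or a propose failure. Steps down if a higher term is observed.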
1394 pub async fn process_batch(
1431 &mut self,
1432 batch: VecDeque<RaftRequestWithSignal>,
1433 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1434 ctx: &RaftContext<T>,
1435 ) -> Result<()> {
1436 let entry_payloads: Vec<EntryPayload> =
1438 batch.iter().flat_map(|req| &req.payloads).cloned().collect();
1439 if !entry_payloads.is_empty() {
            trace!(?entry_payloads, "[Node-{}] process_batch", ctx.node_id);
1441 }
1442
1443 let cluster_size = self.cluster_metadata.total_voters;
1445 trace!(%cluster_size);
1446
1447 let result = ctx
1448 .replication_handler()
1449 .handle_raft_request_in_batch(
1450 entry_payloads,
1451 self.state_snapshot(),
1452 self.leader_state_snapshot(),
1453 &self.cluster_metadata,
1454 ctx,
1455 )
1456 .await;
1457 debug!(?result, "replication_handler::handle_raft_request_in_batch");
1458
1459 match result {
1461 Ok(AppendResults {
1463 commit_quorum_achieved: true,
1464 peer_updates,
1465 learner_progress,
1466 }) => {
1467 self.update_peer_indexes(&peer_updates);
1469
1470 if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
1472 error!(?e, "check_learner_progress failed");
1473 };
1474
1475 let new_commit_index = if self.cluster_metadata.single_voter {
1479 let last_log_index = ctx.raft_log().last_entry_id();
1480 if last_log_index > self.commit_index() {
1481 Some(last_log_index)
1482 } else {
1483 None
1484 }
1485 } else {
1486 self.calculate_new_commit_index(ctx.raft_log(), &peer_updates)
1487 };
1488
1489 if let Some(new_commit_index) = new_commit_index {
1490 debug!(
1491 "[Leader-{}] New commit been acknowledged: {}",
1492 self.node_id(),
1493 new_commit_index
1494 );
1495 self.update_commit_index_with_signal(
1496 Leader as i32,
1497 self.current_term(),
1498 new_commit_index,
1499 role_tx,
1500 )?;
1501 }
1502
1503 for request in batch {
1505 let _ = request.sender.send(Ok(ClientResponse::write_success()));
1506 }
1507 }
1508
1509 Ok(AppendResults {
1511 commit_quorum_achieved: false,
1512 peer_updates,
1513 learner_progress,
1514 }) => {
1515 self.update_peer_indexes(&peer_updates);
1517
1518 if let Err(e) = self.check_learner_progress(&learner_progress, ctx, role_tx).await {
1520 error!(?e, "check_learner_progress failed");
1521 };
1522
1523 let responses_received = peer_updates.len();
1525 let error_code = if is_majority(responses_received, cluster_size) {
1526 ErrorCode::RetryRequired
1527 } else {
1528 ErrorCode::ProposeFailed
1529 };
1530
1531 for request in batch {
1533 let _ = request.sender.send(Ok(ClientResponse::client_error(error_code)));
1534 }
1535 }
1536
1537 Err(Error::Consensus(ConsensusError::Replication(ReplicationError::HigherTerm(
1539 higher_term,
1540 )))) => {
1541 warn!("Higher term detected: {}", higher_term);
1542 self.update_current_term(higher_term);
1543 self.send_become_follower_event(None, role_tx)?;
1544
1545 for request in batch {
1547 let _ = request
1548 .sender
1549 .send(Ok(ClientResponse::client_error(ErrorCode::TermOutdated)));
1550 }
1551
1552 return Err(ReplicationError::HigherTerm(higher_term).into());
1553 }
1554
1555 Err(e) => {
1557 error!("Batch processing failed: {:?}", e);
1558
1559 for request in batch {
1561 let _ = request
1562 .sender
1563 .send(Ok(ClientResponse::client_error(ErrorCode::ProposeFailed)));
1564 }
1565
1566 return Err(e);
1567 }
1568 }
1569
1570 Ok(())
1571 }
1572
1573 #[instrument(skip(self))]
1575 fn update_peer_indexes(
1576 &mut self,
1577 peer_updates: &HashMap<u32, PeerUpdate>,
1578 ) {
1579 for (peer_id, update) in peer_updates {
1580 if let Err(e) = self.update_next_index(*peer_id, update.next_index) {
1581 error!("Failed to update next index: {:?}", e);
1582 }
1583 trace!(
1584 "Updated next index for peer {}-{}",
1585 peer_id, update.next_index
1586 );
1587 if let Some(match_index) = update.match_index {
1588 if let Err(e) = self.update_match_index(*peer_id, match_index) {
1589 error!("Failed to update match index: {:?}", e);
1590 }
1591 trace!("Updated match index for peer {}-{}", peer_id, match_index);
1592 }
1593 }
1594 }
1595
1596 pub async fn check_learner_progress(
1597 &mut self,
1598 learner_progress: &HashMap<u32, Option<u64>>,
1599 ctx: &RaftContext<T>,
1600 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1601 ) -> Result<()> {
1602 debug!(?learner_progress, "check_learner_progress");
1603
1604 if !self.should_check_learner_progress(ctx) {
1605 return Ok(());
1606 }
1607
1608 if learner_progress.is_empty() {
1609 return Ok(());
1610 }
1611
1612 let ready_learners = self.find_promotable_learners(learner_progress, ctx).await;
1613 let new_promotions = self.deduplicate_promotions(ready_learners);
1614
1615 if !new_promotions.is_empty() {
1616 self.enqueue_and_notify_promotions(new_promotions, role_tx)?;
1617 }
1618
1619 Ok(())
1620 }
1621
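    /// Throttles learner progress checks to at most one per `learner_check_throttle_ms`.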
1622 fn should_check_learner_progress(
1624 &mut self,
1625 ctx: &RaftContext<T>,
1626 ) -> bool {
1627 let throttle_interval =
1628 Duration::from_millis(ctx.node_config().raft.learner_check_throttle_ms);
1629 if self.last_learner_check.elapsed() < throttle_interval {
1630 return false;
1631 }
1632 self.last_learner_check = Instant::now();
1633 true
1634 }
1635
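    /// Returns the learners that are still cluster members, are close enough to the leader's
    /// commit index (within `learner_catchup_threshold`), and whose node status allows
    /// promotion.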
1636 async fn find_promotable_learners(
1638 &self,
1639 learner_progress: &HashMap<u32, Option<u64>>,
1640 ctx: &RaftContext<T>,
1641 ) -> Vec<u32> {
1642 let leader_commit = self.commit_index();
1643 let threshold = ctx.node_config().raft.learner_catchup_threshold;
1644 let membership = ctx.membership();
1645
1646 let mut ready_learners = Vec::new();
1647
1648 for (&node_id, &match_index_opt) in learner_progress.iter() {
1649 if !membership.contains_node(node_id).await {
1650 continue;
1651 }
1652
1653 if !self.is_learner_caught_up(match_index_opt, leader_commit, threshold) {
1654 continue;
1655 }
1656
1657 let node_status =
1658 membership.get_node_status(node_id).await.unwrap_or(NodeStatus::ReadOnly);
1659 if !node_status.is_promotable() {
1660 debug!(
1661 ?node_id,
1662 ?node_status,
1663 "Learner caught up but status is not Promotable, skipping"
1664 );
1665 continue;
1666 }
1667
1668 debug!(
1669 ?node_id,
1670 match_index = ?match_index_opt.unwrap_or(0),
1671 ?leader_commit,
1672 gap = leader_commit.saturating_sub(match_index_opt.unwrap_or(0)),
1673 "Learner caught up"
1674 );
1675 ready_learners.push(node_id);
1676 }
1677
1678 ready_learners
1679 }
1680
1681 fn is_learner_caught_up(
1683 &self,
1684 match_index: Option<u64>,
1685 leader_commit: u64,
1686 threshold: u64,
1687 ) -> bool {
1688 let match_index = match_index.unwrap_or(0);
1689 let gap = leader_commit.saturating_sub(match_index);
1690 gap <= threshold
1691 }
1692
1693 fn deduplicate_promotions(
1695 &self,
1696 ready_learners: Vec<u32>,
1697 ) -> Vec<u32> {
1698 let already_pending: std::collections::HashSet<_> =
1699 self.pending_promotions.iter().map(|p| p.node_id).collect();
1700
1701 ready_learners.into_iter().filter(|id| !already_pending.contains(id)).collect()
1702 }
1703
1704 fn enqueue_and_notify_promotions(
1706 &mut self,
1707 new_promotions: Vec<u32>,
1708 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1709 ) -> Result<()> {
1710 info!(
1711 ?new_promotions,
1712 "Learners caught up, adding to pending promotions"
1713 );
1714
1715 for node_id in new_promotions {
1716 self.pending_promotions
1717 .push_back(PendingPromotion::new(node_id, Instant::now()));
1718 }
1719
1720 role_tx
1721 .send(RoleEvent::ReprocessEvent(Box::new(
1722 RaftEvent::PromoteReadyLearners,
1723 )))
1724 .map_err(|e| {
1725 let error_str = format!("{e:?}");
1726 error!("Failed to send PromoteReadyLearners: {}", error_str);
1727 Error::System(SystemError::Network(NetworkError::SingalSendFailed(
1728 error_str,
1729 )))
1730 })?;
1731
1732 Ok(())
1733 }
1734
1735 #[allow(dead_code)]
1736 pub async fn batch_promote_learners(
1737 &mut self,
1738 ready_learners_ids: Vec<u32>,
1739 ctx: &RaftContext<T>,
1740 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1741 ) -> Result<()> {
1742 debug!("1. Determine optimal promotion status based on quorum safety");
1744 let membership = ctx.membership();
1745 let current_voters = membership.voters().await.len();
1746 let new_active_count = current_voters + ready_learners_ids.len();
1748
1749 trace!(
1751 ?current_voters,
1752 ?ready_learners_ids,
1753 "[Node-{}] new_active_count: {}",
1754 self.node_id(),
1755 new_active_count
1756 );
1757 let target_status = if ensure_safe_join(self.node_id(), new_active_count).is_ok() {
1758 trace!(
1759 "Going to update nodes-{:?} status to Active",
1760 ready_learners_ids
1761 );
1762 NodeStatus::Active
1763 } else {
1764 trace!(
1765 "Not enough quorum to promote learners: {:?}",
1766 ready_learners_ids
1767 );
1768 return Ok(());
1769 };
1770
1771 debug!("2. Create configuration change payload");
1773 let config_change = Change::BatchPromote(BatchPromote {
1774 node_ids: ready_learners_ids.clone(),
1775 new_status: target_status as i32,
1776 });
1777
1778 info!(?config_change, "Replicating cluster config");
1779
1780 debug!("3. Submit single config change for all ready learners");
1782 match self
1783 .verify_leadership_limited_retry(
1784 vec![EntryPayload::config(config_change)],
1785 true,
1786 ctx,
1787 role_tx,
1788 )
1789 .await
1790 {
1791 Ok(true) => {
1792 info!(
1793 "Batch promotion committed for nodes: {:?}",
1794 ready_learners_ids
1795 );
1796 }
1797 Ok(false) => {
1798 warn!("Failed to commit batch promotion");
1799 }
1800 Err(e) => {
1801 error!("Batch promotion error: {:?}", e);
1802 return Err(e);
1803 }
1804 }
1805
1806 Ok(())
1807 }
1808
1809 #[instrument(skip(self))]
1811 fn calculate_new_commit_index(
1812 &mut self,
1813 raft_log: &Arc<ROF<T>>,
1814 peer_updates: &HashMap<u32, PeerUpdate>,
1815 ) -> Option<u64> {
1816 let old_commit_index = self.commit_index();
1817 let current_term = self.current_term();
1818
1819 let matched_ids: Vec<u64> =
1820 peer_updates.keys().filter_map(|&id| self.match_index(id)).collect();
1821
1822 let new_commit_index =
1823 raft_log.calculate_majority_matched_index(current_term, old_commit_index, matched_ids);
1824
1825 if new_commit_index.is_some() && new_commit_index.unwrap() > old_commit_index {
1826 new_commit_index
1827 } else {
1828 None
1829 }
1830 }
1831
1832 #[allow(dead_code)]
1833 fn if_update_commit_index(
1834 &self,
1835 new_commit_index_option: Option<u64>,
1836 ) -> (bool, u64) {
1837 let current_commit_index = self.commit_index();
1838 if let Some(new_commit_index) = new_commit_index_option {
1839 debug!("Leader::update_commit_index: {:?}", new_commit_index);
1840 if current_commit_index < new_commit_index {
1841 return (true, new_commit_index);
1842 }
1843 }
1844 debug!("Leader::update_commit_index: false");
1845 (false, current_commit_index)
1846 }
1847
1848 pub async fn ensure_state_machine_upto_commit_index(
1849 &self,
1850 state_machine_handler: &Arc<SMHOF<T>>,
1851 last_applied: u64,
1852 ) -> Result<()> {
1853 let commit_index = self.commit_index();
1854
1855 debug!(
1856 "ensure_state_machine_upto_commit_index: last_applied:{} < commit_index:{} ?",
1857 last_applied, commit_index
1858 );
1859 if last_applied < commit_index {
1860 state_machine_handler.update_pending(commit_index);
1861
1862 let timeout_ms = self.node_config.raft.read_consistency.state_machine_sync_timeout_ms;
1865 state_machine_handler
1866 .wait_applied(commit_index, std::time::Duration::from_millis(timeout_ms))
1867 .await?;
1868
1869 debug!("ensure_state_machine_upto_commit_index success");
1870 }
1871 Ok(())
1872 }
1873
1874 #[instrument(skip(self))]
1875 fn scheduled_purge_upto(
1876 &mut self,
1877 received_last_included: LogId,
1878 ) {
1879 if let Some(existing) = self.scheduled_purge_upto {
1880 if existing.index >= received_last_included.index {
                warn!(
                    ?received_last_included,
                    ?existing,
                    "Will not update scheduled_purge_upto: received last_included is not newer than the existing target"
                );
1886 return;
1887 }
1888 }
        info!(?self.scheduled_purge_upto, ?received_last_included, "Update scheduled_purge_upto.");
1890 self.scheduled_purge_upto = Some(received_last_included);
1891 }
1892
1893 fn peer_purge_progress(
1894 &mut self,
1895 responses: Vec<Result<PurgeLogResponse>>,
1896 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1897 ) -> Result<()> {
1898 if responses.is_empty() {
1899 return Ok(());
1900 }
1901 for r in responses.iter().flatten() {
1902 if r.term > self.current_term() {
1903 self.update_current_term(r.term);
1904 self.send_become_follower_event(None, role_tx)?;
1905 }
1906
1907 if let Some(last_purged) = r.last_purged {
1908 self.peer_purge_progress
1909 .entry(r.node_id)
1910 .and_modify(|v| *v = last_purged.index)
1911 .or_insert(last_purged.index);
1912 }
1913 }
1914
1915 Ok(())
1916 }
1917
1918 fn send_become_follower_event(
1919 &self,
1920 new_leader_id: Option<u32>,
1921 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1922 ) -> Result<()> {
1923 info!(
1924 ?new_leader_id,
1925 "Leader is going to step down as Follower..."
1926 );
1927 role_tx.send(RoleEvent::BecomeFollower(new_leader_id)).map_err(|e| {
1928 let error_str = format!("{e:?}");
1929 error!("Failed to send: {}", error_str);
1930 NetworkError::SingalSendFailed(error_str)
1931 })?;
1932
1933 Ok(())
1934 }
1935
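    /// Returns `true` when it is safe to purge logs up to the snapshot's `last_included`
    /// entry: the snapshot boundary is below the commit index, it advances past the previous
    /// purge, and every tracked peer has reported purge progress at or beyond that boundary.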
1936 #[instrument(skip(self))]
1967 pub fn can_purge_logs(
1968 &self,
1969 last_purge_index: Option<LogId>,
1970 last_included_in_snapshot: LogId,
1971 ) -> bool {
1972 let commit_index = self.commit_index();
        debug!(?self.peer_purge_progress, ?commit_index, ?last_purge_index, ?last_included_in_snapshot, "can_purge_logs");
1975 let monotonic_check = last_purge_index
1976 .map(|lid| lid.index < last_included_in_snapshot.index)
1977 .unwrap_or(true);
1978
1979 last_included_in_snapshot.index < commit_index
1980 && monotonic_check
1981 && self.peer_purge_progress.values().all(|&v| v >= last_included_in_snapshot.index)
1982 }
1983
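    /// Handles a `JoinRequest`: rejects duplicates and invalid rejoins, replicates an
    /// `AddNode` configuration change through quorum, and replies with the cluster config and
    /// snapshot metadata on success or a failed-precondition status on error.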
1984 pub async fn handle_join_cluster(
1985 &mut self,
1986 join_request: JoinRequest,
1987 sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
1988 ctx: &RaftContext<T>,
1989 role_tx: &mpsc::UnboundedSender<RoleEvent>,
1990 ) -> Result<()> {
1991 let node_id = join_request.node_id;
1992 let node_role = join_request.node_role;
1993 let address = join_request.address;
1994 let status = join_request.status;
1995 let membership = ctx.membership();
1996
1997 debug!("1. Validate join request");
1999 if membership.contains_node(node_id).await {
2000 let error_msg = format!("Node {node_id} already exists in cluster");
2001 warn!(%error_msg);
2002 return self.send_join_error(sender, MembershipError::NodeAlreadyExists(node_id)).await;
2003 }
2004
2005 debug!("2. Create configuration change payload");
2007 if let Err(e) = membership.can_rejoin(node_id, node_role).await {
2008 let error_msg = format!("Node {node_id} cannot rejoin: {e}",);
2009 warn!(%error_msg);
2010 return self
2011 .send_join_error(sender, MembershipError::JoinClusterError(error_msg))
2012 .await;
2013 }
2014
2015 let config_change = Change::AddNode(AddNode {
2016 node_id,
2017 address: address.clone(),
2018 status,
2019 });
2020
2021 debug!("3. Wait for quorum confirmation");
2023 match self
2024 .verify_leadership_limited_retry(
2025 vec![EntryPayload::config(config_change)],
2026 true,
2027 ctx,
2028 role_tx,
2029 )
2030 .await
2031 {
2032 Ok(true) => {
2033 debug!("4. Update node status to Syncing");
2035
2036 debug!(
2037 "After updating, the replications peers: {:?}",
2038 ctx.membership().replication_peers().await
2039 );
2040
2041 debug!("5. Send successful response");
2046 info!("Join config committed for node {}", node_id);
2047 self.send_join_success(node_id, &address, sender, ctx).await?;
2048 }
2049 Ok(false) => {
2050 warn!("Failed to commit join config for node {}", node_id);
2051 self.send_join_error(sender, MembershipError::CommitTimeout).await?
2052 }
2053 Err(e) => {
2054 error!("Error waiting for commit: {:?}", e);
2055 self.send_join_error(sender, e).await?
2056 }
2057 }
2058 Ok(())
2059 }
2060
2061 async fn send_join_success(
2062 &self,
2063 node_id: u32,
2064 address: &str,
2065 sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2066 ctx: &RaftContext<T>,
2067 ) -> Result<()> {
2068 let snapshot_metadata = ctx.state_machine_handler().get_latest_snapshot_metadata();
2070
2071 let response = JoinResponse {
2073 success: true,
2074 error: String::new(),
2075 config: Some(
2076 ctx.membership()
2077 .retrieve_cluster_membership_config(self.shared_state().current_leader())
2078 .await,
2079 ),
2080 config_version: ctx.membership().get_cluster_conf_version().await,
2081 snapshot_metadata,
2082 leader_id: self.node_id(),
2083 };
2084
2085 sender.send(Ok(response)).map_err(|e| {
2086 error!("Failed to send join response: {:?}", e);
2087 NetworkError::SingalSendFailed(format!("{e:?}"))
2088 })?;
2089
2090 info!(
2091 "Node {} ({}) successfully added as learner",
2092 node_id, address
2093 );
2094
2095 crate::utils::cluster_printer::print_leader_accepting_new_node(
2097 self.node_id(),
2098 node_id,
2099 address,
2100 d_engine_proto::common::NodeRole::Learner as i32,
2101 );
2102
2103 Ok(())
2104 }
2105
2106 async fn send_join_error(
2107 &self,
2108 sender: MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
2109 error: impl Into<Error>,
2110 ) -> Result<()> {
2111 let error = error.into();
2112 let status = Status::failed_precondition(error.to_string());
2113
2114 sender.send(Err(status)).map_err(|e| {
2115 error!("Failed to send join error: {:?}", e);
2116 NetworkError::SingalSendFailed(format!("{e:?}"))
2117 })?;
2118
2119 Err(error)
2120 }
2121
2122 #[cfg(any(test, feature = "test-utils"))]
2123 pub fn new(
2124 node_id: u32,
2125 node_config: Arc<RaftNodeConfig>,
2126 ) -> Self {
2127 let ReplicationConfig {
2128 rpc_append_entries_in_batch_threshold,
2129 rpc_append_entries_batch_process_delay_in_ms,
2130 rpc_append_entries_clock_in_ms,
2131 ..
2132 } = node_config.raft.replication;
2133
2134 LeaderState {
2135 cluster_metadata: ClusterMetadata {
2136 single_voter: false,
2137 total_voters: 0,
2138 replication_targets: vec![],
2139 },
2140 shared_state: SharedState::new(node_id, None, None),
2141 timer: Box::new(ReplicationTimer::new(
2142 rpc_append_entries_clock_in_ms,
2143 rpc_append_entries_batch_process_delay_in_ms,
2144 )),
2145 next_index: HashMap::new(),
2146 match_index: HashMap::new(),
2147 noop_log_id: None,
2148
2149 batch_buffer: Box::new(BatchBuffer::new(
2150 rpc_append_entries_in_batch_threshold,
2151 Duration::from_millis(rpc_append_entries_batch_process_delay_in_ms),
2152 )),
2153
2154 node_config,
2155 scheduled_purge_upto: None,
            last_purged_index: None,
            last_learner_check: Instant::now(),
2158 peer_purge_progress: HashMap::new(),
2159 snapshot_in_progress: AtomicBool::new(false),
2160 next_membership_maintenance_check: Instant::now(),
2161 pending_promotions: VecDeque::new(),
2162 _marker: PhantomData,
2163 lease_timestamp: AtomicU64::new(0),
2164 }
2165 }
2166
2167 pub async fn trigger_background_snapshot(
2168 node_id: u32,
2169 metadata: SnapshotMetadata,
2170 state_machine_handler: Arc<SMHOF<T>>,
2171 membership: Arc<MOF<T>>,
2172 config: SnapshotConfig,
2173 ) -> Result<()> {
2174 let (result_tx, result_rx) = oneshot::channel();
2175
2176 tokio::task::spawn_blocking(move || {
2178 let rt = tokio::runtime::Handle::current();
2179 let result = rt.block_on(async move {
2180 let bulk_channel = membership
2181 .get_peer_channel(node_id, ConnectionType::Bulk)
2182 .await
2183 .ok_or(NetworkError::PeerConnectionNotFound(node_id))?;
2184
2185 let data_stream =
2186 state_machine_handler.load_snapshot_data(metadata.clone()).await?;
2187
2188 BackgroundSnapshotTransfer::<T>::run_push_transfer(
2189 node_id,
2190 data_stream,
2191 bulk_channel,
2192 config,
2193 )
2194 .await
2195 });
2196
2197 let _ = result_tx.send(result);
2199 });
2200
2201 tokio::spawn(async move {
2203 match result_rx.await {
2204 Ok(Ok(_)) => info!("Snapshot to {} completed", node_id),
2205 Ok(Err(e)) => error!("Snapshot to {} failed: {:?}", node_id, e),
2206 Err(_) => warn!("Snapshot result channel closed unexpectedly"),
2207 }
2208 });
2209
2210 Ok(())
2211 }
2212
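    /// Drains the pending-promotion queue: drops stale entries, computes a quorum-safe batch
    /// size, promotes that many learners via a `BatchPromote` config change, re-queues the
    /// batch on failure, and re-schedules itself while entries remain.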
2213 pub async fn process_pending_promotions(
2215 &mut self,
2216 ctx: &RaftContext<T>,
2217 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2218 ) -> Result<()> {
2219 debug!(
2220 "[Leader {}] ๐ process_pending_promotions called, pending: {:?}",
2221 self.node_id(),
2222 self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2223 );
2224
2225 let config = &ctx.node_config().raft.membership.promotion;
2227
2228 let now = Instant::now();
2230 self.pending_promotions.retain(|entry| {
2231 now.duration_since(entry.ready_since) <= config.stale_learner_threshold
2232 });
2233
2234 if self.pending_promotions.is_empty() {
2235 debug!(
2236 "[Leader {}] โ pending_promotions is empty after stale cleanup",
2237 self.node_id()
2238 );
2239 return Ok(());
2240 }
2241
2242 let membership = ctx.membership();
        let current_voters = membership.voters().await.len() + 1; // +1 counts this leader
        debug!(
            "[Leader {}] current_voters: {}, pending: {}",
2247 self.node_id(),
2248 current_voters,
2249 self.pending_promotions.len()
2250 );
2251
2252 let max_batch_size =
2254 calculate_safe_batch_size(current_voters, self.pending_promotions.len());
2255 debug!(
2256 "[Leader {}] ๐ฏ max_batch_size: {}",
2257 self.node_id(),
2258 max_batch_size
2259 );
2260
2261 if max_batch_size == 0 {
2262 debug!(
2264 "[Leader {}] โ ๏ธ max_batch_size is 0, cannot promote now",
2265 self.node_id()
2266 );
2267 return Ok(());
2268 }
2269
2270 let promotion_entries = self.drain_batch(max_batch_size);
2272 let promotion_node_ids = promotion_entries.iter().map(|e| e.node_id).collect::<Vec<_>>();
2273
2274 if !promotion_node_ids.is_empty() {
2276 info!(
2278 "Promoting learner batch of {} nodes: {:?} (total voters: {} -> {})",
2279 promotion_node_ids.len(),
2280 promotion_node_ids,
2281 current_voters,
2282 current_voters + promotion_node_ids.len()
2283 );
2284
2285 let result = self.safe_batch_promote(promotion_node_ids.clone(), ctx, role_tx).await;
2287
2288 if let Err(e) = result {
2289 for entry in promotion_entries.into_iter().rev() {
2291 self.pending_promotions.push_front(entry);
2292 }
2293 return Err(e);
2294 }
2295
2296 info!(
2297 "Promotion successful. Cluster members: {:?}",
2298 membership.voters().await
2299 );
2300 }
2301
2302 trace!(
2303 ?self.pending_promotions,
2304 "Step 6: Reschedule if any pending promotions remain"
2305 );
2306 if !self.pending_promotions.is_empty() {
2308 debug!(
2309 "[Leader {}] ๐ Re-sending PromoteReadyLearners for remaining pending: {:?}",
2310 self.node_id(),
2311 self.pending_promotions.iter().map(|p| p.node_id).collect::<Vec<_>>()
2312 );
2313 role_tx
2315 .send(RoleEvent::ReprocessEvent(Box::new(
2316 RaftEvent::PromoteReadyLearners,
2317 )))
2318 .map_err(|e| {
2319 let error_str = format!("{e:?}");
2320 error!("Send PromoteReadyLearners event failed: {}", error_str);
2321 NetworkError::SingalSendFailed(error_str)
2322 })?;
2323 }
2324
2325 Ok(())
2326 }
2327
2328 pub(super) fn drain_batch(
2330 &mut self,
2331 count: usize,
2332 ) -> Vec<PendingPromotion> {
2333 let mut batch = Vec::with_capacity(count);
2334 for _ in 0..count {
2335 if let Some(entry) = self.pending_promotions.pop_front() {
2336 batch.push(entry);
2337 } else {
2338 break;
2339 }
2340 }
2341 batch
2342 }
2343
2344 async fn safe_batch_promote(
2345 &mut self,
2346 batch: Vec<u32>,
2347 ctx: &RaftContext<T>,
2348 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2349 ) -> Result<()> {
2350 let change = Change::BatchPromote(BatchPromote {
2351 node_ids: batch.clone(),
2352 new_status: NodeStatus::Active as i32,
2353 });
2354
2355 self.verify_leadership_limited_retry(
2357 vec![EntryPayload::config(change)],
2358 true,
2359 ctx,
2360 role_tx,
2361 )
2362 .await?;
2363
2364 Ok(())
2365 }
2366
2367 async fn run_periodic_maintenance(
2368 &mut self,
2369 role_tx: &mpsc::UnboundedSender<RoleEvent>,
2370 ctx: &RaftContext<T>,
2371 ) -> Result<()> {
2372 if let Err(e) = self.conditionally_purge_stale_learners(role_tx, ctx).await {
2373 error!("Stale learner purge failed: {}", e);
2374 }
2375
2376 if let Err(e) = self.conditionally_purge_zombie_nodes(role_tx, ctx).await {
2377 error!("Zombie node purge failed: {}", e);
2378 }
2379
2380 self.reset_next_membership_maintenance_check(
2383 ctx.node_config().raft.membership.membership_maintenance_interval,
2384 );
2385 Ok(())
2386 }
2387
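    /// Periodically inspects the head of the pending-promotion queue and removes learners
    /// that have been waiting longer than `stale_learner_threshold` from the cluster via
    /// `handle_stale_learner`.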
    pub async fn conditionally_purge_stale_learners(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        let config = &ctx.node_config.raft.membership.promotion;

        if self.pending_promotions.is_empty()
            || self.next_membership_maintenance_check > Instant::now()
        {
            trace!("Skipping stale learner check");
            return Ok(());
        }

        let now = Instant::now();
        let queue_len = self.pending_promotions.len();

        // Inspect roughly 1% of the queue per pass, at least one entry and at most 100,
        // so a large backlog never stalls the leader's event loop.
        let inspect_count = queue_len.min(100).min(1.max(queue_len / 100));
        let mut stale_entries = Vec::new();

        trace!("Inspecting {} entries", inspect_count);
        for _ in 0..inspect_count {
            if let Some(entry) = self.pending_promotions.pop_front() {
                trace!(
                    "Inspecting entry: {:?} - {:?} - {:?}",
                    entry,
                    now.duration_since(entry.ready_since),
                    &config.stale_learner_threshold
                );
                if now.duration_since(entry.ready_since) > config.stale_learner_threshold {
                    stale_entries.push(entry);
                } else {
                    // The queue is treated as ordered by readiness time: once a fresh
                    // entry is found, stop scanning and put it back.
                    self.pending_promotions.push_front(entry);
                    break;
                }
            } else {
                break;
            }
        }

        trace!("Stale learner check completed: {:?}", stale_entries);

        for entry in stale_entries {
            if let Err(e) = self.handle_stale_learner(entry.node_id, role_tx, ctx).await {
                error!("Failed to handle stale learner: {}", e);
            }
        }

        Ok(())
    }

    async fn conditionally_purge_zombie_nodes(
        &mut self,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        let membership = ctx.membership();
        let zombie_candidates = membership.get_zombie_candidates().await;
        let mut nodes_to_remove = Vec::new();

        for node_id in zombie_candidates {
            if let Some(status) = membership.get_node_status(node_id).await {
                if status != NodeStatus::Active {
                    nodes_to_remove.push(node_id);
                }
            }
        }

        if !nodes_to_remove.is_empty() {
            let change = Change::BatchRemove(BatchRemove {
                node_ids: nodes_to_remove.clone(),
            });

            info!(
                "Proposing batch removal of zombie nodes: {:?}",
                nodes_to_remove
            );

            match self
                .verify_leadership_limited_retry(
                    vec![EntryPayload::config(change)],
                    false,
                    ctx,
                    role_tx,
                )
                .await
            {
                Ok(true) => {
                    info!("Batch removal committed for nodes: {:?}", nodes_to_remove);
                }
                Ok(false) => {
                    warn!("Failed to commit batch removal");
                }
                Err(e) => {
                    error!("Batch removal error: {:?}", e);
                    return Err(e);
                }
            }
        }

        Ok(())
    }

    pub fn reset_next_membership_maintenance_check(
        &mut self,
        membership_maintenance_interval: Duration,
    ) {
        self.next_membership_maintenance_check = Instant::now() + membership_maintenance_interval;
    }

    pub async fn handle_stale_learner(
        &mut self,
        node_id: u32,
        role_tx: &mpsc::UnboundedSender<RoleEvent>,
        ctx: &RaftContext<T>,
    ) -> Result<()> {
        warn!(
            "Learner {} is stalled, removing from cluster via consensus",
            node_id
        );

        let change = Change::BatchRemove(BatchRemove {
            node_ids: vec![node_id],
        });

        match self
            .verify_leadership_limited_retry(vec![EntryPayload::config(change)], true, ctx, role_tx)
            .await
        {
            Ok(true) => {
                info!(
                    "Stalled learner {} successfully removed from cluster",
                    node_id
                );
            }
            Ok(false) => {
                warn!("Failed to commit removal of stalled learner {}", node_id);
            }
            Err(e) => {
                error!("Error removing stalled learner {}: {:?}", node_id, e);
                return Err(e);
            }
        }

        Ok(())
    }

    pub fn is_lease_valid(
        &self,
        ctx: &RaftContext<T>,
    ) -> bool {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;

        let last_confirmed = self.lease_timestamp.load(std::sync::atomic::Ordering::Acquire);
        let lease_duration = ctx.node_config().raft.read_consistency.lease_duration_ms;

        if now <= last_confirmed {
            // Be conservative: a clock that has not advanced past the last confirmation
            // cannot prove the lease is still valid.
            error!("Clock moved backwards or equal: now {now}, last confirmed {last_confirmed}");
            return false;
        }
        (now - last_confirmed) < lease_duration
    }

    fn update_lease_timestamp(&self) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;

        self.lease_timestamp.store(now, std::sync::atomic::Ordering::Release);
    }

    #[cfg(any(test, feature = "test-utils"))]
    pub fn test_update_lease_timestamp(&self) {
        self.update_lease_timestamp();
    }

    #[cfg(any(test, feature = "test-utils"))]
    pub fn test_set_lease_timestamp(
        &self,
        timestamp: u64,
    ) {
        self.lease_timestamp.store(timestamp, std::sync::atomic::Ordering::Release);
    }
}

impl<T: TypeConfig> From<&CandidateState<T>> for LeaderState<T> {
    fn from(candidate: &CandidateState<T>) -> Self {
        let ReplicationConfig {
            rpc_append_entries_in_batch_threshold,
            rpc_append_entries_batch_process_delay_in_ms,
            rpc_append_entries_clock_in_ms,
            ..
        } = candidate.node_config.raft.replication;

        let shared_state = candidate.shared_state.clone();
        shared_state.set_current_leader(candidate.node_id());

        Self {
            shared_state,
            timer: Box::new(ReplicationTimer::new(
                rpc_append_entries_clock_in_ms,
                rpc_append_entries_batch_process_delay_in_ms,
            )),
            next_index: HashMap::new(),
            match_index: HashMap::new(),
            noop_log_id: None,

            batch_buffer: Box::new(BatchBuffer::new(
                rpc_append_entries_in_batch_threshold,
                Duration::from_millis(rpc_append_entries_batch_process_delay_in_ms),
            )),

            node_config: candidate.node_config.clone(),

            scheduled_purge_upto: None,
            last_purged_index: candidate.last_purged_index,
            last_learner_check: Instant::now(),
            snapshot_in_progress: AtomicBool::new(false),
            peer_purge_progress: HashMap::new(),
            next_membership_maintenance_check: Instant::now(),
            pending_promotions: VecDeque::new(),
            cluster_metadata: ClusterMetadata {
                single_voter: false,
                total_voters: 0,
                replication_targets: vec![],
            },

            _marker: PhantomData,
            lease_timestamp: AtomicU64::new(0),
        }
    }
}

impl<T: TypeConfig> Debug for LeaderState<T> {
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.debug_struct("LeaderState")
            .field("shared_state", &self.shared_state)
            .field("next_index", &self.next_index)
            .field("match_index", &self.match_index)
            .field("noop_log_id", &self.noop_log_id)
            .finish()
    }
}

/// Returns how many ready learners can be promoted in one batch while keeping the
/// resulting total voter count odd (an even-sized membership adds a voter without
/// improving fault tolerance).
///
/// `current` is the number of existing voters, `available` the number of learners
/// that are ready for promotion.
pub fn calculate_safe_batch_size(
    current: usize,
    available: usize,
) -> usize {
    if (current + available) % 2 == 1 {
        // Promoting every ready learner yields an odd voter count.
        available
    } else {
        // Hold one learner back so the resulting voter count stays odd.
        available.saturating_sub(1)
    }
}
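
// A minimal sketch (added for illustration, not part of the original source) of the
// invariant `calculate_safe_batch_size` maintains: whenever at least one learner is
// ready, the post-promotion voter count stays odd.
#[cfg(test)]
mod calculate_safe_batch_size_sketch {
    use super::calculate_safe_batch_size;

    #[test]
    fn keeps_total_voter_count_odd() {
        // 3 voters + 2 ready learners -> 5 total (odd): promote both.
        assert_eq!(calculate_safe_batch_size(3, 2), 2);
        // 3 voters + 3 ready learners -> 6 total (even): hold one back, promote 2 of 3.
        assert_eq!(calculate_safe_batch_size(3, 3), 2);
        // No learners ready: the batch size saturates at zero even for an even voter count.
        assert_eq!(calculate_safe_batch_size(4, 0), 0);
    }
}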

pub(super) fn send_replay_raft_event(
    role_tx: &mpsc::UnboundedSender<RoleEvent>,
    raft_event: RaftEvent,
) -> Result<()> {
    role_tx.send(RoleEvent::ReprocessEvent(Box::new(raft_event))).map_err(|e| {
        let error_str = format!("{e:?}");
        error!("Failed to send: {}", error_str);
        NetworkError::SingalSendFailed(error_str).into()
    })
}
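
// A minimal sketch (added for illustration, not part of the original source) of how
// `send_replay_raft_event` is expected to behave: the event comes back out of the
// channel wrapped in `RoleEvent::ReprocessEvent`. It assumes `RaftEvent::PromoteReadyLearners`
// remains a unit variant, as used in the leader code above.
#[cfg(test)]
mod send_replay_raft_event_sketch {
    use super::*;

    #[test]
    fn wraps_event_for_reprocessing() {
        let (role_tx, mut role_rx) = mpsc::unbounded_channel();
        send_replay_raft_event(&role_tx, RaftEvent::PromoteReadyLearners)
            .expect("send over an open channel should succeed");
        assert!(matches!(
            role_rx.try_recv(),
            Ok(RoleEvent::ReprocessEvent(_))
        ));
    }
}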