1pub use d_engine_proto::common::LeaderInfo;
3use d_engine_proto::server::election::VotedFor;
4use tokio::sync::mpsc;
5use tokio::sync::watch;
6use tokio::time::sleep_until;
7use tracing::debug;
8use tracing::error;
9use tracing::info;
10use tracing::trace;
11use tracing::warn;
12
13use super::NewCommitData;
14use super::RaftContext;
15use super::RaftCoreHandlers;
16use super::RaftEvent;
17use super::RaftRole;
18use super::RaftStorageHandles;
19use super::RoleEvent;
20#[cfg(test)]
21use super::raft_event_to_test_event;
22use crate::Membership;
23use crate::NetworkError;
24use crate::RaftLog;
25use crate::RaftNodeConfig;
26use crate::Result;
27use crate::TypeConfig;
28use crate::alias::MOF;
29use crate::alias::TROF;
30use std::sync::Arc;
31
/// Single-node Raft driver.
///
/// Owns the node's current role, its [`RaftContext`], and the receiving ends of
/// the channels that feed the event loop in [`Raft::run`]. The matching senders
/// are kept here as well so they can be cloned out (`event_sender`,
/// `cmd_sender`, `role_event_sender`) and used for internal re-queuing.
pub struct Raft<T>
where
    T: TypeConfig,
{
    /// Identifier of this node.
    pub node_id: u32,
    /// Current role; the event loop delegates ticks, events and commands to it.
    pub role: RaftRole<T>,
    /// Storage, transport, membership, handlers and node configuration.
    pub ctx: RaftContext<T>,

    /// Bounded queue of Raft protocol events consumed by `run`.
    event_tx: mpsc::Sender<RaftEvent>,
    event_rx: mpsc::Receiver<RaftEvent>,

    /// Bounded queue of client commands; pushed into the role's buffers and
    /// flushed once per loop iteration.
    cmd_tx: mpsc::Sender<super::ClientCmd>,
    cmd_rx: mpsc::Receiver<super::ClientCmd>,

    /// Unbounded queue of role transitions and internal notifications.
    role_tx: mpsc::UnboundedSender<RoleEvent>,
    role_rx: mpsc::UnboundedReceiver<RoleEvent>,

    /// Subscribers that receive every coalesced commit-index notification.
    new_commit_listener: Vec<mpsc::UnboundedSender<NewCommitData>>,

    /// Optional subscriber for leader changes; `None` inside the watch value
    /// means no leader is currently known.
    leader_change_listener: Option<watch::Sender<Option<LeaderInfo>>>,

    /// When this resolves (value change or sender dropped), `run` closes the
    /// log, closes `event_rx` and returns.
    shutdown_signal: watch::Receiver<()>,

    /// Test-only: receives the role's `as_i32()` after each transition.
    #[cfg(test)]
    test_role_transition_listener: Vec<mpsc::UnboundedSender<i32>>,

    /// Test-only: receives a `TestEvent` mirror of each handled `RaftEvent`.
    #[cfg(test)]
    test_raft_event_listener: Vec<mpsc::UnboundedSender<super::TestEvent>>,
}
69
/// Channel endpoints and the shutdown receiver consumed by [`Raft::new`].
///
/// Grouping them keeps the `Raft` constructor's argument list manageable.
pub struct SignalParams {
    /// Sender side of the unbounded role-event queue.
    pub(crate) role_tx: mpsc::UnboundedSender<RoleEvent>,
    /// Receiver side of the unbounded role-event queue.
    pub(crate) role_rx: mpsc::UnboundedReceiver<RoleEvent>,
    /// Sender side of the bounded Raft-event queue.
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
    /// Receiver side of the bounded Raft-event queue.
    pub(crate) event_rx: mpsc::Receiver<RaftEvent>,
    /// Sender side of the bounded client-command queue.
    pub(crate) cmd_tx: mpsc::Sender<super::ClientCmd>,
    /// Receiver side of the bounded client-command queue.
    pub(crate) cmd_rx: mpsc::Receiver<super::ClientCmd>,
    /// Shutdown notification watched by the Raft event loop.
    pub(crate) shutdown_signal: watch::Receiver<()>,
}
79
80impl SignalParams {
81 pub fn new(
86 role_tx: mpsc::UnboundedSender<RoleEvent>,
87 role_rx: mpsc::UnboundedReceiver<RoleEvent>,
88 event_tx: mpsc::Sender<RaftEvent>,
89 event_rx: mpsc::Receiver<RaftEvent>,
90 cmd_tx: mpsc::Sender<super::ClientCmd>,
91 cmd_rx: mpsc::Receiver<super::ClientCmd>,
92 shutdown_signal: watch::Receiver<()>,
93 ) -> Self {
94 Self {
95 role_tx,
96 role_rx,
97 event_tx,
98 event_rx,
99 cmd_tx,
100 cmd_rx,
101 shutdown_signal,
102 }
103 }
104}
105
106impl<T> Raft<T>
107where
108 T: TypeConfig,
109{
110 #[allow(clippy::too_many_arguments)]
111 pub fn new(
112 node_id: u32,
113 role: RaftRole<T>,
114 storage: RaftStorageHandles<T>,
115 transport: TROF<T>,
116 handlers: RaftCoreHandlers<T>,
117 membership: Arc<MOF<T>>,
118 signal_params: SignalParams,
119 node_config: Arc<RaftNodeConfig>,
120 ) -> Self {
121 let ctx = Self::build_context(
122 node_id,
123 storage,
124 transport,
125 membership,
126 handlers,
127 node_config.clone(),
128 );
129
130 Raft {
131 node_id,
132 ctx,
133 role,
134
135 event_tx: signal_params.event_tx,
136 event_rx: signal_params.event_rx,
137
138 cmd_tx: signal_params.cmd_tx,
139 cmd_rx: signal_params.cmd_rx,
140
141 role_tx: signal_params.role_tx,
142 role_rx: signal_params.role_rx,
143
144 new_commit_listener: Vec::new(),
145
146 shutdown_signal: signal_params.shutdown_signal,
147
148 leader_change_listener: None,
149
150 #[cfg(test)]
151 test_role_transition_listener: Vec::new(),
152
153 #[cfg(test)]
154 test_raft_event_listener: Vec::new(),
155 }
156 }
157
158 pub fn register_leader_change_listener(
167 &mut self,
168 tx: watch::Sender<Option<LeaderInfo>>,
169 ) {
170 self.leader_change_listener = Some(tx);
171 }
172
173 fn notify_leader_change(
178 &self,
179 leader_id: Option<u32>,
180 term: u64,
181 ) {
182 if let Some(tx) = &self.leader_change_listener {
183 tx.send_if_modified(|current| {
184 let new_info = leader_id.map(|id| LeaderInfo {
185 leader_id: id,
186 term,
187 });
188 if *current != new_info {
189 *current = new_info;
190 true
191 } else {
192 false
193 }
194 });
195 }
196 }
197
198 fn build_context(
199 id: u32,
200 storage: RaftStorageHandles<T>,
201 transport: TROF<T>,
202 membership: Arc<MOF<T>>,
203 handlers: RaftCoreHandlers<T>,
204 node_config: Arc<RaftNodeConfig>,
205 ) -> RaftContext<T> {
206 RaftContext {
207 node_id: id,
208 storage,
209 transport: Arc::new(transport),
210 membership,
211 handlers,
212
213 node_config,
214 }
215 }
216
217 pub async fn join_cluster(&self) -> Result<()> {
218 self.role.join_cluster(&self.ctx).await
219 }
220
221 pub async fn run(&mut self) -> Result<()> {
222 if self.ctx.node_config.is_learner() {
224 info!(
225 "Node({}) is learner and needs to fetch initial snapshot.",
226 self.node_id
227 );
228 if let Err(e) = self.role.fetch_initial_snapshot(&self.ctx).await {
229 warn!(
230 "Initial snapshot failed: {:?}.
231 ================================================
232 Leader has not generate snapshot yet. New node
233 will sync with Leader via append entries requests.
234 ================================================
235 ",
236 e
237 );
238 println!(
239 "
240 ================================================
241 Leader has not generate snapshot yet. New node
242 will sync with Leader via append entries requests
243 ================================================
244 "
245 );
246 }
247 }
248
249 info!("Node is running");
250
251 if self.role.is_timer_expired() {
252 self.role.reset_timer();
253 }
254
255 loop {
256 let tick = sleep_until(self.role.next_deadline());
258
259 tokio::select! {
260 biased;
262 _ = self.shutdown_signal.changed() => {
264 info!("[Raft:{}] shutdown signal received.", self.node_id);
265 self.ctx.storage.raft_log.close().await;
268 self.event_rx.close();
272 return Ok(());
273 }
274 _ = tick => {
276
277 trace!("receive tick");
278 let role_tx = &self.role_tx;
279 let event_tx = &self.event_tx;
280
281 if let Err(e) = self.role.tick(role_tx, event_tx, &self.ctx).await {
282 error!("tick failed: {:?}", e);
283 } else {
284 trace!("tick success");
285 }
286 }
287
288 Some(role_event) = self.role_rx.recv() => {
290 debug!(%self.node_id, ?role_event, "receive role event");
291
292 if let Err(e) = self.handle_role_event(role_event).await {
293 error!(%self.node_id, ?e, "handle_role_event error");
294 }
295 }
296
297 Some(first_cmd) = self.cmd_rx.recv() => {
299 trace!(%self.node_id, "receive first client command");
300 self.role.push_client_cmd(first_cmd, &self.ctx);
301 }
302
303 Some(raft_event) = self.event_rx.recv() => {
305 trace!(%self.node_id, ?raft_event, "receive raft event");
306
307 #[cfg(test)]
308 let event = raft_event_to_test_event(&raft_event);
309
310 if let Err(e) = self.role.handle_raft_event(raft_event, &self.ctx, self.role_tx.clone()).await {
311 if e.is_fatal() {
312 error!(%self.node_id, ?e, "Fatal error in handle_raft_event, shutting down");
313 return Err(e);
314 }
315 warn!(%self.node_id, ?e, "Non-fatal error in handle_raft_event, continuing");
316 }
317
318 #[cfg(test)]
319 self.notify_raft_event(event);
320 }
321
322 }
323
324 tokio::task::yield_now().await;
329 self.drain_role_events().await?;
330 tokio::task::yield_now().await;
333 self.drain_client_cmds().await?;
334 self.drain_raft_events().await?;
335 }
336 }
337
338 async fn drain_role_events(&mut self) -> Result<()> {
340 let max = self.ctx.node_config.raft.batching.max_batch_size;
341 let mut count = 0;
342 while count < max {
343 match self.role_rx.try_recv() {
344 Ok(role_event) => {
345 if let Err(e) = self.handle_role_event(role_event).await {
346 error!(%self.node_id, ?e, "drain_role_events: handle_role_event error");
347 }
348 count += 1;
349 }
350 Err(_) => break,
351 }
352 }
353 Ok(())
354 }
355
356 async fn drain_client_cmds(&mut self) -> Result<()> {
358 let max = self.ctx.node_config.raft.batching.max_batch_size;
359 let mut count = 0;
360 while count < max {
361 match self.cmd_rx.try_recv() {
362 Ok(cmd) => {
363 self.role.push_client_cmd(cmd, &self.ctx);
364 count += 1;
365 }
366 Err(_) => break,
367 }
368 }
369 if count > 0 {
370 trace!("Drained {} client commands", count);
371 }
372 if let Err(e) = self.role.flush_cmd_buffers(&self.ctx, &self.role_tx).await {
375 error!(%self.node_id, ?e, "drain_client_cmds: flush_cmd_buffers error");
376 return Err(e);
377 }
378 Ok(())
379 }
380
381 async fn drain_raft_events(&mut self) -> Result<()> {
383 let max = self.ctx.node_config.raft.batching.max_batch_size;
384 let mut count = 0;
385 while count < max {
386 match self.event_rx.try_recv() {
387 Ok(raft_event) => {
388 if let Err(e) = self
389 .role
390 .handle_raft_event(raft_event, &self.ctx, self.role_tx.clone())
391 .await
392 {
393 if e.is_fatal() {
394 error!(%self.node_id, ?e, "Fatal error in drain_raft_events, shutting down");
395 return Err(e);
396 }
397 warn!(%self.node_id, ?e, "Non-fatal error in drain_raft_events, continuing");
398 }
399 count += 1;
400 }
401 Err(_) => break,
402 }
403 }
404 Ok(())
405 }
406
407 pub async fn handle_role_event(
410 &mut self,
411 role_event: RoleEvent,
412 ) -> Result<()> {
413 match role_event {
416 RoleEvent::BecomeFollower(leader_id_option) => {
417 let _ = self.role.drain_read_buffer();
419
420 debug!("BecomeFollower");
421 self.role = self.role.become_follower()?;
422
423 self.role.state_mut().reset_voted_for()?;
425
426 let current_term = self.role.current_term();
428 self.notify_leader_change(leader_id_option, current_term);
429
430 #[cfg(test)]
431 self.notify_role_transition();
432
433 }
435 RoleEvent::BecomeCandidate => {
436 let _ = self.role.drain_read_buffer();
438
439 debug!("BecomeCandidate");
440 self.role = self.role.become_candidate()?;
441
442 let current_term = self.role.current_term();
444 self.notify_leader_change(None, current_term);
445
446 #[cfg(test)]
447 self.notify_role_transition();
448 }
449 RoleEvent::BecomeLeader => {
450 debug!("BecomeLeader");
451 self.role = self.role.become_leader()?;
452
453 let current_term = self.role.current_term();
455 let _ = self.role.state_mut().update_voted_for(VotedFor {
456 voted_for_id: self.node_id,
457 voted_for_term: current_term,
458 committed: true,
459 })?;
460
461 let peer_ids = self.ctx.membership().get_peers_id_with_condition(|_| true).await;
462
463 self.role.init_peers_next_index_and_match_index(
464 self.ctx.raft_log().last_entry_id(),
465 peer_ids,
466 )?;
467
468 self.role.state_mut().init_cluster_metadata(&self.ctx.membership()).await?;
470
471 if let Err(e) = self.role.initiate_noop_commit(&self.ctx, &self.role_tx).await {
474 warn!(?e, "initiate_noop_commit failed — stepping down");
475 self.role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
476 let error_str = format!("{e:?}");
477 error!("Failed to send: {}", error_str);
478 NetworkError::SingalSendFailed(error_str)
479 })?;
480 }
481
482 #[cfg(test)]
483 self.notify_role_transition();
484 }
485 RoleEvent::BecomeLearner => {
486 let _ = self.role.drain_read_buffer();
488
489 debug!("BecomeLearner");
490 self.role = self.role.become_learner()?;
491
492 let current_term = self.role.current_term();
494 self.notify_leader_change(None, current_term);
495
496 #[cfg(test)]
497 self.notify_role_transition();
498 }
499 RoleEvent::NotifyNewCommitIndex(mut new_commit_data) => {
500 let max_batch = self.ctx.node_config.raft.batching.max_batch_size;
503 let mut count = 1;
504
505 while count < max_batch {
506 match self.role_rx.try_recv() {
507 Ok(RoleEvent::NotifyNewCommitIndex(next)) => {
508 if next.new_commit_index > new_commit_data.new_commit_index {
510 new_commit_data = next;
511 }
512 count += 1;
513 }
514 Ok(other) => {
515 self.role_tx.send(other).map_err(|e| {
516 let error_str = format!("{e:?}");
517 error!("Failed to resend role event: {}", error_str);
518 crate::Error::Fatal(error_str)
519 })?;
520 break;
521 }
522 Err(_) => break,
523 }
524 }
525
526 debug!(
527 "[{}] NotifyNewCommitIndex drained: {} events, max_commit_index={}",
528 self.node_id, count, new_commit_data.new_commit_index
529 );
530
531 self.notify_new_commit(new_commit_data);
532 }
533
534 RoleEvent::LeaderDiscovered(leader_id, term) => {
535 debug!("LeaderDiscovered: leader_id={}, term={}", leader_id, term);
536 self.notify_leader_change(Some(leader_id), term);
539 }
540
541 RoleEvent::ReprocessEvent(raft_event) => {
542 info!("Replay the RaftEvent: {:?}", &raft_event);
543 self.event_tx.send(*raft_event).await.map_err(|e| {
544 let error_str = format!("{e:?}");
545 error!("Failed to send: {}", error_str);
546 NetworkError::SingalSendFailed(error_str)
547 })?;
548 }
549
550 RoleEvent::LogFlushed { durable_index } => {
551 debug!("LogFlushed: durable_index={}", durable_index);
552 self.role.handle_log_flushed(durable_index, &self.ctx, &self.role_tx).await;
553 }
554
555 RoleEvent::AppendResult {
556 follower_id,
557 result,
558 } => {
559 debug!("AppendResult: follower_id={}", follower_id);
560 if let Err(e) = self
561 .role
562 .handle_append_result(follower_id, result, &self.ctx, &self.role_tx)
563 .await
564 {
565 error!("handle_append_result failed: {:?}", e);
566 }
567 }
568
569 RoleEvent::NoopCommitted { term } => {
570 debug!("NoopCommitted: term={}", term);
571 self.notify_leader_change(Some(self.node_id), term);
574 }
575
576 RoleEvent::FatalError { source, error } => {
577 error!(%self.node_id, %source, %error, "Fatal error from SM worker — shutting down");
578 return Err(crate::Error::Fatal(format!("{source}: {error}")));
579 }
580
581 RoleEvent::ApplyCompleted {
582 last_index,
583 results,
584 } => {
585 if let Err(e) = self
587 .role
588 .handle_apply_completed(last_index, results, &self.ctx, &self.role_tx)
589 .await
590 {
591 if e.is_fatal() {
592 error!(%self.node_id, ?e, "Fatal error in ApplyCompleted handler");
593 return Err(e);
594 }
595 warn!(%self.node_id, ?e, "Non-fatal error in ApplyCompleted handler");
596 }
597 }
598
599 RoleEvent::PeerStreamError { peer_id } => {
600 debug!(%peer_id, "PeerStreamError: bidi stream disconnected, resetting next_index");
601 self.role.handle_peer_stream_error(peer_id);
602 }
603
604 RoleEvent::ZombieDetected(node_id) => {
605 debug!(%node_id, "ZombieDetected: forwarding to leader for BatchRemove");
606 if let Err(e) =
607 self.role.handle_zombie_detected(node_id, &self.role_tx, &self.ctx).await
608 {
609 error!(%node_id, ?e, "handle_zombie_detected failed");
610 }
611 }
612
613 RoleEvent::SnapshotPushCompleted { peer_id, success } => {
614 debug!(%peer_id, %success, "SnapshotPushCompleted");
615 if success {
616 let last_entry_id = self.ctx.raft_log().last_entry_id();
621 let _ = self
622 .role
623 .init_peers_next_index_and_match_index(last_entry_id, vec![peer_id]);
624 }
625 let policy = &self.ctx.node_config.retry.install_snapshot;
628 self.role.handle_snapshot_push_completed(peer_id, success, policy, self.node_id);
629 }
630 };
631
632 Ok(())
633 }
634
635 pub fn register_new_commit_listener(
636 &mut self,
637 tx: mpsc::UnboundedSender<NewCommitData>,
638 ) {
639 self.new_commit_listener.push(tx);
640 }
641
642 pub fn notify_new_commit(
643 &self,
644 new_commit_data: NewCommitData,
645 ) {
646 debug!(?new_commit_data, "notify_new_commit",);
647
648 for tx in &self.new_commit_listener {
649 if let Err(e) = tx.send(new_commit_data.clone()) {
650 error!("notify_new_commit failed: {:?}", e);
651 }
652 }
653 }
654
655 #[cfg(test)]
656 pub fn register_role_transition_listener(
657 &mut self,
658 tx: mpsc::UnboundedSender<i32>,
659 ) {
660 self.test_role_transition_listener.push(tx);
661 }
662
663 #[cfg(test)]
664 pub fn notify_role_transition(&self) {
665 let new_role_i32 = self.role.as_i32();
666 for tx in &self.test_role_transition_listener {
667 tx.send(new_role_i32).expect("should succeed");
668 }
669 }
670
671 #[cfg(test)]
672 pub fn register_raft_event_listener(
673 &mut self,
674 tx: mpsc::UnboundedSender<super::TestEvent>,
675 ) {
676 self.test_raft_event_listener.push(tx);
677 }
678
679 #[cfg(test)]
680 pub fn notify_raft_event(
681 &self,
682 event: super::TestEvent,
683 ) {
684 debug!("unit test:: notify new raft event: {:?}", &event);
685
686 for tx in &self.test_raft_event_listener {
687 assert!(tx.send(event.clone()).is_ok(), "should succeed");
688 }
689 }
690
691 #[cfg(test)]
692 pub fn set_role(
693 &mut self,
694 role: RaftRole<T>,
695 ) {
696 self.role = role
697 }
698
699 pub fn event_sender(&self) -> mpsc::Sender<RaftEvent> {
710 self.event_tx.clone()
711 }
712
713 pub fn cmd_sender(&self) -> mpsc::Sender<super::ClientCmd> {
714 self.cmd_tx.clone()
715 }
716
717 #[doc(hidden)]
723 pub fn role_event_sender(&self) -> mpsc::UnboundedSender<RoleEvent> {
724 self.role_tx.clone()
725 }
726}
727
728impl<T> Drop for Raft<T>
729where
730 T: TypeConfig,
731{
732 fn drop(&mut self) {
733 info!("Raft been dropped.");
734
735 if let Err(e) = self
736 .ctx
737 .raft_log()
738 .save_hard_state(&self.role.state().shared_state().hard_state)
739 {
740 error!(?e, "State storage persist node hard state failed.");
741 }
742
743 info!("Graceful shutdown node state ...");
744 }
745}