1use std::sync::Arc;
2
3use d_engine_proto::common::EntryPayload;
4pub use d_engine_proto::common::LeaderInfo;
6use d_engine_proto::server::election::VotedFor;
7use tokio::sync::mpsc;
8use tokio::sync::watch;
9use tokio::time::sleep_until;
10use tracing::debug;
11use tracing::error;
12use tracing::info;
13use tracing::trace;
14use tracing::warn;
15
16use super::NewCommitData;
17use super::RaftContext;
18use super::RaftCoreHandlers;
19use super::RaftEvent;
20use super::RaftRole;
21use super::RaftStorageHandles;
22use super::RoleEvent;
23#[cfg(test)]
24use super::raft_event_to_test_event;
25use crate::Membership;
26use crate::NetworkError;
27use crate::RaftLog;
28use crate::RaftNodeConfig;
29use crate::Result;
30use crate::TypeConfig;
31use crate::alias::MOF;
32use crate::alias::TROF;
33
/// A single Raft node: its current role, the context shared with that role,
/// and the channels that feed its event loop (`Raft::run`).
pub struct Raft<T>
where
    T: TypeConfig,
{
    /// Identifier of this node.
    pub node_id: u32,
    /// Current role; replaced on every transition applied by
    /// `handle_role_event` (follower / candidate / leader / learner).
    pub role: RaftRole<T>,
    /// Context handed to the role: storage, transport, membership, handlers
    /// and node configuration (see `build_context`).
    pub ctx: RaftContext<T>,

    /// Bounded channel of Raft events drained by the run loop. The sender is
    /// kept so `event_sender` can hand out clones and `ReprocessEvent` can
    /// re-enqueue events.
    event_tx: mpsc::Sender<RaftEvent>,
    event_rx: mpsc::Receiver<RaftEvent>,

    /// Unbounded channel of client commands, drained in batches of up to
    /// `raft.batching.max_batch_size` by the run loop.
    cmd_tx: mpsc::UnboundedSender<super::ClientCmd>,
    cmd_rx: mpsc::UnboundedReceiver<super::ClientCmd>,

    /// Unbounded channel of role events (transitions, commit notifications,
    /// leader discovery, event replays) handled by `handle_role_event`.
    role_tx: mpsc::UnboundedSender<RoleEvent>,
    role_rx: mpsc::UnboundedReceiver<RoleEvent>,

    /// Subscribers that receive every `NewCommitData` passed to `notify_new_commit`.
    new_commit_listener: Vec<mpsc::UnboundedSender<NewCommitData>>,

    /// Optional watch channel updated by `notify_leader_change` with the
    /// current leader (or `None` while no leader is known).
    leader_change_listener: Option<watch::Sender<Option<LeaderInfo>>>,

    /// The run loop returns as soon as `changed()` resolves on this receiver.
    shutdown_signal: watch::Receiver<()>,

    /// Test-only: receives `role.as_i32()` after each role transition.
    #[cfg(test)]
    test_role_transition_listener: Vec<mpsc::UnboundedSender<i32>>,

    /// Test-only: receives a `TestEvent` for each Raft event handled successfully.
    #[cfg(test)]
    test_raft_event_listener: Vec<mpsc::UnboundedSender<super::TestEvent>>,
}
71
/// Bundle of channel endpoints consumed by [`Raft::new`].
///
/// Each endpoint is moved into the field of the same name on `Raft`.
pub struct SignalParams {
    /// Role-event channel halves (`Raft::role_tx` / `Raft::role_rx`).
    pub(crate) role_tx: mpsc::UnboundedSender<RoleEvent>,
    pub(crate) role_rx: mpsc::UnboundedReceiver<RoleEvent>,
    /// Bounded Raft-event channel halves (`Raft::event_tx` / `Raft::event_rx`).
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
    pub(crate) event_rx: mpsc::Receiver<RaftEvent>,
    /// Client-command channel halves (`Raft::cmd_tx` / `Raft::cmd_rx`).
    pub(crate) cmd_tx: mpsc::UnboundedSender<super::ClientCmd>,
    pub(crate) cmd_rx: mpsc::UnboundedReceiver<super::ClientCmd>,
    /// Receiver whose change stops `Raft::run`.
    pub(crate) shutdown_signal: watch::Receiver<()>,
}
81
82impl SignalParams {
83 pub fn new(
88 role_tx: mpsc::UnboundedSender<RoleEvent>,
89 role_rx: mpsc::UnboundedReceiver<RoleEvent>,
90 event_tx: mpsc::Sender<RaftEvent>,
91 event_rx: mpsc::Receiver<RaftEvent>,
92 cmd_tx: mpsc::UnboundedSender<super::ClientCmd>,
93 cmd_rx: mpsc::UnboundedReceiver<super::ClientCmd>,
94 shutdown_signal: watch::Receiver<()>,
95 ) -> Self {
96 Self {
97 role_tx,
98 role_rx,
99 event_tx,
100 event_rx,
101 cmd_tx,
102 cmd_rx,
103 shutdown_signal,
104 }
105 }
106}
107
108impl<T> Raft<T>
109where
110 T: TypeConfig,
111{
112 #[allow(clippy::too_many_arguments)]
113 pub fn new(
114 node_id: u32,
115 role: RaftRole<T>,
116 storage: RaftStorageHandles<T>,
117 transport: TROF<T>,
118 handlers: RaftCoreHandlers<T>,
119 membership: Arc<MOF<T>>,
120 signal_params: SignalParams,
121 node_config: Arc<RaftNodeConfig>,
122 ) -> Self {
123 let ctx = Self::build_context(
124 node_id,
125 storage,
126 transport,
127 membership,
128 handlers,
129 node_config.clone(),
130 );
131
132 Raft {
133 node_id,
134 ctx,
135 role,
136
137 event_tx: signal_params.event_tx,
138 event_rx: signal_params.event_rx,
139
140 cmd_tx: signal_params.cmd_tx,
141 cmd_rx: signal_params.cmd_rx,
142
143 role_tx: signal_params.role_tx,
144 role_rx: signal_params.role_rx,
145
146 new_commit_listener: Vec::new(),
147
148 shutdown_signal: signal_params.shutdown_signal,
149
150 leader_change_listener: None,
151
152 #[cfg(test)]
153 test_role_transition_listener: Vec::new(),
154
155 #[cfg(test)]
156 test_raft_event_listener: Vec::new(),
157 }
158 }
159
160 pub fn register_leader_change_listener(
169 &mut self,
170 tx: watch::Sender<Option<LeaderInfo>>,
171 ) {
172 self.leader_change_listener = Some(tx);
173 }
174
175 fn notify_leader_change(
180 &self,
181 leader_id: Option<u32>,
182 term: u64,
183 ) {
184 if let Some(tx) = &self.leader_change_listener {
185 tx.send_if_modified(|current| {
186 let new_info = leader_id.map(|id| LeaderInfo {
187 leader_id: id,
188 term,
189 });
190 if *current != new_info {
191 *current = new_info;
192 true
193 } else {
194 false
195 }
196 });
197 }
198 }
199
200 fn build_context(
201 id: u32,
202 storage: RaftStorageHandles<T>,
203 transport: TROF<T>,
204 membership: Arc<MOF<T>>,
205 handlers: RaftCoreHandlers<T>,
206 node_config: Arc<RaftNodeConfig>,
207 ) -> RaftContext<T> {
208 RaftContext {
209 node_id: id,
210 storage,
211 transport: Arc::new(transport),
212 membership,
213 handlers,
214
215 node_config,
216 }
217 }
218
219 pub async fn join_cluster(&self) -> Result<()> {
220 self.role.join_cluster(&self.ctx).await
221 }
222
223 pub async fn run(&mut self) -> Result<()> {
224 if self.ctx.node_config.is_learner() {
226 info!(
227 "Node({}) is learner and needs to fetch initial snapshot.",
228 self.node_id
229 );
230 if let Err(e) = self.role.fetch_initial_snapshot(&self.ctx).await {
231 warn!(
232 "Initial snapshot failed: {:?}.
233 ================================================
234 Leader has not generate snapshot yet. New node
235 will sync with Leader via append entries requests.
236 ================================================
237 ",
238 e
239 );
240 println!(
241 "
242 ================================================
243 Leader has not generate snapshot yet. New node
244 will sync with Leader via append entries requests
245 ================================================
246 "
247 );
248 }
249 }
250
251 info!("Node is running");
252
253 if self.role.is_timer_expired() {
254 self.role.reset_timer();
255 }
256
257 loop {
258 let tick = sleep_until(self.role.next_deadline());
260
261 tokio::select! {
262 biased;
264 _ = self.shutdown_signal.changed() => {
266 info!("[Raft:{}] shutdown signal received.", self.node_id);
267 return Ok(());
268 }
269 _ = tick => {
271
272 trace!("receive tick");
273 let role_tx = &self.role_tx;
274 let event_tx = &self.event_tx;
275
276 if let Err(e) = self.role.tick(role_tx, event_tx, &self.ctx).await {
277 error!("tick failed: {:?}", e);
278 } else {
279 trace!("tick success");
280 }
281 }
282
283 Some(role_event) = self.role_rx.recv() => {
285 debug!(%self.node_id, ?role_event, "receive role event");
286
287 if let Err(e) = self.handle_role_event(role_event).await {
288 error!(%self.node_id, ?e, "handle_role_event error");
289 }
290 }
291
292 Some(first_cmd) = self.cmd_rx.recv() => {
294 trace!(%self.node_id, "receive first client command");
295
296 self.role.push_client_cmd(first_cmd, &self.ctx);
298
299 let max_batch = self.ctx.node_config.raft.batching.max_batch_size;
301 let mut count = 1;
302
303 while count < max_batch {
304 match self.cmd_rx.try_recv() {
305 Ok(cmd) => {
306 self.role.push_client_cmd(cmd, &self.ctx);
307 count += 1;
308 }
309 Err(_) => break,
310 }
311 }
312
313 trace!("Drained {} client commands", count);
314
315 if let Err(e) = self.role.flush_cmd_buffers(&self.ctx, &self.role_tx).await {
317 error!(%self.node_id, ?e, "flush_cmd_buffers error");
318 return Err(e);
319 }
320 }
321
322 Some(raft_event) = self.event_rx.recv() => {
324 trace!(%self.node_id, ?raft_event, "receive raft event");
325
326 #[cfg(test)]
327 let event = raft_event_to_test_event(&raft_event);
328
329 if let Err(e) = self.role.handle_raft_event(raft_event, &self.ctx, self.role_tx.clone()).await {
330 error!(%self.node_id, ?e, "handle_raft_event error");
331 return Err(e);
333 }
334
335 #[cfg(test)]
336 self.notify_raft_event(event);
337 }
338
339 }
340 }
341 }
342
343 pub async fn handle_role_event(
346 &mut self,
347 role_event: RoleEvent,
348 ) -> Result<()> {
349 match role_event {
352 RoleEvent::BecomeFollower(leader_id_option) => {
353 let _ = self.role.drain_read_buffer();
355
356 debug!("BecomeFollower");
357 self.role = self.role.become_follower()?;
358
359 self.role.state_mut().reset_voted_for()?;
361
362 let current_term = self.role.current_term();
364 self.notify_leader_change(leader_id_option, current_term);
365
366 #[cfg(test)]
367 self.notify_role_transition();
368
369 }
371 RoleEvent::BecomeCandidate => {
372 let _ = self.role.drain_read_buffer();
374
375 debug!("BecomeCandidate");
376 self.role = self.role.become_candidate()?;
377
378 let current_term = self.role.current_term();
380 self.notify_leader_change(None, current_term);
381
382 #[cfg(test)]
383 self.notify_role_transition();
384 }
385 RoleEvent::BecomeLeader => {
386 debug!("BecomeLeader");
387 self.role = self.role.become_leader()?;
388
389 let current_term = self.role.current_term();
391 let _ = self.role.state_mut().update_voted_for(VotedFor {
392 voted_for_id: self.node_id,
393 voted_for_term: current_term,
394 committed: true,
395 })?;
396
397 let peer_ids = self.ctx.membership().get_peers_id_with_condition(|_| true).await;
398
399 self.role.init_peers_next_index_and_match_index(
400 self.ctx.raft_log().last_entry_id(),
401 peer_ids,
402 )?;
403
404 self.role.state_mut().init_cluster_metadata(&self.ctx.membership()).await?;
406
407 if !self
409 .role
410 .verify_leadership_persistent(
411 vec![EntryPayload::noop()],
412 &self.ctx,
413 &self.role_tx,
414 )
415 .await
416 .unwrap_or(false)
417 {
418 warn!(
419 "Verify leadership in new term failed. Now the node is going to step back to Follower..."
420 );
421 self.role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
422 let error_str = format!("{e:?}");
423 error!("Failed to send: {}", error_str);
424 NetworkError::SingalSendFailed(error_str)
425 })?;
426 } else {
427 if let Err(e) = self.role.on_noop_committed(&self.ctx) {
429 warn!(
430 ?e,
431 "Failed to track no-op commit index after leadership verification"
432 );
433 } else {
434 self.notify_leader_change(Some(self.node_id), current_term);
436 }
437 }
438
439 #[cfg(test)]
440 self.notify_role_transition();
441 }
442 RoleEvent::BecomeLearner => {
443 let _ = self.role.drain_read_buffer();
445
446 debug!("BecomeLearner");
447 self.role = self.role.become_learner()?;
448
449 let current_term = self.role.current_term();
451 self.notify_leader_change(None, current_term);
452
453 #[cfg(test)]
454 self.notify_role_transition();
455 }
456 RoleEvent::NotifyNewCommitIndex(mut new_commit_data) => {
457 let max_batch = self.ctx.node_config.raft.batching.max_batch_size;
460 let mut count = 1;
461
462 while count < max_batch {
463 match self.role_rx.try_recv() {
464 Ok(RoleEvent::NotifyNewCommitIndex(next)) => {
465 if next.new_commit_index > new_commit_data.new_commit_index {
467 new_commit_data = next;
468 }
469 count += 1;
470 }
471 Ok(other) => {
472 self.role_tx.send(other).map_err(|e| {
473 let error_str = format!("{e:?}");
474 error!("Failed to resend role event: {}", error_str);
475 crate::Error::Fatal(error_str)
476 })?;
477 break;
478 }
479 Err(_) => break,
480 }
481 }
482
483 debug!(
484 "[{}] NotifyNewCommitIndex drained: {} events, max_commit_index={}",
485 self.node_id, count, new_commit_data.new_commit_index
486 );
487
488 self.notify_new_commit(new_commit_data);
489 }
490
491 RoleEvent::LeaderDiscovered(leader_id, term) => {
492 debug!("LeaderDiscovered: leader_id={}, term={}", leader_id, term);
493 self.notify_leader_change(Some(leader_id), term);
496 }
497
498 RoleEvent::ReprocessEvent(raft_event) => {
499 info!("Replay the RaftEvent: {:?}", &raft_event);
500 self.event_tx.send(*raft_event).await.map_err(|e| {
501 let error_str = format!("{e:?}");
502 error!("Failed to send: {}", error_str);
503 NetworkError::SingalSendFailed(error_str)
504 })?;
505 }
506 };
507
508 Ok(())
509 }
510
511 pub fn register_new_commit_listener(
512 &mut self,
513 tx: mpsc::UnboundedSender<NewCommitData>,
514 ) {
515 self.new_commit_listener.push(tx);
516 }
517
518 pub fn notify_new_commit(
519 &self,
520 new_commit_data: NewCommitData,
521 ) {
522 debug!(?new_commit_data, "notify_new_commit",);
523
524 for tx in &self.new_commit_listener {
525 if let Err(e) = tx.send(new_commit_data.clone()) {
526 error!("notify_new_commit failed: {:?}", e);
527 }
528 }
529 }
530
531 #[cfg(test)]
532 pub fn register_role_transition_listener(
533 &mut self,
534 tx: mpsc::UnboundedSender<i32>,
535 ) {
536 self.test_role_transition_listener.push(tx);
537 }
538
539 #[cfg(test)]
540 pub fn notify_role_transition(&self) {
541 let new_role_i32 = self.role.as_i32();
542 for tx in &self.test_role_transition_listener {
543 tx.send(new_role_i32).expect("should succeed");
544 }
545 }
546
547 #[cfg(test)]
548 pub fn register_raft_event_listener(
549 &mut self,
550 tx: mpsc::UnboundedSender<super::TestEvent>,
551 ) {
552 self.test_raft_event_listener.push(tx);
553 }
554
555 #[cfg(test)]
556 pub fn notify_raft_event(
557 &self,
558 event: super::TestEvent,
559 ) {
560 debug!("unit test:: notify new raft event: {:?}", &event);
561
562 for tx in &self.test_raft_event_listener {
563 assert!(tx.send(event.clone()).is_ok(), "should succeed");
564 }
565 }
566
567 #[cfg(test)]
568 pub fn set_role(
569 &mut self,
570 role: RaftRole<T>,
571 ) {
572 self.role = role
573 }
574
575 pub fn event_sender(&self) -> mpsc::Sender<RaftEvent> {
586 self.event_tx.clone()
587 }
588
589 pub fn cmd_sender(&self) -> mpsc::UnboundedSender<super::ClientCmd> {
590 self.cmd_tx.clone()
591 }
592
593 #[doc(hidden)]
599 pub fn role_event_sender(&self) -> mpsc::UnboundedSender<RoleEvent> {
600 self.role_tx.clone()
601 }
602}
603
604impl<T> Drop for Raft<T>
605where
606 T: TypeConfig,
607{
608 fn drop(&mut self) {
609 info!("Raft been dropped.");
610
611 if let Err(e) = self
612 .ctx
613 .raft_log()
614 .save_hard_state(&self.role.state().shared_state().hard_state)
615 {
616 error!(?e, "State storage persist node hard state failed.");
617 }
618
619 info!("Graceful shutdown node state ...");
620 }
621}