1use std::sync::Arc;
2
3use d_engine_proto::common::EntryPayload;
4pub use d_engine_proto::common::LeaderInfo;
6use d_engine_proto::server::election::VotedFor;
7use tokio::sync::mpsc;
8use tokio::sync::watch;
9use tokio::time::sleep_until;
10use tracing::debug;
11use tracing::error;
12use tracing::info;
13use tracing::trace;
14use tracing::warn;
15
16use super::NewCommitData;
17use super::RaftContext;
18use super::RaftCoreHandlers;
19use super::RaftEvent;
20use super::RaftRole;
21use super::RaftStorageHandles;
22use super::RoleEvent;
23#[cfg(any(test, feature = "test-utils"))]
24use super::raft_event_to_test_event;
25use crate::Membership;
26use crate::NetworkError;
27use crate::RaftLog;
28use crate::RaftNodeConfig;
29use crate::Result;
30use crate::TypeConfig;
31use crate::alias::MOF;
32use crate::alias::TROF;
33
/// A single Raft node: its identity, its current role state machine, the
/// shared context every role operates on, and the channels that feed the
/// node's event loop (`run`).
pub struct Raft<T>
where
    T: TypeConfig,
{
    /// ID of this node.
    pub node_id: u32,
    /// Current role; replaced wholesale on every role transition in
    /// `handle_role_event`.
    pub role: RaftRole<T>,
    /// Storage, transport, membership, handlers and node config, built by
    /// `build_context`.
    pub ctx: RaftContext<T>,

    /// Sender half of the bounded `RaftEvent` channel; cloned out through
    /// `event_sender`.
    event_tx: mpsc::Sender<RaftEvent>,
    /// Receiver half of the `RaftEvent` channel, drained only by the `run` loop.
    event_rx: mpsc::Receiver<RaftEvent>,

    /// Sender for role events; passed to role handlers and cloned out through
    /// `role_event_sender`.
    role_tx: mpsc::UnboundedSender<RoleEvent>,
    /// Receiver for role events, drained by the `run` loop and dispatched to
    /// `handle_role_event`.
    role_rx: mpsc::UnboundedReceiver<RoleEvent>,

    /// Subscribers that receive every `NewCommitData` via `notify_new_commit`.
    new_commit_listener: Vec<mpsc::UnboundedSender<NewCommitData>>,

    /// Optional watch channel publishing the currently known leader (or
    /// `None`); set by `register_leader_change_listener`.
    leader_change_listener: Option<watch::Sender<Option<LeaderInfo>>>,

    /// The `run` loop returns as soon as this receiver reports a change.
    shutdown_signal: watch::Receiver<()>,

    /// Test-only: receives `role.as_i32()` after each role transition.
    #[cfg(any(test, feature = "test-utils"))]
    test_role_transition_listener: Vec<mpsc::UnboundedSender<i32>>,

    /// Test-only: receives a `TestEvent` for every raft event handled by `run`.
    #[cfg(any(test, feature = "test-utils"))]
    test_raft_event_listener: Vec<mpsc::UnboundedSender<super::TestEvent>>,
}
67
/// Channel endpoints and the shutdown receiver handed to [`Raft::new`].
pub struct SignalParams {
    /// Sender for role events (role transitions, commit notifications, ...).
    pub(crate) role_tx: mpsc::UnboundedSender<RoleEvent>,
    /// Receiver for role events; consumed by the node's event loop.
    pub(crate) role_rx: mpsc::UnboundedReceiver<RoleEvent>,
    /// Sender half of the bounded `RaftEvent` channel.
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
    /// Receiver half of the bounded `RaftEvent` channel; consumed by the node's event loop.
    pub(crate) event_rx: mpsc::Receiver<RaftEvent>,
    /// Receiver whose change stops the node's event loop.
    pub(crate) shutdown_signal: watch::Receiver<()>,
}
75
76impl SignalParams {
77 pub fn new(
82 role_tx: mpsc::UnboundedSender<RoleEvent>,
83 role_rx: mpsc::UnboundedReceiver<RoleEvent>,
84 event_tx: mpsc::Sender<RaftEvent>,
85 event_rx: mpsc::Receiver<RaftEvent>,
86 shutdown_signal: watch::Receiver<()>,
87 ) -> Self {
88 Self {
89 role_tx,
90 role_rx,
91 event_tx,
92 event_rx,
93 shutdown_signal,
94 }
95 }
96}
97
98impl<T> Raft<T>
99where
100 T: TypeConfig,
101{
102 #[allow(clippy::too_many_arguments)]
103 pub fn new(
104 node_id: u32,
105 role: RaftRole<T>,
106 storage: RaftStorageHandles<T>,
107 transport: TROF<T>,
108 handlers: RaftCoreHandlers<T>,
109 membership: Arc<MOF<T>>,
110 signal_params: SignalParams,
111 node_config: Arc<RaftNodeConfig>,
112 ) -> Self {
113 let ctx = Self::build_context(
114 node_id,
115 storage,
116 transport,
117 membership,
118 handlers,
119 node_config.clone(),
120 );
121
122 Raft {
123 node_id,
124 ctx,
125 role,
126
127 event_tx: signal_params.event_tx,
128 event_rx: signal_params.event_rx,
129
130 role_tx: signal_params.role_tx,
131 role_rx: signal_params.role_rx,
132
133 new_commit_listener: Vec::new(),
134
135 shutdown_signal: signal_params.shutdown_signal,
136
137 leader_change_listener: None,
138
139 #[cfg(any(test, feature = "test-utils"))]
140 test_role_transition_listener: Vec::new(),
141
142 #[cfg(any(test, feature = "test-utils"))]
143 test_raft_event_listener: Vec::new(),
144 }
145 }
146
147 pub fn register_leader_change_listener(
156 &mut self,
157 tx: watch::Sender<Option<LeaderInfo>>,
158 ) {
159 self.leader_change_listener = Some(tx);
160 }
161
162 fn notify_leader_change(
167 &self,
168 leader_id: Option<u32>,
169 term: u64,
170 ) {
171 if let Some(tx) = &self.leader_change_listener {
172 tx.send_if_modified(|current| {
173 let new_info = leader_id.map(|id| LeaderInfo {
174 leader_id: id,
175 term,
176 });
177 if *current != new_info {
178 *current = new_info;
179 true
180 } else {
181 false
182 }
183 });
184 }
185 }
186
187 fn build_context(
188 id: u32,
189 storage: RaftStorageHandles<T>,
190 transport: TROF<T>,
191 membership: Arc<MOF<T>>,
192 handlers: RaftCoreHandlers<T>,
193 node_config: Arc<RaftNodeConfig>,
194 ) -> RaftContext<T> {
195 RaftContext {
196 node_id: id,
197 storage,
198 transport: Arc::new(transport),
199 membership,
200 handlers,
201
202 node_config,
203 }
204 }
205
206 pub async fn join_cluster(&self) -> Result<()> {
207 self.role.join_cluster(&self.ctx).await
208 }
209
210 pub async fn run(&mut self) -> Result<()> {
211 if self.ctx.node_config.is_learner() {
213 info!(
214 "Node({}) is learner and needs to fetch initial snapshot.",
215 self.node_id
216 );
217 if let Err(e) = self.role.fetch_initial_snapshot(&self.ctx).await {
218 warn!(
219 "Initial snapshot failed: {:?}.
220 ================================================
221 Leader has not generate snapshot yet. New node
222 will sync with Leader via append entries requests.
223 ================================================
224 ",
225 e
226 );
227 println!(
228 "
229 ================================================
230 Leader has not generate snapshot yet. New node
231 will sync with Leader via append entries requests
232 ================================================
233 "
234 );
235 }
236 }
237
238 info!("Node is running");
239
240 if self.role.is_timer_expired() {
241 self.role.reset_timer();
242 }
243
244 loop {
245 let tick = sleep_until(self.role.next_deadline());
247
248 tokio::select! {
249 biased;
251 _ = self.shutdown_signal.changed() => {
253 info!("[Raft:{}] shutdown signal received.", self.node_id);
254 return Ok(());
255 }
256 _ = tick => {
258
259 trace!("receive tick");
260 let role_tx = &self.role_tx;
261 let event_tx = &self.event_tx;
262
263 if let Err(e) = self.role.tick(role_tx, event_tx, &self.ctx).await {
264 error!("tick failed: {:?}", e);
265 } else {
266 trace!("tick success");
267 }
268 }
269
270 Some(role_event) = self.role_rx.recv() => {
272 debug!(%self.node_id, ?role_event, "receive role event");
273
274 if let Err(e) = self.handle_role_event(role_event).await {
275 error!(%self.node_id, ?e, "handle_role_event error");
276 }
277 }
278
279 Some(raft_event) = self.event_rx.recv() => {
281 trace!(%self.node_id, ?raft_event, "receive raft event");
282
283 #[cfg(any(test, feature = "test-utils"))]
284 let event = raft_event_to_test_event(&raft_event);
285
286 if let Err(e) = self.role.handle_raft_event(raft_event, &self.ctx, self.role_tx.clone()).await {
287 error!(%self.node_id, ?e, "handle_raft_event error");
288 }
289
290 #[cfg(any(test, feature = "test-utils"))]
291 self.notify_raft_event(event);
292 }
293
294 }
295 }
296 }
297
298 pub async fn handle_role_event(
301 &mut self,
302 role_event: RoleEvent,
303 ) -> Result<()> {
304 match role_event {
307 RoleEvent::BecomeFollower(leader_id_option) => {
308 debug!("BecomeFollower");
309 self.role = self.role.become_follower()?;
310
311 self.role.state_mut().reset_voted_for()?;
313
314 let current_term = self.role.current_term();
316 self.notify_leader_change(leader_id_option, current_term);
317
318 #[cfg(any(test, feature = "test-utils"))]
319 self.notify_role_transition();
320
321 }
323 RoleEvent::BecomeCandidate => {
324 debug!("BecomeCandidate");
325 self.role = self.role.become_candidate()?;
326
327 let current_term = self.role.current_term();
329 self.notify_leader_change(None, current_term);
330
331 #[cfg(any(test, feature = "test-utils"))]
332 self.notify_role_transition();
333 }
334 RoleEvent::BecomeLeader => {
335 debug!("BecomeLeader");
336 self.role = self.role.become_leader()?;
337
338 let current_term = self.role.current_term();
340 let _ = self.role.state_mut().update_voted_for(VotedFor {
341 voted_for_id: self.node_id,
342 voted_for_term: current_term,
343 committed: true,
344 })?;
345
346 self.notify_leader_change(Some(self.node_id), current_term);
348
349 let peer_ids = self.ctx.membership().get_peers_id_with_condition(|_| true).await;
350
351 self.role.init_peers_next_index_and_match_index(
352 self.ctx.raft_log().last_entry_id(),
353 peer_ids,
354 )?;
355
356 self.role.state_mut().init_cluster_metadata(&self.ctx.membership()).await?;
358
359 if !self
361 .role
362 .verify_leadership_persistent(
363 vec![EntryPayload::noop()],
364 true,
365 &self.ctx,
366 &self.role_tx,
367 )
368 .await
369 .unwrap_or(false)
370 {
371 warn!(
372 "Verify leadership in new term failed. Now the node is going to step back to Follower..."
373 );
374 self.role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
375 let error_str = format!("{e:?}");
376 error!("Failed to send: {}", error_str);
377 NetworkError::SingalSendFailed(error_str)
378 })?;
379 }
380
381 #[cfg(any(test, feature = "test-utils"))]
382 self.notify_role_transition();
383 }
384 RoleEvent::BecomeLearner => {
385 debug!("BecomeLearner");
386 self.role = self.role.become_learner()?;
387
388 let current_term = self.role.current_term();
390 self.notify_leader_change(None, current_term);
391
392 #[cfg(any(test, feature = "test-utils"))]
393 self.notify_role_transition();
394 }
395 RoleEvent::NotifyNewCommitIndex(new_commit_data) => {
396 debug!(
397 ?new_commit_data,
398 "[{}] RoleEvent::NotifyNewCommitIndex.", self.node_id,
399 );
400 self.notify_new_commit(new_commit_data);
401 }
402
403 RoleEvent::LeaderDiscovered(leader_id, term) => {
404 debug!("LeaderDiscovered: leader_id={}, term={}", leader_id, term);
405 self.notify_leader_change(Some(leader_id), term);
408 }
409
410 RoleEvent::ReprocessEvent(raft_event) => {
411 info!("Replay the RaftEvent: {:?}", &raft_event);
412 self.event_tx.send(*raft_event).await.map_err(|e| {
413 let error_str = format!("{e:?}");
414 error!("Failed to send: {}", error_str);
415 NetworkError::SingalSendFailed(error_str)
416 })?;
417 }
418 };
419
420 Ok(())
421 }
422
423 pub fn register_new_commit_listener(
424 &mut self,
425 tx: mpsc::UnboundedSender<NewCommitData>,
426 ) {
427 self.new_commit_listener.push(tx);
428 }
429
430 pub fn notify_new_commit(
431 &self,
432 new_commit_data: NewCommitData,
433 ) {
434 debug!(?new_commit_data, "notify_new_commit",);
435
436 for tx in &self.new_commit_listener {
437 if let Err(e) = tx.send(new_commit_data.clone()) {
438 error!("notify_new_commit failed: {:?}", e);
439 }
440 }
441 }
442
443 #[cfg(any(test, feature = "test-utils"))]
444 pub fn register_role_transition_listener(
445 &mut self,
446 tx: mpsc::UnboundedSender<i32>,
447 ) {
448 self.test_role_transition_listener.push(tx);
449 }
450
451 #[cfg(any(test, feature = "test-utils"))]
452 pub fn notify_role_transition(&self) {
453 let new_role_i32 = self.role.as_i32();
454 for tx in &self.test_role_transition_listener {
455 tx.send(new_role_i32).expect("should succeed");
456 }
457 }
458
459 #[cfg(any(test, feature = "test-utils"))]
460 pub fn register_raft_event_listener(
461 &mut self,
462 tx: mpsc::UnboundedSender<super::TestEvent>,
463 ) {
464 self.test_raft_event_listener.push(tx);
465 }
466
467 #[cfg(any(test, feature = "test-utils"))]
468 pub fn notify_raft_event(
469 &self,
470 event: super::TestEvent,
471 ) {
472 debug!("unit test:: notify new raft event: {:?}", &event);
473
474 for tx in &self.test_raft_event_listener {
475 assert!(tx.send(event.clone()).is_ok(), "should succeed");
476 }
477 }
478
479 #[cfg(any(test, feature = "test-utils"))]
480 pub fn set_role(
481 &mut self,
482 role: RaftRole<T>,
483 ) {
484 self.role = role
485 }
486
487 pub fn event_sender(&self) -> mpsc::Sender<RaftEvent> {
498 self.event_tx.clone()
499 }
500
501 #[doc(hidden)]
507 pub fn role_event_sender(&self) -> mpsc::UnboundedSender<RoleEvent> {
508 self.role_tx.clone()
509 }
510}
511
512impl<T> Drop for Raft<T>
513where
514 T: TypeConfig,
515{
516 fn drop(&mut self) {
517 info!("Raft been dropped.");
518
519 if let Err(e) = self
520 .ctx
521 .raft_log()
522 .save_hard_state(&self.role.state().shared_state().hard_state)
523 {
524 error!(?e, "State storage persist node hard state failed.");
525 }
526
527 info!("Graceful shutdown node state ...");
528 }
529}