1use std::sync::Arc;
2
3use d_engine_proto::common::EntryPayload;
4pub use d_engine_proto::common::LeaderInfo;
6use d_engine_proto::server::election::VotedFor;
7use tokio::sync::mpsc;
8use tokio::sync::watch;
9use tokio::time::sleep_until;
10use tracing::debug;
11use tracing::error;
12use tracing::info;
13use tracing::trace;
14use tracing::warn;
15
16use super::NewCommitData;
17use super::RaftContext;
18use super::RaftCoreHandlers;
19use super::RaftEvent;
20use super::RaftRole;
21use super::RaftStorageHandles;
22use super::RoleEvent;
23#[cfg(test)]
24use super::raft_event_to_test_event;
25use crate::Membership;
26use crate::NetworkError;
27use crate::RaftLog;
28use crate::RaftNodeConfig;
29use crate::Result;
30use crate::TypeConfig;
31use crate::alias::MOF;
32use crate::alias::TROF;
33
/// A single Raft node: the current role, the shared context handed to that role, and the
/// channel endpoints consumed by the event loop in `run`.
pub struct Raft<T>
where
    T: TypeConfig,
{
    /// Identifier of this node.
    pub node_id: u32,
    /// Current role of the node; replaced by `handle_role_event` on every role transition.
    pub role: RaftRole<T>,
    /// Shared context (storage, transport, membership, handlers, node config) passed to the
    /// role's handlers.
    pub ctx: RaftContext<T>,

    /// Sender half of the raft event channel. Cloned out through `event_sender` and used to
    /// re-enqueue events carried by `RoleEvent::ReprocessEvent`.
    event_tx: mpsc::Sender<RaftEvent>,
    /// Receiver half of the raft event channel, polled by `run`.
    event_rx: mpsc::Receiver<RaftEvent>,

    /// Sender half of the role event channel. Handed to the role's handlers and cloned out
    /// through `role_event_sender`.
    role_tx: mpsc::UnboundedSender<RoleEvent>,
    /// Receiver half of the role event channel, polled by `run`.
    role_rx: mpsc::UnboundedReceiver<RoleEvent>,

    /// Subscribers that receive a copy of every `NewCommitData` given to `notify_new_commit`.
    new_commit_listener: Vec<mpsc::UnboundedSender<NewCommitData>>,

    /// Optional watch channel updated by `notify_leader_change` whenever the known leader
    /// information changes. `None` until `register_leader_change_listener` is called.
    leader_change_listener: Option<watch::Sender<Option<LeaderInfo>>>,

    /// Shutdown notification; `run` returns as soon as this receiver reports a change.
    shutdown_signal: watch::Receiver<()>,

    /// Test-only subscribers that receive the role (as `i32`) after each role transition.
    #[cfg(test)]
    test_role_transition_listener: Vec<mpsc::UnboundedSender<i32>>,

    /// Test-only subscribers that receive a `TestEvent` for each raft event handled by `run`.
    #[cfg(test)]
    test_raft_event_listener: Vec<mpsc::UnboundedSender<super::TestEvent>>,
}
67
/// Bundle of channel endpoints and the shutdown receiver that `Raft::new` moves into the
/// node it constructs.
pub struct SignalParams {
    /// Sender half of the role event channel.
    pub(crate) role_tx: mpsc::UnboundedSender<RoleEvent>,
    /// Receiver half of the role event channel.
    pub(crate) role_rx: mpsc::UnboundedReceiver<RoleEvent>,
    /// Sender half of the raft event channel.
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
    /// Receiver half of the raft event channel.
    pub(crate) event_rx: mpsc::Receiver<RaftEvent>,
    /// Receiver used to observe the shutdown notification.
    pub(crate) shutdown_signal: watch::Receiver<()>,
}
75
76impl SignalParams {
77 pub fn new(
82 role_tx: mpsc::UnboundedSender<RoleEvent>,
83 role_rx: mpsc::UnboundedReceiver<RoleEvent>,
84 event_tx: mpsc::Sender<RaftEvent>,
85 event_rx: mpsc::Receiver<RaftEvent>,
86 shutdown_signal: watch::Receiver<()>,
87 ) -> Self {
88 Self {
89 role_tx,
90 role_rx,
91 event_tx,
92 event_rx,
93 shutdown_signal,
94 }
95 }
96}
97
98impl<T> Raft<T>
99where
100 T: TypeConfig,
101{
102 #[allow(clippy::too_many_arguments)]
103 pub fn new(
104 node_id: u32,
105 role: RaftRole<T>,
106 storage: RaftStorageHandles<T>,
107 transport: TROF<T>,
108 handlers: RaftCoreHandlers<T>,
109 membership: Arc<MOF<T>>,
110 signal_params: SignalParams,
111 node_config: Arc<RaftNodeConfig>,
112 ) -> Self {
113 let ctx = Self::build_context(
114 node_id,
115 storage,
116 transport,
117 membership,
118 handlers,
119 node_config.clone(),
120 );
121
122 Raft {
123 node_id,
124 ctx,
125 role,
126
127 event_tx: signal_params.event_tx,
128 event_rx: signal_params.event_rx,
129
130 role_tx: signal_params.role_tx,
131 role_rx: signal_params.role_rx,
132
133 new_commit_listener: Vec::new(),
134
135 shutdown_signal: signal_params.shutdown_signal,
136
137 leader_change_listener: None,
138
139 #[cfg(test)]
140 test_role_transition_listener: Vec::new(),
141
142 #[cfg(test)]
143 test_raft_event_listener: Vec::new(),
144 }
145 }
146
147 pub fn register_leader_change_listener(
156 &mut self,
157 tx: watch::Sender<Option<LeaderInfo>>,
158 ) {
159 self.leader_change_listener = Some(tx);
160 }
161
162 fn notify_leader_change(
167 &self,
168 leader_id: Option<u32>,
169 term: u64,
170 ) {
171 if let Some(tx) = &self.leader_change_listener {
172 tx.send_if_modified(|current| {
173 let new_info = leader_id.map(|id| LeaderInfo {
174 leader_id: id,
175 term,
176 });
177 if *current != new_info {
178 *current = new_info;
179 true
180 } else {
181 false
182 }
183 });
184 }
185 }
186
187 fn build_context(
188 id: u32,
189 storage: RaftStorageHandles<T>,
190 transport: TROF<T>,
191 membership: Arc<MOF<T>>,
192 handlers: RaftCoreHandlers<T>,
193 node_config: Arc<RaftNodeConfig>,
194 ) -> RaftContext<T> {
195 RaftContext {
196 node_id: id,
197 storage,
198 transport: Arc::new(transport),
199 membership,
200 handlers,
201
202 node_config,
203 }
204 }
205
206 pub async fn join_cluster(&self) -> Result<()> {
207 self.role.join_cluster(&self.ctx).await
208 }
209
210 pub async fn run(&mut self) -> Result<()> {
211 if self.ctx.node_config.is_learner() {
213 info!(
214 "Node({}) is learner and needs to fetch initial snapshot.",
215 self.node_id
216 );
217 if let Err(e) = self.role.fetch_initial_snapshot(&self.ctx).await {
218 warn!(
219 "Initial snapshot failed: {:?}.
220 ================================================
221 Leader has not generate snapshot yet. New node
222 will sync with Leader via append entries requests.
223 ================================================
224 ",
225 e
226 );
227 println!(
228 "
229 ================================================
230 Leader has not generate snapshot yet. New node
231 will sync with Leader via append entries requests
232 ================================================
233 "
234 );
235 }
236 }
237
238 info!("Node is running");
239
240 if self.role.is_timer_expired() {
241 self.role.reset_timer();
242 }
243
244 loop {
245 let tick = sleep_until(self.role.next_deadline());
247
248 tokio::select! {
249 biased;
251 _ = self.shutdown_signal.changed() => {
253 info!("[Raft:{}] shutdown signal received.", self.node_id);
254 return Ok(());
255 }
256 _ = tick => {
258
259 trace!("receive tick");
260 let role_tx = &self.role_tx;
261 let event_tx = &self.event_tx;
262
263 if let Err(e) = self.role.tick(role_tx, event_tx, &self.ctx).await {
264 error!("tick failed: {:?}", e);
265 } else {
266 trace!("tick success");
267 }
268 }
269
270 Some(role_event) = self.role_rx.recv() => {
272 debug!(%self.node_id, ?role_event, "receive role event");
273
274 if let Err(e) = self.handle_role_event(role_event).await {
275 error!(%self.node_id, ?e, "handle_role_event error");
276 }
277 }
278
279 Some(raft_event) = self.event_rx.recv() => {
281 trace!(%self.node_id, ?raft_event, "receive raft event");
282
283 #[cfg(test)]
284 let event = raft_event_to_test_event(&raft_event);
285
286 if let Err(e) = self.role.handle_raft_event(raft_event, &self.ctx, self.role_tx.clone()).await {
287 error!(%self.node_id, ?e, "handle_raft_event error");
288 }
289
290 #[cfg(test)]
291 self.notify_raft_event(event);
292 }
293
294 }
295 }
296 }
297
298 pub async fn handle_role_event(
301 &mut self,
302 role_event: RoleEvent,
303 ) -> Result<()> {
304 match role_event {
307 RoleEvent::BecomeFollower(leader_id_option) => {
308 if let Err(e) = self.role.drain_read_buffer() {
310 tracing::warn!("Failed to drain read buffer during role transition: {e:?}");
311 }
312
313 debug!("BecomeFollower");
314 self.role = self.role.become_follower()?;
315
316 self.role.state_mut().reset_voted_for()?;
318
319 let current_term = self.role.current_term();
321 self.notify_leader_change(leader_id_option, current_term);
322
323 #[cfg(test)]
324 self.notify_role_transition();
325
326 }
328 RoleEvent::BecomeCandidate => {
329 if let Err(e) = self.role.drain_read_buffer() {
331 tracing::warn!("Failed to drain read buffer during role transition: {e:?}");
332 }
333
334 debug!("BecomeCandidate");
335 self.role = self.role.become_candidate()?;
336
337 let current_term = self.role.current_term();
339 self.notify_leader_change(None, current_term);
340
341 #[cfg(test)]
342 self.notify_role_transition();
343 }
344 RoleEvent::BecomeLeader => {
345 debug!("BecomeLeader");
346 self.role = self.role.become_leader()?;
347
348 let current_term = self.role.current_term();
350 let _ = self.role.state_mut().update_voted_for(VotedFor {
351 voted_for_id: self.node_id,
352 voted_for_term: current_term,
353 committed: true,
354 })?;
355
356 self.notify_leader_change(Some(self.node_id), current_term);
358
359 let peer_ids = self.ctx.membership().get_peers_id_with_condition(|_| true).await;
360
361 self.role.init_peers_next_index_and_match_index(
362 self.ctx.raft_log().last_entry_id(),
363 peer_ids,
364 )?;
365
366 self.role.state_mut().init_cluster_metadata(&self.ctx.membership()).await?;
368
369 if !self
371 .role
372 .verify_leadership_persistent(
373 vec![EntryPayload::noop()],
374 true,
375 &self.ctx,
376 &self.role_tx,
377 )
378 .await
379 .unwrap_or(false)
380 {
381 warn!(
382 "Verify leadership in new term failed. Now the node is going to step back to Follower..."
383 );
384 self.role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
385 let error_str = format!("{e:?}");
386 error!("Failed to send: {}", error_str);
387 NetworkError::SingalSendFailed(error_str)
388 })?;
389 } else {
390 if let Err(e) = self.role.on_noop_committed(&self.ctx) {
392 warn!(
393 ?e,
394 "Failed to track no-op commit index after leadership verification"
395 );
396 }
397 }
398
399 #[cfg(test)]
400 self.notify_role_transition();
401 }
402 RoleEvent::BecomeLearner => {
403 if let Err(e) = self.role.drain_read_buffer() {
405 tracing::warn!("Failed to drain read buffer during role transition: {e:?}");
406 }
407
408 debug!("BecomeLearner");
409 self.role = self.role.become_learner()?;
410
411 let current_term = self.role.current_term();
413 self.notify_leader_change(None, current_term);
414
415 #[cfg(test)]
416 self.notify_role_transition();
417 }
418 RoleEvent::NotifyNewCommitIndex(new_commit_data) => {
419 debug!(
420 ?new_commit_data,
421 "[{}] RoleEvent::NotifyNewCommitIndex.", self.node_id,
422 );
423 self.notify_new_commit(new_commit_data);
424 }
425
426 RoleEvent::LeaderDiscovered(leader_id, term) => {
427 debug!("LeaderDiscovered: leader_id={}, term={}", leader_id, term);
428 self.notify_leader_change(Some(leader_id), term);
431 }
432
433 RoleEvent::ReprocessEvent(raft_event) => {
434 info!("Replay the RaftEvent: {:?}", &raft_event);
435 self.event_tx.send(*raft_event).await.map_err(|e| {
436 let error_str = format!("{e:?}");
437 error!("Failed to send: {}", error_str);
438 NetworkError::SingalSendFailed(error_str)
439 })?;
440 }
441 };
442
443 Ok(())
444 }
445
446 pub fn register_new_commit_listener(
447 &mut self,
448 tx: mpsc::UnboundedSender<NewCommitData>,
449 ) {
450 self.new_commit_listener.push(tx);
451 }
452
453 pub fn notify_new_commit(
454 &self,
455 new_commit_data: NewCommitData,
456 ) {
457 debug!(?new_commit_data, "notify_new_commit",);
458
459 for tx in &self.new_commit_listener {
460 if let Err(e) = tx.send(new_commit_data.clone()) {
461 error!("notify_new_commit failed: {:?}", e);
462 }
463 }
464 }
465
466 #[cfg(test)]
467 pub fn register_role_transition_listener(
468 &mut self,
469 tx: mpsc::UnboundedSender<i32>,
470 ) {
471 self.test_role_transition_listener.push(tx);
472 }
473
474 #[cfg(test)]
475 pub fn notify_role_transition(&self) {
476 let new_role_i32 = self.role.as_i32();
477 for tx in &self.test_role_transition_listener {
478 tx.send(new_role_i32).expect("should succeed");
479 }
480 }
481
482 #[cfg(test)]
483 pub fn register_raft_event_listener(
484 &mut self,
485 tx: mpsc::UnboundedSender<super::TestEvent>,
486 ) {
487 self.test_raft_event_listener.push(tx);
488 }
489
490 #[cfg(test)]
491 pub fn notify_raft_event(
492 &self,
493 event: super::TestEvent,
494 ) {
495 debug!("unit test:: notify new raft event: {:?}", &event);
496
497 for tx in &self.test_raft_event_listener {
498 assert!(tx.send(event.clone()).is_ok(), "should succeed");
499 }
500 }
501
502 #[cfg(test)]
503 pub fn set_role(
504 &mut self,
505 role: RaftRole<T>,
506 ) {
507 self.role = role
508 }
509
510 pub fn event_sender(&self) -> mpsc::Sender<RaftEvent> {
521 self.event_tx.clone()
522 }
523
524 #[doc(hidden)]
530 pub fn role_event_sender(&self) -> mpsc::UnboundedSender<RoleEvent> {
531 self.role_tx.clone()
532 }
533}
534
535impl<T> Drop for Raft<T>
536where
537 T: TypeConfig,
538{
539 fn drop(&mut self) {
540 info!("Raft been dropped.");
541
542 if let Err(e) = self
543 .ctx
544 .raft_log()
545 .save_hard_state(&self.role.state().shared_state().hard_state)
546 {
547 error!(?e, "State storage persist node hard state failed.");
548 }
549
550 info!("Graceful shutdown node state ...");
551 }
552}