d_engine_core/
raft.rs

1use std::sync::Arc;
2
3use d_engine_proto::common::EntryPayload;
4// Re-export LeaderInfo from proto (application layer use)
5pub use d_engine_proto::common::LeaderInfo;
6use d_engine_proto::server::election::VotedFor;
7use tokio::sync::mpsc;
8use tokio::sync::watch;
9use tokio::time::sleep_until;
10use tracing::debug;
11use tracing::error;
12use tracing::info;
13use tracing::trace;
14use tracing::warn;
15
16use super::NewCommitData;
17use super::RaftContext;
18use super::RaftCoreHandlers;
19use super::RaftEvent;
20use super::RaftRole;
21use super::RaftStorageHandles;
22use super::RoleEvent;
23#[cfg(any(test, feature = "test-utils"))]
24use super::raft_event_to_test_event;
25use crate::Membership;
26use crate::NetworkError;
27use crate::RaftLog;
28use crate::RaftNodeConfig;
29use crate::Result;
30use crate::TypeConfig;
31use crate::alias::MOF;
32use crate::alias::TROF;
33
/// Core Raft node: owns the current role state machine, the shared
/// execution context, and every channel the main event loop listens on.
///
/// Driven by [`Raft::run`], which multiplexes timer ticks, role events
/// and raft events until the shutdown signal fires.
pub struct Raft<T>
where
    T: TypeConfig,
{
    /// Identifier of this node within the cluster.
    pub node_id: u32,
    /// Current role (follower/candidate/leader/learner) together with its state.
    pub role: RaftRole<T>,
    /// Shared handles (storage, transport, membership, handlers, config)
    /// passed to the role handlers.
    pub ctx: RaftContext<T>,

    // Network & Storage events (bounded channel, consumed in `run`)
    event_tx: mpsc::Sender<RaftEvent>,
    event_rx: mpsc::Receiver<RaftEvent>,

    // Timer / role-transition events (unbounded channel, consumed in `run`)
    role_tx: mpsc::UnboundedSender<RoleEvent>,
    role_rx: mpsc::UnboundedReceiver<RoleEvent>,

    // For business logic to apply logs into state machine
    new_commit_listener: Vec<mpsc::UnboundedSender<NewCommitData>>,

    // Leader change notification
    // Uses watch::Sender for efficient multi-subscriber pattern
    leader_change_listener: Option<watch::Sender<Option<LeaderInfo>>>,

    // Shutdown signal
    shutdown_signal: watch::Receiver<()>,

    // For unit test: receives the new role (as i32) on each transition
    #[cfg(any(test, feature = "test-utils"))]
    test_role_transition_listener: Vec<mpsc::UnboundedSender<i32>>,

    // For unit test: receives a TestEvent snapshot of each processed raft event
    #[cfg(any(test, feature = "test-utils"))]
    test_raft_event_listener: Vec<mpsc::UnboundedSender<super::TestEvent>>,
}
67
/// Bundle of communication channels a [`Raft`] instance is built from:
/// both ends of the role-event and raft-event channels plus the shutdown
/// watch receiver.
///
/// Fields are crate-private; external callers must construct it through
/// [`SignalParams::new`].
pub struct SignalParams {
    pub(crate) role_tx: mpsc::UnboundedSender<RoleEvent>,
    pub(crate) role_rx: mpsc::UnboundedReceiver<RoleEvent>,
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
    pub(crate) event_rx: mpsc::Receiver<RaftEvent>,
    pub(crate) shutdown_signal: watch::Receiver<()>,
}
75
76impl SignalParams {
77    /// Creates a new SignalParams with the provided channels.
78    ///
79    /// This is the only way to construct SignalParams from outside d-engine-core,
80    /// ensuring controlled initialization of the internal communication channels.
81    pub fn new(
82        role_tx: mpsc::UnboundedSender<RoleEvent>,
83        role_rx: mpsc::UnboundedReceiver<RoleEvent>,
84        event_tx: mpsc::Sender<RaftEvent>,
85        event_rx: mpsc::Receiver<RaftEvent>,
86        shutdown_signal: watch::Receiver<()>,
87    ) -> Self {
88        Self {
89            role_tx,
90            role_rx,
91            event_tx,
92            event_rx,
93            shutdown_signal,
94        }
95    }
96}
97
98impl<T> Raft<T>
99where
100    T: TypeConfig,
101{
102    #[allow(clippy::too_many_arguments)]
103    pub fn new(
104        node_id: u32,
105        role: RaftRole<T>,
106        storage: RaftStorageHandles<T>,
107        transport: TROF<T>,
108        handlers: RaftCoreHandlers<T>,
109        membership: Arc<MOF<T>>,
110        signal_params: SignalParams,
111        node_config: Arc<RaftNodeConfig>,
112    ) -> Self {
113        let ctx = Self::build_context(
114            node_id,
115            storage,
116            transport,
117            membership,
118            handlers,
119            node_config.clone(),
120        );
121
122        Raft {
123            node_id,
124            ctx,
125            role,
126
127            event_tx: signal_params.event_tx,
128            event_rx: signal_params.event_rx,
129
130            role_tx: signal_params.role_tx,
131            role_rx: signal_params.role_rx,
132
133            new_commit_listener: Vec::new(),
134
135            shutdown_signal: signal_params.shutdown_signal,
136
137            leader_change_listener: None,
138
139            #[cfg(any(test, feature = "test-utils"))]
140            test_role_transition_listener: Vec::new(),
141
142            #[cfg(any(test, feature = "test-utils"))]
143            test_raft_event_listener: Vec::new(),
144        }
145    }
146
147    /// Register a listener for leader election events.
148    ///
149    /// The listener will receive LeaderInfo updates:
150    /// - Some(LeaderInfo) when a leader is elected
151    /// - None when no leader exists (during election)
152    ///
153    /// # Performance
154    /// Event-driven notification (no polling), multi-subscriber support via watch channel
155    pub fn register_leader_change_listener(
156        &mut self,
157        tx: watch::Sender<Option<LeaderInfo>>,
158    ) {
159        self.leader_change_listener = Some(tx);
160    }
161
162    /// Notify all leader change listeners.
163    ///
164    /// Called internally when role transitions occur.
165    /// Uses send_if_modified to avoid redundant notifications.
166    fn notify_leader_change(
167        &self,
168        leader_id: Option<u32>,
169        term: u64,
170    ) {
171        if let Some(tx) = &self.leader_change_listener {
172            tx.send_if_modified(|current| {
173                let new_info = leader_id.map(|id| LeaderInfo {
174                    leader_id: id,
175                    term,
176                });
177                if *current != new_info {
178                    *current = new_info;
179                    true
180                } else {
181                    false
182                }
183            });
184        }
185    }
186
187    fn build_context(
188        id: u32,
189        storage: RaftStorageHandles<T>,
190        transport: TROF<T>,
191        membership: Arc<MOF<T>>,
192        handlers: RaftCoreHandlers<T>,
193        node_config: Arc<RaftNodeConfig>,
194    ) -> RaftContext<T> {
195        RaftContext {
196            node_id: id,
197            storage,
198            transport: Arc::new(transport),
199            membership,
200            handlers,
201
202            node_config,
203        }
204    }
205
    /// Ask the current role to join the cluster, delegating to the
    /// role-specific implementation with this node's context.
    pub async fn join_cluster(&self) -> Result<()> {
        self.role.join_cluster(&self.ctx).await
    }
209
    /// Main event loop of the node; returns only when the shutdown signal fires.
    ///
    /// A learner first tries to fetch an initial snapshot. Failure is
    /// tolerated: the node can still catch up via the leader's
    /// append-entries replication.
    ///
    /// The loop uses a `biased` select so branch priority is deterministic:
    /// shutdown (P0) > timer tick (P1) > role events (P2) > raft events (P3).
    pub async fn run(&mut self) -> Result<()> {
        // Add snapshot handler before main loop
        if self.ctx.node_config.is_learner() {
            info!(
                "Node({}) is learner and needs to fetch initial snapshot.",
                self.node_id
            );
            if let Err(e) = self.role.fetch_initial_snapshot(&self.ctx).await {
                // Best-effort: a missing snapshot is not fatal (see banner below).
                warn!(
                    "Initial snapshot failed: {:?}.
            ================================================
            Leader has not generate snapshot yet. New node
            will sync with Leader via append entries requests.
            ================================================
            ",
                    e
                );
                println!(
                    "
            ================================================
            Leader has not generate snapshot yet. New node
            will sync with Leader via append entries requests
            ================================================
                    "
                );
            }
        }

        info!("Node is running");

        // Reset the timer if it has already expired before entering the loop.
        if self.role.is_timer_expired() {
            self.role.reset_timer();
        }

        loop {
            // Note: next_deadline will be reset in each role's tick function
            let tick = sleep_until(self.role.next_deadline());

            tokio::select! {
                // Use biased to ensure branch order
                biased;
                // P0: shutdown received;
                _ = self.shutdown_signal.changed() => {
                    info!("[Raft:{}] shutdown signal received.", self.node_id);
                    return Ok(());
                }
                // P1: Tick: start Heartbeat(replication) or start Election
                _ = tick => {

                    trace!("receive tick");
                    let role_tx = &self.role_tx;
                    let event_tx = &self.event_tx;

                    // Errors from tick are logged, not propagated: the loop keeps running.
                    if let Err(e) = self.role.tick(role_tx, event_tx, &self.ctx).await {
                        error!("tick failed: {:?}", e);
                    } else {
                        trace!("tick success");
                    }
                }

                // P2: Role events
                Some(role_event) = self.role_rx.recv() => {
                    debug!(%self.node_id, ?role_event, "receive role event");

                    if let Err(e) = self.handle_role_event(role_event).await {
                        error!(%self.node_id, ?e, "handle_role_event error");
                    }
                }

                // P3: Other events
                Some(raft_event) = self.event_rx.recv() => {
                    trace!(%self.node_id, ?raft_event, "receive raft event");

                    // Capture a test-event snapshot before the event is consumed below.
                    #[cfg(any(test, feature = "test-utils"))]
                    let event = raft_event_to_test_event(&raft_event);

                    if let Err(e) = self.role.handle_raft_event(raft_event, &self.ctx, self.role_tx.clone()).await {
                        error!(%self.node_id, ?e, "handle_raft_event error");
                    }

                    #[cfg(any(test, feature = "test-utils"))]
                    self.notify_raft_event(event);
                }

            }
        }
    }
297
    /// `handle_role_event` is responsible for processing role transition and
    /// role state events.
    ///
    /// Transition arms (`BecomeFollower` / `BecomeCandidate` / `BecomeLeader` /
    /// `BecomeLearner`) replace `self.role` and then notify the leader-change
    /// listener (plus the test-only role-transition listeners).
    pub async fn handle_role_event(
        &mut self,
        role_event: RoleEvent,
    ) -> Result<()> {
        // All inbound and outbound raft event

        match role_event {
            RoleEvent::BecomeFollower(leader_id_option) => {
                debug!("BecomeFollower");
                self.role = self.role.become_follower()?;

                // Reset vote when stepping down (new term, no vote yet)
                self.role.state_mut().reset_voted_for()?;

                // Notify leader change listeners (leader may or may not be known)
                let current_term = self.role.current_term();
                self.notify_leader_change(leader_id_option, current_term);

                #[cfg(any(test, feature = "test-utils"))]
                self.notify_role_transition();

                //TODO: update membership
            }
            RoleEvent::BecomeCandidate => {
                debug!("BecomeCandidate");
                self.role = self.role.become_candidate()?;

                // No leader during candidate state
                let current_term = self.role.current_term();
                self.notify_leader_change(None, current_term);

                #[cfg(any(test, feature = "test-utils"))]
                self.notify_role_transition();
            }
            RoleEvent::BecomeLeader => {
                debug!("BecomeLeader");
                self.role = self.role.become_leader()?;

                // Mark vote as committed (candidate → leader transition)
                let current_term = self.role.current_term();
                let _ = self.role.state_mut().update_voted_for(VotedFor {
                    voted_for_id: self.node_id,
                    voted_for_term: current_term,
                    committed: true,
                })?;

                // Notify leader change listeners: this node is now leader
                self.notify_leader_change(Some(self.node_id), current_term);

                let peer_ids = self.ctx.membership().get_peers_id_with_condition(|_| true).await;

                // Initialize per-peer replication indexes from our last log entry
                self.role.init_peers_next_index_and_match_index(
                    self.ctx.raft_log().last_entry_id(),
                    peer_ids,
                )?;

                // Initialize cluster metadata cache for hot path optimization
                self.role.state_mut().init_cluster_metadata(&self.ctx.membership()).await?;

                // Async action: persist a no-op entry to verify leadership in the
                // new term; if verification fails, step back down to Follower.
                if !self
                    .role
                    .verify_leadership_persistent(
                        vec![EntryPayload::noop()],
                        true,
                        &self.ctx,
                        &self.role_tx,
                    )
                    .await
                    .unwrap_or(false)
                {
                    warn!(
                        "Verify leadership in new term failed. Now the node is going to step back to Follower..."
                    );
                    self.role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                }

                #[cfg(any(test, feature = "test-utils"))]
                self.notify_role_transition();
            }
            RoleEvent::BecomeLearner => {
                debug!("BecomeLearner");
                self.role = self.role.become_learner()?;

                // Learner has no leader initially
                let current_term = self.role.current_term();
                self.notify_leader_change(None, current_term);

                #[cfg(any(test, feature = "test-utils"))]
                self.notify_role_transition();
            }
            RoleEvent::NotifyNewCommitIndex(new_commit_data) => {
                debug!(
                    ?new_commit_data,
                    "[{}] RoleEvent::NotifyNewCommitIndex.", self.node_id,
                );
                // Fan out to the registered new-commit listeners
                self.notify_new_commit(new_commit_data);
            }

            RoleEvent::LeaderDiscovered(leader_id, term) => {
                debug!("LeaderDiscovered: leader_id={}, term={}", leader_id, term);
                // Notify leader change listeners - no state transition
                // Note: mpsc channels do not deduplicate; consumers handle dedup if needed
                self.notify_leader_change(Some(leader_id), term);
            }

            RoleEvent::ReprocessEvent(raft_event) => {
                // Re-enqueue the event onto the main raft-event channel for replay
                info!("Replay the RaftEvent: {:?}", &raft_event);
                self.event_tx.send(*raft_event).await.map_err(|e| {
                    let error_str = format!("{e:?}");
                    error!("Failed to send: {}", error_str);
                    NetworkError::SingalSendFailed(error_str)
                })?;
            }
        };

        Ok(())
    }
422
    /// Register a channel that will receive every new-commit notification
    /// emitted via `notify_new_commit`.
    pub fn register_new_commit_listener(
        &mut self,
        tx: mpsc::UnboundedSender<NewCommitData>,
    ) {
        self.new_commit_listener.push(tx);
    }
429
430    pub fn notify_new_commit(
431        &self,
432        new_commit_data: NewCommitData,
433    ) {
434        debug!(?new_commit_data, "notify_new_commit",);
435
436        for tx in &self.new_commit_listener {
437            if let Err(e) = tx.send(new_commit_data.clone()) {
438                error!("notify_new_commit failed: {:?}", e);
439            }
440        }
441    }
442
    /// Test-only: register a channel that receives the new role (encoded as
    /// `i32`) on every role transition.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn register_role_transition_listener(
        &mut self,
        tx: mpsc::UnboundedSender<i32>,
    ) {
        self.test_role_transition_listener.push(tx);
    }
450
451    #[cfg(any(test, feature = "test-utils"))]
452    pub fn notify_role_transition(&self) {
453        let new_role_i32 = self.role.as_i32();
454        for tx in &self.test_role_transition_listener {
455            tx.send(new_role_i32).expect("should succeed");
456        }
457    }
458
    /// Test-only: register a channel that receives a `TestEvent` snapshot of
    /// every raft event processed by the main loop.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn register_raft_event_listener(
        &mut self,
        tx: mpsc::UnboundedSender<super::TestEvent>,
    ) {
        self.test_raft_event_listener.push(tx);
    }
466
467    #[cfg(any(test, feature = "test-utils"))]
468    pub fn notify_raft_event(
469        &self,
470        event: super::TestEvent,
471    ) {
472        debug!("unit test:: notify new raft event: {:?}", &event);
473
474        for tx in &self.test_raft_event_listener {
475            assert!(tx.send(event.clone()).is_ok(), "should succeed");
476        }
477    }
478
    /// Test-only: replace the current role directly, bypassing the normal
    /// transition path in `handle_role_event`.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn set_role(
        &mut self,
        role: RaftRole<T>,
    ) {
        self.role = role
    }
486
487    /// Returns a cloned event sender for external use.
488    ///
489    /// This provides controlled access to send validated RaftEvents to the Raft core.
490    /// Events sent through this sender are still processed through the normal validation
491    /// pipeline in the main event loop.
492    ///
493    /// # Security Note
494    /// While this provides access to the event channel, all events are still validated
495    /// by the Raft state machine before being applied. The event handler in `handle_raft_event`
496    /// performs necessary checks based on current term, role, and state.
497    pub fn event_sender(&self) -> mpsc::Sender<RaftEvent> {
498        self.event_tx.clone()
499    }
500
501    /// Returns a cloned role event sender for internal use.
502    ///
503    /// # Warning
504    /// This is primarily for internal components that need to trigger role transitions.
505    /// External callers should not use this unless they understand the Raft protocol deeply.
506    #[doc(hidden)]
507    pub fn role_event_sender(&self) -> mpsc::UnboundedSender<RoleEvent> {
508        self.role_tx.clone()
509    }
510}
511
512impl<T> Drop for Raft<T>
513where
514    T: TypeConfig,
515{
516    fn drop(&mut self) {
517        info!("Raft been dropped.");
518
519        if let Err(e) = self
520            .ctx
521            .raft_log()
522            .save_hard_state(&self.role.state().shared_state().hard_state)
523        {
524            error!(?e, "State storage persist node hard state failed.");
525        }
526
527        info!("Graceful shutdown node state ...");
528    }
529}