d_engine_core/
raft.rs

use std::sync::Arc;

use d_engine_proto::common::EntryPayload;
// Re-export LeaderInfo from proto for application-layer use
pub use d_engine_proto::common::LeaderInfo;
use d_engine_proto::server::election::VotedFor;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::time::sleep_until;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;

use super::NewCommitData;
use super::RaftContext;
use super::RaftCoreHandlers;
use super::RaftEvent;
use super::RaftRole;
use super::RaftStorageHandles;
use super::RoleEvent;
#[cfg(test)]
use super::raft_event_to_test_event;
use crate::Membership;
use crate::NetworkError;
use crate::RaftLog;
use crate::RaftNodeConfig;
use crate::Result;
use crate::TypeConfig;
use crate::alias::MOF;
use crate::alias::TROF;

pub struct Raft<T>
where
    T: TypeConfig,
{
    pub node_id: u32,
    pub role: RaftRole<T>,
    pub ctx: RaftContext<T>,

    // Network & storage events
    event_tx: mpsc::Sender<RaftEvent>,
    event_rx: mpsc::Receiver<RaftEvent>,

    // Timer
    role_tx: mpsc::UnboundedSender<RoleEvent>,
    role_rx: mpsc::UnboundedReceiver<RoleEvent>,

    // For the business logic to apply committed logs to the state machine
    new_commit_listener: Vec<mpsc::UnboundedSender<NewCommitData>>,

    // Leader change notification.
    // Uses watch::Sender for an efficient multi-subscriber pattern.
    leader_change_listener: Option<watch::Sender<Option<LeaderInfo>>>,

    // Shutdown signal
    shutdown_signal: watch::Receiver<()>,

    // For unit tests
    #[cfg(test)]
    test_role_transition_listener: Vec<mpsc::UnboundedSender<i32>>,

    #[cfg(test)]
    test_raft_event_listener: Vec<mpsc::UnboundedSender<super::TestEvent>>,
}

pub struct SignalParams {
    pub(crate) role_tx: mpsc::UnboundedSender<RoleEvent>,
    pub(crate) role_rx: mpsc::UnboundedReceiver<RoleEvent>,
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
    pub(crate) event_rx: mpsc::Receiver<RaftEvent>,
    pub(crate) shutdown_signal: watch::Receiver<()>,
}

impl SignalParams {
    /// Creates a new SignalParams with the provided channels.
    ///
    /// This is the only way to construct SignalParams from outside d-engine-core,
    /// ensuring controlled initialization of the internal communication channels.
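    ///
    /// # Example
    /// A minimal wiring sketch; the bounded-channel capacity below is an
    /// arbitrary illustration, not a value required by d-engine:
    /// ```ignore
    /// use tokio::sync::{mpsc, watch};
    ///
    /// let (role_tx, role_rx) = mpsc::unbounded_channel();
    /// let (event_tx, event_rx) = mpsc::channel(1024);
    /// let (_shutdown_tx, shutdown_rx) = watch::channel(());
    /// let params = SignalParams::new(role_tx, role_rx, event_tx, event_rx, shutdown_rx);
    /// ```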
    pub fn new(
        role_tx: mpsc::UnboundedSender<RoleEvent>,
        role_rx: mpsc::UnboundedReceiver<RoleEvent>,
        event_tx: mpsc::Sender<RaftEvent>,
        event_rx: mpsc::Receiver<RaftEvent>,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        Self {
            role_tx,
            role_rx,
            event_tx,
            event_rx,
            shutdown_signal,
        }
    }
}

impl<T> Raft<T>
where
    T: TypeConfig,
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        node_id: u32,
        role: RaftRole<T>,
        storage: RaftStorageHandles<T>,
        transport: TROF<T>,
        handlers: RaftCoreHandlers<T>,
        membership: Arc<MOF<T>>,
        signal_params: SignalParams,
        node_config: Arc<RaftNodeConfig>,
    ) -> Self {
        let ctx = Self::build_context(
            node_id,
            storage,
            transport,
            membership,
            handlers,
            node_config.clone(),
        );

        Raft {
            node_id,
            ctx,
            role,

            event_tx: signal_params.event_tx,
            event_rx: signal_params.event_rx,

            role_tx: signal_params.role_tx,
            role_rx: signal_params.role_rx,

            new_commit_listener: Vec::new(),

            shutdown_signal: signal_params.shutdown_signal,

            leader_change_listener: None,

            #[cfg(test)]
            test_role_transition_listener: Vec::new(),

            #[cfg(test)]
            test_raft_event_listener: Vec::new(),
        }
    }

    /// Register a listener for leader election events.
    ///
    /// The listener will receive LeaderInfo updates:
    /// - Some(LeaderInfo) when a leader is elected
    /// - None when no leader exists (during an election)
    ///
    /// # Performance
    /// Event-driven notification (no polling); multi-subscriber support via a watch channel.
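    ///
    /// # Example
    /// A minimal subscription sketch; the task body is illustrative:
    /// ```ignore
    /// use tokio::sync::watch;
    ///
    /// let (tx, mut rx) = watch::channel(None::<LeaderInfo>);
    /// raft.register_leader_change_listener(tx);
    /// tokio::spawn(async move {
    ///     while rx.changed().await.is_ok() {
    ///         let current = rx.borrow().clone();
    ///         if let Some(leader) = current {
    ///             println!("leader: {} (term {})", leader.leader_id, leader.term);
    ///         }
    ///     }
    /// });
    /// ```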
    pub fn register_leader_change_listener(
        &mut self,
        tx: watch::Sender<Option<LeaderInfo>>,
    ) {
        self.leader_change_listener = Some(tx);
    }

    /// Notify all leader change listeners.
    ///
    /// Called internally when role transitions occur.
    /// Uses send_if_modified to avoid redundant notifications.
    fn notify_leader_change(
        &self,
        leader_id: Option<u32>,
        term: u64,
    ) {
        if let Some(tx) = &self.leader_change_listener {
            tx.send_if_modified(|current| {
                let new_info = leader_id.map(|id| LeaderInfo {
                    leader_id: id,
                    term,
                });
                if *current != new_info {
                    *current = new_info;
                    true
                } else {
                    false
                }
            });
        }
    }

    fn build_context(
        id: u32,
        storage: RaftStorageHandles<T>,
        transport: TROF<T>,
        membership: Arc<MOF<T>>,
        handlers: RaftCoreHandlers<T>,
        node_config: Arc<RaftNodeConfig>,
    ) -> RaftContext<T> {
        RaftContext {
            node_id: id,
            storage,
            transport: Arc::new(transport),
            membership,
            handlers,
            node_config,
        }
    }
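
    /// Ask the current role to run its cluster-join procedure against the
    /// shared context.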
    pub async fn join_cluster(&self) -> Result<()> {
        self.role.join_cluster(&self.ctx).await
    }

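    /// Drive the Raft main loop until the shutdown signal fires.
    ///
    /// Each iteration runs a biased `select!` over, in priority order: the
    /// shutdown signal (P0), the role timer tick that triggers heartbeats or
    /// elections (P1), role events (P2), and all other raft events (P3).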
    pub async fn run(&mut self) -> Result<()> {
        // Learners fetch an initial snapshot before entering the main loop
        if self.ctx.node_config.is_learner() {
            info!(
                "Node({}) is a learner and needs to fetch an initial snapshot.",
                self.node_id
            );
            if let Err(e) = self.role.fetch_initial_snapshot(&self.ctx).await {
                warn!(
                    "Initial snapshot failed: {:?}.
            ================================================
            Leader has not generated a snapshot yet. New node
            will sync with Leader via append-entries requests.
            ================================================
            ",
                    e
                );
                println!(
                    "
            ================================================
            Leader has not generated a snapshot yet. New node
            will sync with Leader via append-entries requests.
            ================================================
                    "
                );
            }
        }

        info!("Node is running");

        if self.role.is_timer_expired() {
            self.role.reset_timer();
        }

        loop {
            // Note: next_deadline will be reset in each role's tick function
            let tick = sleep_until(self.role.next_deadline());

            tokio::select! {
                // Use biased to enforce branch order
                biased;
                // P0: shutdown signal received
                _ = self.shutdown_signal.changed() => {
                    info!("[Raft:{}] shutdown signal received.", self.node_id);
                    return Ok(());
                }
                // P1: Tick: start heartbeat (replication) or start an election
                _ = tick => {
                    trace!("receive tick");
                    let role_tx = &self.role_tx;
                    let event_tx = &self.event_tx;

                    if let Err(e) = self.role.tick(role_tx, event_tx, &self.ctx).await {
                        error!("tick failed: {:?}", e);
                    } else {
                        trace!("tick success");
                    }
                }

                // P2: Role events
                Some(role_event) = self.role_rx.recv() => {
                    debug!(%self.node_id, ?role_event, "receive role event");

                    if let Err(e) = self.handle_role_event(role_event).await {
                        error!(%self.node_id, ?e, "handle_role_event error");
                    }
                }

                // P3: Other events
                Some(raft_event) = self.event_rx.recv() => {
                    trace!(%self.node_id, ?raft_event, "receive raft event");

                    #[cfg(test)]
                    let event = raft_event_to_test_event(&raft_event);

                    if let Err(e) = self.role.handle_raft_event(raft_event, &self.ctx, self.role_tx.clone()).await {
                        error!(%self.node_id, ?e, "handle_raft_event error");
                    }

                    #[cfg(test)]
                    self.notify_raft_event(event);
                }
            }
        }
    }

    /// `handle_role_event` is responsible for processing role transitions and
    /// role-state events.
    pub async fn handle_role_event(
        &mut self,
        role_event: RoleEvent,
    ) -> Result<()> {
        // All role transitions and role-state updates are dispatched here
        match role_event {
            RoleEvent::BecomeFollower(leader_id_option) => {
                // Drain read buffer before stepping down
                if let Err(e) = self.role.drain_read_buffer() {
                    tracing::warn!("Failed to drain read buffer during role transition: {e:?}");
                }

                debug!("BecomeFollower");
                self.role = self.role.become_follower()?;

                // Reset vote when stepping down (new term, no vote yet)
                self.role.state_mut().reset_voted_for()?;

                // Notify leader change listeners
                let current_term = self.role.current_term();
                self.notify_leader_change(leader_id_option, current_term);

                #[cfg(test)]
                self.notify_role_transition();

                // TODO: update membership
            }
            RoleEvent::BecomeCandidate => {
                // Drain read buffer before stepping down
                if let Err(e) = self.role.drain_read_buffer() {
                    tracing::warn!("Failed to drain read buffer during role transition: {e:?}");
                }

                debug!("BecomeCandidate");
                self.role = self.role.become_candidate()?;

                // No leader during candidate state
                let current_term = self.role.current_term();
                self.notify_leader_change(None, current_term);

                #[cfg(test)]
                self.notify_role_transition();
            }
            RoleEvent::BecomeLeader => {
                debug!("BecomeLeader");
                self.role = self.role.become_leader()?;

                // Mark vote as committed (candidate → leader transition)
                let current_term = self.role.current_term();
                let _ = self.role.state_mut().update_voted_for(VotedFor {
                    voted_for_id: self.node_id,
                    voted_for_term: current_term,
                    committed: true,
                })?;

                // Notify leader change listeners: this node is now leader
                self.notify_leader_change(Some(self.node_id), current_term);

                let peer_ids = self.ctx.membership().get_peers_id_with_condition(|_| true).await;

                self.role.init_peers_next_index_and_match_index(
                    self.ctx.raft_log().last_entry_id(),
                    peer_ids,
                )?;

                // Initialize cluster metadata cache for hot path optimization
                self.role.state_mut().init_cluster_metadata(&self.ctx.membership()).await?;

                // Async action: verify leadership for the new term
                if !self
                    .role
                    .verify_leadership_persistent(
                        vec![EntryPayload::noop()],
                        true,
                        &self.ctx,
                        &self.role_tx,
                    )
                    .await
                    .unwrap_or(false)
                {
                    warn!(
                        "Verifying leadership in the new term failed. The node will now step back to Follower..."
                    );
                    self.role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                } else {
                    // Track no-op index for linearizable read optimization (best-effort)
                    if let Err(e) = self.role.on_noop_committed(&self.ctx) {
                        warn!(
                            ?e,
                            "Failed to track no-op commit index after leadership verification"
                        );
                    }
                }

                #[cfg(test)]
                self.notify_role_transition();
            }
            RoleEvent::BecomeLearner => {
                // Drain read buffer before stepping down
                if let Err(e) = self.role.drain_read_buffer() {
                    tracing::warn!("Failed to drain read buffer during role transition: {e:?}");
                }

                debug!("BecomeLearner");
                self.role = self.role.become_learner()?;

                // Learner has no leader initially
                let current_term = self.role.current_term();
                self.notify_leader_change(None, current_term);

                #[cfg(test)]
                self.notify_role_transition();
            }
            RoleEvent::NotifyNewCommitIndex(new_commit_data) => {
                debug!(
                    ?new_commit_data,
                    "[{}] RoleEvent::NotifyNewCommitIndex.", self.node_id,
                );
                self.notify_new_commit(new_commit_data);
            }

            RoleEvent::LeaderDiscovered(leader_id, term) => {
                debug!("LeaderDiscovered: leader_id={}, term={}", leader_id, term);
                // Notify leader change listeners - no state transition
                // Note: mpsc channels do not deduplicate; consumers handle dedup if needed
                self.notify_leader_change(Some(leader_id), term);
            }

            RoleEvent::ReprocessEvent(raft_event) => {
                info!("Replaying RaftEvent: {:?}", &raft_event);
                self.event_tx.send(*raft_event).await.map_err(|e| {
                    let error_str = format!("{e:?}");
                    error!("Failed to send: {}", error_str);
                    NetworkError::SingalSendFailed(error_str)
                })?;
            }
        };

        Ok(())
    }

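    /// Register a listener that receives `NewCommitData` each time the commit
    /// index advances, so the application layer can apply committed entries to
    /// its state machine.
    ///
    /// # Example
    /// A minimal consumption sketch; the apply step is illustrative:
    /// ```ignore
    /// use tokio::sync::mpsc;
    ///
    /// let (tx, mut rx) = mpsc::unbounded_channel();
    /// raft.register_new_commit_listener(tx);
    /// tokio::spawn(async move {
    ///     while let Some(commit) = rx.recv().await {
    ///         // Apply `commit` to the application state machine here.
    ///     }
    /// });
    /// ```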
    pub fn register_new_commit_listener(
        &mut self,
        tx: mpsc::UnboundedSender<NewCommitData>,
    ) {
        self.new_commit_listener.push(tx);
    }

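    /// Broadcast `new_commit_data` to every registered commit listener.
    /// Send failures are logged rather than propagated, so a dropped
    /// listener does not stall the Raft core.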
    pub fn notify_new_commit(
        &self,
        new_commit_data: NewCommitData,
    ) {
        debug!(?new_commit_data, "notify_new_commit");

        for tx in &self.new_commit_listener {
            if let Err(e) = tx.send(new_commit_data.clone()) {
                error!("notify_new_commit failed: {:?}", e);
            }
        }
    }

    #[cfg(test)]
    pub fn register_role_transition_listener(
        &mut self,
        tx: mpsc::UnboundedSender<i32>,
    ) {
        self.test_role_transition_listener.push(tx);
    }

    #[cfg(test)]
    pub fn notify_role_transition(&self) {
        let new_role_i32 = self.role.as_i32();
        for tx in &self.test_role_transition_listener {
            tx.send(new_role_i32).expect("should succeed");
        }
    }

    #[cfg(test)]
    pub fn register_raft_event_listener(
        &mut self,
        tx: mpsc::UnboundedSender<super::TestEvent>,
    ) {
        self.test_raft_event_listener.push(tx);
    }

    #[cfg(test)]
    pub fn notify_raft_event(
        &self,
        event: super::TestEvent,
    ) {
        debug!("unit test: notify new raft event: {:?}", &event);

        for tx in &self.test_raft_event_listener {
            assert!(tx.send(event.clone()).is_ok(), "should succeed");
        }
    }

    #[cfg(test)]
    pub fn set_role(
        &mut self,
        role: RaftRole<T>,
    ) {
        self.role = role
    }

    /// Returns a cloned event sender for external use.
    ///
    /// This provides controlled access to send validated RaftEvents to the Raft core.
    /// Events sent through this sender are still processed through the normal validation
    /// pipeline in the main event loop.
    ///
    /// # Security Note
    /// While this provides access to the event channel, all events are still validated
    /// by the Raft state machine before being applied. The event handler in `handle_raft_event`
    /// performs necessary checks based on current term, role, and state.
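    ///
    /// # Example
    /// A minimal sketch; `some_event` is a hypothetical stand-in for a
    /// concrete `RaftEvent` variant:
    /// ```ignore
    /// let tx = raft.event_sender();
    /// tokio::spawn(async move {
    ///     // Construct a real RaftEvent variant in place of `some_event`.
    ///     if tx.send(some_event).await.is_err() {
    ///         // The Raft core has shut down and dropped the receiver.
    ///     }
    /// });
    /// ```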
    pub fn event_sender(&self) -> mpsc::Sender<RaftEvent> {
        self.event_tx.clone()
    }

    /// Returns a cloned role event sender for internal use.
    ///
    /// # Warning
    /// This is primarily for internal components that need to trigger role transitions.
    /// External callers should not use this unless they understand the Raft protocol deeply.
    #[doc(hidden)]
    pub fn role_event_sender(&self) -> mpsc::UnboundedSender<RoleEvent> {
        self.role_tx.clone()
    }
}

impl<T> Drop for Raft<T>
where
    T: TypeConfig,
{
    fn drop(&mut self) {
        info!("Raft has been dropped.");

        if let Err(e) = self
            .ctx
            .raft_log()
            .save_hard_state(&self.role.state().shared_state().hard_state)
        {
            error!(?e, "Failed to persist node hard state to storage.");
        }

        info!("Graceful shutdown of node state complete.");
    }
}