
d_engine_core/raft.rs

use std::sync::Arc;

use d_engine_proto::common::EntryPayload;
// Re-export LeaderInfo from proto (for application-layer use)
pub use d_engine_proto::common::LeaderInfo;
use d_engine_proto::server::election::VotedFor;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::time::sleep_until;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;

use super::NewCommitData;
use super::RaftContext;
use super::RaftCoreHandlers;
use super::RaftEvent;
use super::RaftRole;
use super::RaftStorageHandles;
use super::RoleEvent;
#[cfg(test)]
use super::raft_event_to_test_event;
use crate::Membership;
use crate::NetworkError;
use crate::RaftLog;
use crate::RaftNodeConfig;
use crate::Result;
use crate::TypeConfig;
use crate::alias::MOF;
use crate::alias::TROF;
pub struct Raft<T>
where
    T: TypeConfig,
{
    pub node_id: u32,
    pub role: RaftRole<T>,
    pub ctx: RaftContext<T>,

    // Network & storage events
    event_tx: mpsc::Sender<RaftEvent>,
    event_rx: mpsc::Receiver<RaftEvent>,

    // Client commands (drain-driven)
    cmd_tx: mpsc::UnboundedSender<super::ClientCmd>,
    cmd_rx: mpsc::UnboundedReceiver<super::ClientCmd>,

    // Timer
    role_tx: mpsc::UnboundedSender<RoleEvent>,
    role_rx: mpsc::UnboundedReceiver<RoleEvent>,

    // For business logic to apply committed logs into the state machine
    new_commit_listener: Vec<mpsc::UnboundedSender<NewCommitData>>,

    // Leader change notification.
    // Uses watch::Sender for an efficient multi-subscriber pattern.
    leader_change_listener: Option<watch::Sender<Option<LeaderInfo>>>,

    // Shutdown signal
    shutdown_signal: watch::Receiver<()>,

    // For unit tests
    #[cfg(test)]
    test_role_transition_listener: Vec<mpsc::UnboundedSender<i32>>,

    #[cfg(test)]
    test_raft_event_listener: Vec<mpsc::UnboundedSender<super::TestEvent>>,
}

pub struct SignalParams {
    pub(crate) role_tx: mpsc::UnboundedSender<RoleEvent>,
    pub(crate) role_rx: mpsc::UnboundedReceiver<RoleEvent>,
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
    pub(crate) event_rx: mpsc::Receiver<RaftEvent>,
    pub(crate) cmd_tx: mpsc::UnboundedSender<super::ClientCmd>,
    pub(crate) cmd_rx: mpsc::UnboundedReceiver<super::ClientCmd>,
    pub(crate) shutdown_signal: watch::Receiver<()>,
}

impl SignalParams {
    /// Creates a new SignalParams with the provided channels.
    ///
    /// This is the only way to construct SignalParams from outside d-engine-core,
    /// ensuring controlled initialization of the internal communication channels.
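    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the Tokio channel types this struct expects;
    /// the bounded-channel capacity `1024` is illustrative, not prescriptive:
    ///
    /// ```ignore
    /// use tokio::sync::{mpsc, watch};
    ///
    /// let (role_tx, role_rx) = mpsc::unbounded_channel();
    /// let (event_tx, event_rx) = mpsc::channel(1024);
    /// let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
    /// let (_shutdown_tx, shutdown_rx) = watch::channel(());
    ///
    /// let params = SignalParams::new(
    ///     role_tx, role_rx,
    ///     event_tx, event_rx,
    ///     cmd_tx, cmd_rx,
    ///     shutdown_rx,
    /// );
    /// ```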
    pub fn new(
        role_tx: mpsc::UnboundedSender<RoleEvent>,
        role_rx: mpsc::UnboundedReceiver<RoleEvent>,
        event_tx: mpsc::Sender<RaftEvent>,
        event_rx: mpsc::Receiver<RaftEvent>,
        cmd_tx: mpsc::UnboundedSender<super::ClientCmd>,
        cmd_rx: mpsc::UnboundedReceiver<super::ClientCmd>,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        Self {
            role_tx,
            role_rx,
            event_tx,
            event_rx,
            cmd_tx,
            cmd_rx,
            shutdown_signal,
        }
    }
}

impl<T> Raft<T>
where
    T: TypeConfig,
{
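    /// Constructs a Raft node from pre-built storage, transport, handler, and
    /// membership components, wiring in the channels carried by `signal_params`.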
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        node_id: u32,
        role: RaftRole<T>,
        storage: RaftStorageHandles<T>,
        transport: TROF<T>,
        handlers: RaftCoreHandlers<T>,
        membership: Arc<MOF<T>>,
        signal_params: SignalParams,
        node_config: Arc<RaftNodeConfig>,
    ) -> Self {
        let ctx = Self::build_context(
            node_id,
            storage,
            transport,
            membership,
            handlers,
            node_config.clone(),
        );

        Raft {
            node_id,
            ctx,
            role,

            event_tx: signal_params.event_tx,
            event_rx: signal_params.event_rx,

            cmd_tx: signal_params.cmd_tx,
            cmd_rx: signal_params.cmd_rx,

            role_tx: signal_params.role_tx,
            role_rx: signal_params.role_rx,

            new_commit_listener: Vec::new(),

            shutdown_signal: signal_params.shutdown_signal,

            leader_change_listener: None,

            #[cfg(test)]
            test_role_transition_listener: Vec::new(),

            #[cfg(test)]
            test_raft_event_listener: Vec::new(),
        }
    }

    /// Register a listener for leader election events.
    ///
    /// The listener receives LeaderInfo updates:
    /// - Some(LeaderInfo) when a leader is elected
    /// - None when no leader exists (e.g., during an election)
    ///
    /// # Performance
    /// Notification is event-driven (no polling); multiple subscribers are
    /// supported via the watch channel.
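    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `raft` is a constructed `Raft<T>`:
    ///
    /// ```ignore
    /// use tokio::sync::watch;
    ///
    /// let (tx, mut rx) = watch::channel::<Option<LeaderInfo>>(None);
    /// raft.register_leader_change_listener(tx);
    ///
    /// tokio::spawn(async move {
    ///     // Wakes only when the leadership info actually changes.
    ///     while rx.changed().await.is_ok() {
    ///         match *rx.borrow() {
    ///             Some(ref info) => println!("leader {} (term {})", info.leader_id, info.term),
    ///             None => println!("no leader (election in progress)"),
    ///         }
    ///     }
    /// });
    /// ```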
    pub fn register_leader_change_listener(
        &mut self,
        tx: watch::Sender<Option<LeaderInfo>>,
    ) {
        self.leader_change_listener = Some(tx);
    }

    /// Notify all leader change listeners.
    ///
    /// Called internally when role transitions occur.
    /// Uses send_if_modified to avoid redundant notifications.
    fn notify_leader_change(
        &self,
        leader_id: Option<u32>,
        term: u64,
    ) {
        if let Some(tx) = &self.leader_change_listener {
            tx.send_if_modified(|current| {
                let new_info = leader_id.map(|id| LeaderInfo {
                    leader_id: id,
                    term,
                });
                if *current != new_info {
                    *current = new_info;
                    true
                } else {
                    false
                }
            });
        }
    }

    fn build_context(
        id: u32,
        storage: RaftStorageHandles<T>,
        transport: TROF<T>,
        membership: Arc<MOF<T>>,
        handlers: RaftCoreHandlers<T>,
        node_config: Arc<RaftNodeConfig>,
    ) -> RaftContext<T> {
        RaftContext {
            node_id: id,
            storage,
            transport: Arc::new(transport),
            membership,
            handlers,
            node_config,
        }
    }

    pub async fn join_cluster(&self) -> Result<()> {
        self.role.join_cluster(&self.ctx).await
    }

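    /// Drives the Raft main loop until a shutdown signal is received.
    ///
    /// The `select!` branches are polled in `biased` priority order:
    /// P0 shutdown, P1 timer tick (heartbeat/election), P2 role events,
    /// P3 client commands (drain-driven batching), P4 network and storage
    /// events. Learner nodes first attempt to fetch an initial snapshot
    /// before entering the loop.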
    pub async fn run(&mut self) -> Result<()> {
        // Learner nodes fetch an initial snapshot before entering the main loop
        if self.ctx.node_config.is_learner() {
            info!(
                "Node({}) is a learner and needs to fetch an initial snapshot.",
                self.node_id
            );
            if let Err(e) = self.role.fetch_initial_snapshot(&self.ctx).await {
                warn!(
                    "Initial snapshot failed: {:?}.
            ================================================
            Leader has not generated a snapshot yet. New node
            will sync with Leader via append entries requests.
            ================================================
            ",
                    e
                );
                println!(
                    "
            ================================================
            Leader has not generated a snapshot yet. New node
            will sync with Leader via append entries requests.
            ================================================
                    "
                );
            }
        }

        info!("Node is running");

        if self.role.is_timer_expired() {
            self.role.reset_timer();
        }

        loop {
            // Note: next_deadline will be reset in each role's tick function
            let tick = sleep_until(self.role.next_deadline());

            tokio::select! {
                // Use biased to ensure branch order
                biased;

                // P0: shutdown signal received
                _ = self.shutdown_signal.changed() => {
                    info!("[Raft:{}] shutdown signal received.", self.node_id);
                    return Ok(());
                }

                // P1: Tick: start heartbeat (replication) or start election
                _ = tick => {
                    trace!("receive tick");
                    let role_tx = &self.role_tx;
                    let event_tx = &self.event_tx;

                    if let Err(e) = self.role.tick(role_tx, event_tx, &self.ctx).await {
                        error!("tick failed: {:?}", e);
                    } else {
                        trace!("tick success");
                    }
                }

                // P2: Role events
                Some(role_event) = self.role_rx.recv() => {
                    debug!(%self.node_id, ?role_event, "receive role event");

                    if let Err(e) = self.handle_role_event(role_event).await {
                        error!(%self.node_id, ?e, "handle_role_event error");
                    }
                }

                // P3: Client commands (drain-driven batch with RPC merge)
                Some(first_cmd) = self.cmd_rx.recv() => {
                    trace!(%self.node_id, "receive first client command");

                    // Push first command and drain the rest (direct push for zero-copy)
                    self.role.push_client_cmd(first_cmd, &self.ctx);

                    // Drain all pending commands from the channel (up to max_batch_size)
                    let max_batch = self.ctx.node_config.raft.batching.max_batch_size;
                    let mut count = 1;

                    while count < max_batch {
                        match self.cmd_rx.try_recv() {
                            Ok(cmd) => {
                                self.role.push_client_cmd(cmd, &self.ctx);
                                count += 1;
                            }
                            Err(_) => break,
                        }
                    }

                    trace!("Drained {} client commands", count);

                    // Flush buffers if thresholds reached
                    if let Err(e) = self.role.flush_cmd_buffers(&self.ctx, &self.role_tx).await {
                        error!(%self.node_id, ?e, "flush_cmd_buffers error");
                        return Err(e);
                    }
                }

                // P4: Other events
                Some(raft_event) = self.event_rx.recv() => {
                    trace!(%self.node_id, ?raft_event, "receive raft event");

                    #[cfg(test)]
                    let event = raft_event_to_test_event(&raft_event);

                    if let Err(e) = self.role.handle_raft_event(raft_event, &self.ctx, self.role_tx.clone()).await {
                        error!(%self.node_id, ?e, "handle_raft_event error");
                        // Fatal errors from SM Worker will be caught here and propagated
                        return Err(e);
                    }

                    #[cfg(test)]
                    self.notify_raft_event(event);
                }
            }
        }
    }

    /// `handle_role_event` is responsible for processing role transitions and
    /// role state events.
    pub async fn handle_role_event(
        &mut self,
        role_event: RoleEvent,
    ) -> Result<()> {
        // All inbound and outbound raft events

        match role_event {
            RoleEvent::BecomeFollower(leader_id_option) => {
                // Drain read buffer when stepping down from Leader; skip otherwise.
                let _ = self.role.drain_read_buffer();

                debug!("BecomeFollower");
                self.role = self.role.become_follower()?;

                // Reset vote when stepping down (new term, no vote yet)
                self.role.state_mut().reset_voted_for()?;

                // Notify leader change listeners
                let current_term = self.role.current_term();
                self.notify_leader_change(leader_id_option, current_term);

                #[cfg(test)]
                self.notify_role_transition();

                // TODO: update membership
            }
            RoleEvent::BecomeCandidate => {
                // Drain read buffer when stepping down from Leader; skip otherwise.
                let _ = self.role.drain_read_buffer();

                debug!("BecomeCandidate");
                self.role = self.role.become_candidate()?;

                // No leader during candidate state
                let current_term = self.role.current_term();
                self.notify_leader_change(None, current_term);

                #[cfg(test)]
                self.notify_role_transition();
            }
            RoleEvent::BecomeLeader => {
                debug!("BecomeLeader");
                self.role = self.role.become_leader()?;

                // Mark vote as committed (candidate → leader transition)
                let current_term = self.role.current_term();
                let _ = self.role.state_mut().update_voted_for(VotedFor {
                    voted_for_id: self.node_id,
                    voted_for_term: current_term,
                    committed: true,
                })?;

                let peer_ids = self.ctx.membership().get_peers_id_with_condition(|_| true).await;

                self.role.init_peers_next_index_and_match_index(
                    self.ctx.raft_log().last_entry_id(),
                    peer_ids,
                )?;

                // Initialize cluster metadata cache for hot path optimization
                self.role.state_mut().init_cluster_metadata(&self.ctx.membership()).await?;

                // Async action
                if !self
                    .role
                    .verify_leadership_persistent(
                        vec![EntryPayload::noop()],
                        &self.ctx,
                        &self.role_tx,
                    )
                    .await
                    .unwrap_or(false)
                {
                    warn!(
                        "Verify leadership in new term failed. Now the node is going to step back to Follower..."
                    );
                    self.role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
                        let error_str = format!("{e:?}");
                        error!("Failed to send: {}", error_str);
                        NetworkError::SingalSendFailed(error_str)
                    })?;
                } else {
                    // Track no-op index for linearizable read optimization (best-effort)
                    if let Err(e) = self.role.on_noop_committed(&self.ctx) {
                        warn!(
                            ?e,
                            "Failed to track no-op commit index after leadership verification"
                        );
                    } else {
                        // Notify leader change listeners: this node is now leader
                        self.notify_leader_change(Some(self.node_id), current_term);
                    }
                }

                #[cfg(test)]
                self.notify_role_transition();
            }
            RoleEvent::BecomeLearner => {
                // Drain read buffer when stepping down from Leader; skip otherwise.
                let _ = self.role.drain_read_buffer();

                debug!("BecomeLearner");
                self.role = self.role.become_learner()?;

                // Learner has no leader initially
                let current_term = self.role.current_term();
                self.notify_leader_change(None, current_term);

                #[cfg(test)]
                self.notify_role_transition();
            }
            RoleEvent::NotifyNewCommitIndex(mut new_commit_data) => {
                // Drain all pending NotifyNewCommitIndex events (max_batch_size limit).
                // This batches multiple committed entries into a single notification.
                let max_batch = self.ctx.node_config.raft.batching.max_batch_size;
                let mut count = 1;

                while count < max_batch {
                    match self.role_rx.try_recv() {
                        Ok(RoleEvent::NotifyNewCommitIndex(next)) => {
                            // Only keep the largest commit_index
                            if next.new_commit_index > new_commit_data.new_commit_index {
                                new_commit_data = next;
                            }
                            count += 1;
                        }
                        Ok(other) => {
                            self.role_tx.send(other).map_err(|e| {
                                let error_str = format!("{e:?}");
                                error!("Failed to resend role event: {}", error_str);
                                crate::Error::Fatal(error_str)
                            })?;
                            break;
                        }
                        Err(_) => break,
                    }
                }

                debug!(
                    "[{}] NotifyNewCommitIndex drained: {} events, max_commit_index={}",
                    self.node_id, count, new_commit_data.new_commit_index
                );

                self.notify_new_commit(new_commit_data);
            }

            RoleEvent::LeaderDiscovered(leader_id, term) => {
                debug!("LeaderDiscovered: leader_id={}, term={}", leader_id, term);
                // Notify leader change listeners - no state transition
                // Note: mpsc channels do not deduplicate; consumers handle dedup if needed
                self.notify_leader_change(Some(leader_id), term);
            }

            RoleEvent::ReprocessEvent(raft_event) => {
                info!("Replay the RaftEvent: {:?}", &raft_event);
                self.event_tx.send(*raft_event).await.map_err(|e| {
                    let error_str = format!("{e:?}");
                    error!("Failed to send: {}", error_str);
                    NetworkError::SingalSendFailed(error_str)
                })?;
            }
        };

        Ok(())
    }

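    /// Register a listener for new-commit notifications.
    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `raft` is a constructed `Raft<T>`:
    ///
    /// ```ignore
    /// use tokio::sync::mpsc;
    ///
    /// let (tx, mut rx) = mpsc::unbounded_channel::<NewCommitData>();
    /// raft.register_new_commit_listener(tx);
    ///
    /// tokio::spawn(async move {
    ///     while let Some(commit) = rx.recv().await {
    ///         // Apply entries up to commit.new_commit_index to the state machine.
    ///     }
    /// });
    /// ```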
    pub fn register_new_commit_listener(
        &mut self,
        tx: mpsc::UnboundedSender<NewCommitData>,
    ) {
        self.new_commit_listener.push(tx);
    }

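    /// Broadcast newly committed data to all registered commit listeners.
    ///
    /// Send failures are logged rather than propagated, so a dropped
    /// listener does not stall the Raft core.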
    pub fn notify_new_commit(
        &self,
        new_commit_data: NewCommitData,
    ) {
        debug!(?new_commit_data, "notify_new_commit");

        for tx in &self.new_commit_listener {
            if let Err(e) = tx.send(new_commit_data.clone()) {
                error!("notify_new_commit failed: {:?}", e);
            }
        }
    }

    #[cfg(test)]
    pub fn register_role_transition_listener(
        &mut self,
        tx: mpsc::UnboundedSender<i32>,
    ) {
        self.test_role_transition_listener.push(tx);
    }

    #[cfg(test)]
    pub fn notify_role_transition(&self) {
        let new_role_i32 = self.role.as_i32();
        for tx in &self.test_role_transition_listener {
            tx.send(new_role_i32).expect("should succeed");
        }
    }

    #[cfg(test)]
    pub fn register_raft_event_listener(
        &mut self,
        tx: mpsc::UnboundedSender<super::TestEvent>,
    ) {
        self.test_raft_event_listener.push(tx);
    }

    #[cfg(test)]
    pub fn notify_raft_event(
        &self,
        event: super::TestEvent,
    ) {
        debug!("unit test: notify new raft event: {:?}", &event);

        for tx in &self.test_raft_event_listener {
            assert!(tx.send(event.clone()).is_ok(), "should succeed");
        }
    }

    #[cfg(test)]
    pub fn set_role(
        &mut self,
        role: RaftRole<T>,
    ) {
        self.role = role;
    }

    /// Returns a cloned event sender for external use.
    ///
    /// This provides controlled access to send validated RaftEvents to the Raft core.
    /// Events sent through this sender are still processed through the normal validation
    /// pipeline in the main event loop.
    ///
    /// # Security Note
    /// While this provides access to the event channel, all events are still validated
    /// by the Raft state machine before being applied. The event handler in `handle_raft_event`
    /// performs necessary checks based on current term, role, and state.
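    ///
    /// # Example
    ///
    /// A minimal sketch; constructing a concrete `RaftEvent` is elided here
    /// since its variants are crate-internal:
    ///
    /// ```ignore
    /// let tx = raft.event_sender();
    /// tokio::spawn(async move {
    ///     // `event` must be a valid RaftEvent; it is validated by the main
    ///     // loop before any state change is applied.
    ///     if tx.send(event).await.is_err() {
    ///         // The Raft core has shut down and dropped its receiver.
    ///     }
    /// });
    /// ```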
    pub fn event_sender(&self) -> mpsc::Sender<RaftEvent> {
        self.event_tx.clone()
    }

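    /// Returns a cloned client-command sender.
    ///
    /// Commands sent through this channel are drained by the main loop in
    /// batches (up to `max_batch_size` per drain) before being flushed to
    /// the current role's command buffers.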
    pub fn cmd_sender(&self) -> mpsc::UnboundedSender<super::ClientCmd> {
        self.cmd_tx.clone()
    }

    /// Returns a cloned role event sender for internal use.
    ///
    /// # Warning
    /// This is primarily for internal components that need to trigger role transitions.
    /// External callers should not use this unless they understand the Raft protocol deeply.
    #[doc(hidden)]
    pub fn role_event_sender(&self) -> mpsc::UnboundedSender<RoleEvent> {
        self.role_tx.clone()
    }
}

impl<T> Drop for Raft<T>
where
    T: TypeConfig,
{
    fn drop(&mut self) {
609        info!("Raft been dropped.");
610
611        if let Err(e) = self
612            .ctx
613            .raft_log()
614            .save_hard_state(&self.role.state().shared_state().hard_state)
615        {
616            error!(?e, "State storage persist node hard state failed.");
617        }
618
619        info!("Graceful shutdown node state ...");
620    }
621}