Skip to main content

d_engine_core/
raft.rs

1// Re-export LeaderInfo from proto (application layer use)
2pub use d_engine_proto::common::LeaderInfo;
3use d_engine_proto::server::election::VotedFor;
4use tokio::sync::mpsc;
5use tokio::sync::watch;
6use tokio::time::sleep_until;
7use tracing::debug;
8use tracing::error;
9use tracing::info;
10use tracing::trace;
11use tracing::warn;
12
13use super::NewCommitData;
14use super::RaftContext;
15use super::RaftCoreHandlers;
16use super::RaftEvent;
17use super::RaftRole;
18use super::RaftStorageHandles;
19use super::RoleEvent;
20#[cfg(test)]
21use super::raft_event_to_test_event;
22use crate::Membership;
23use crate::NetworkError;
24use crate::RaftLog;
25use crate::RaftNodeConfig;
26use crate::Result;
27use crate::TypeConfig;
28use crate::alias::MOF;
29use crate::alias::TROF;
30use std::sync::Arc;
31
/// A single Raft node: the current role state, the shared context, and the
/// channels consumed by the main event loop in `run`.
pub struct Raft<T>
where
    T: TypeConfig,
{
    /// Identifier of this node.
    pub node_id: u32,
    /// Current role state; `handle_role_event` replaces it on every role transition.
    pub role: RaftRole<T>,
    /// Shared handles: storage, transport, membership, handlers and node configuration.
    pub ctx: RaftContext<T>,

    // Network & Storage events
    event_tx: mpsc::Sender<RaftEvent>,
    event_rx: mpsc::Receiver<RaftEvent>,

    // Client commands (drain-driven)
    cmd_tx: mpsc::Sender<super::ClientCmd>,
    cmd_rx: mpsc::Receiver<super::ClientCmd>,

    // Role events (role transitions and role-state notifications); the sender is
    // also handed to the role's `tick` so timer-driven work can enqueue events.
    role_tx: mpsc::UnboundedSender<RoleEvent>,
    role_rx: mpsc::UnboundedReceiver<RoleEvent>,

    // For business logic to apply logs into state machine
    new_commit_listener: Vec<mpsc::UnboundedSender<NewCommitData>>,

    // Leader change notification
    // Uses watch::Sender for efficient multi-subscriber pattern.
    // Only one sender is held: registering a new one replaces the previous one.
    leader_change_listener: Option<watch::Sender<Option<LeaderInfo>>>,

    // Shutdown signal; a change on this channel makes `run` return.
    shutdown_signal: watch::Receiver<()>,

    // For unit test: receives the role (as i32) after each role transition.
    #[cfg(test)]
    test_role_transition_listener: Vec<mpsc::UnboundedSender<i32>>,

    // For unit test: receives a TestEvent for each raft event handled by the select arm.
    #[cfg(test)]
    test_raft_event_listener: Vec<mpsc::UnboundedSender<super::TestEvent>>,
}
69
/// Bundle of the channel endpoints and the shutdown receiver that `Raft::new`
/// moves into the node.
pub struct SignalParams {
    // Role event channel (unbounded).
    pub(crate) role_tx: mpsc::UnboundedSender<RoleEvent>,
    pub(crate) role_rx: mpsc::UnboundedReceiver<RoleEvent>,
    // Raft event channel (bounded).
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
    pub(crate) event_rx: mpsc::Receiver<RaftEvent>,
    // Client command channel (bounded).
    pub(crate) cmd_tx: mpsc::Sender<super::ClientCmd>,
    pub(crate) cmd_rx: mpsc::Receiver<super::ClientCmd>,
    // Shutdown notification.
    pub(crate) shutdown_signal: watch::Receiver<()>,
}
79
80impl SignalParams {
81    /// Creates a new SignalParams with the provided channels.
82    ///
83    /// This is the only way to construct SignalParams from outside d-engine-core,
84    /// ensuring controlled initialization of the internal communication channels.
85    pub fn new(
86        role_tx: mpsc::UnboundedSender<RoleEvent>,
87        role_rx: mpsc::UnboundedReceiver<RoleEvent>,
88        event_tx: mpsc::Sender<RaftEvent>,
89        event_rx: mpsc::Receiver<RaftEvent>,
90        cmd_tx: mpsc::Sender<super::ClientCmd>,
91        cmd_rx: mpsc::Receiver<super::ClientCmd>,
92        shutdown_signal: watch::Receiver<()>,
93    ) -> Self {
94        Self {
95            role_tx,
96            role_rx,
97            event_tx,
98            event_rx,
99            cmd_tx,
100            cmd_rx,
101            shutdown_signal,
102        }
103    }
104}
105
106impl<T> Raft<T>
107where
108    T: TypeConfig,
109{
110    #[allow(clippy::too_many_arguments)]
111    pub fn new(
112        node_id: u32,
113        role: RaftRole<T>,
114        storage: RaftStorageHandles<T>,
115        transport: TROF<T>,
116        handlers: RaftCoreHandlers<T>,
117        membership: Arc<MOF<T>>,
118        signal_params: SignalParams,
119        node_config: Arc<RaftNodeConfig>,
120    ) -> Self {
121        let ctx = Self::build_context(
122            node_id,
123            storage,
124            transport,
125            membership,
126            handlers,
127            node_config.clone(),
128        );
129
130        Raft {
131            node_id,
132            ctx,
133            role,
134
135            event_tx: signal_params.event_tx,
136            event_rx: signal_params.event_rx,
137
138            cmd_tx: signal_params.cmd_tx,
139            cmd_rx: signal_params.cmd_rx,
140
141            role_tx: signal_params.role_tx,
142            role_rx: signal_params.role_rx,
143
144            new_commit_listener: Vec::new(),
145
146            shutdown_signal: signal_params.shutdown_signal,
147
148            leader_change_listener: None,
149
150            #[cfg(test)]
151            test_role_transition_listener: Vec::new(),
152
153            #[cfg(test)]
154            test_raft_event_listener: Vec::new(),
155        }
156    }
157
158    /// Register a listener for leader election events.
159    ///
160    /// The listener will receive LeaderInfo updates:
161    /// - Some(LeaderInfo) when a leader is elected
162    /// - None when no leader exists (during election)
163    ///
164    /// # Performance
165    /// Event-driven notification (no polling), multi-subscriber support via watch channel
166    pub fn register_leader_change_listener(
167        &mut self,
168        tx: watch::Sender<Option<LeaderInfo>>,
169    ) {
170        self.leader_change_listener = Some(tx);
171    }
172
173    /// Notify all leader change listeners.
174    ///
175    /// Called internally when role transitions occur.
176    /// Uses send_if_modified to avoid redundant notifications.
177    fn notify_leader_change(
178        &self,
179        leader_id: Option<u32>,
180        term: u64,
181    ) {
182        if let Some(tx) = &self.leader_change_listener {
183            tx.send_if_modified(|current| {
184                let new_info = leader_id.map(|id| LeaderInfo {
185                    leader_id: id,
186                    term,
187                });
188                if *current != new_info {
189                    *current = new_info;
190                    true
191                } else {
192                    false
193                }
194            });
195        }
196    }
197
198    fn build_context(
199        id: u32,
200        storage: RaftStorageHandles<T>,
201        transport: TROF<T>,
202        membership: Arc<MOF<T>>,
203        handlers: RaftCoreHandlers<T>,
204        node_config: Arc<RaftNodeConfig>,
205    ) -> RaftContext<T> {
206        RaftContext {
207            node_id: id,
208            storage,
209            transport: Arc::new(transport),
210            membership,
211            handlers,
212
213            node_config,
214        }
215    }
216
217    pub async fn join_cluster(&self) -> Result<()> {
218        self.role.join_cluster(&self.ctx).await
219    }
220
221    pub async fn run(&mut self) -> Result<()> {
222        // Add snapshot handler before main loop
223        if self.ctx.node_config.is_learner() {
224            info!(
225                "Node({}) is learner and needs to fetch initial snapshot.",
226                self.node_id
227            );
228            if let Err(e) = self.role.fetch_initial_snapshot(&self.ctx).await {
229                warn!(
230                    "Initial snapshot failed: {:?}.
231            ================================================
232            Leader has not generate snapshot yet. New node
233            will sync with Leader via append entries requests.
234            ================================================
235            ",
236                    e
237                );
238                println!(
239                    "
240            ================================================
241            Leader has not generate snapshot yet. New node
242            will sync with Leader via append entries requests
243            ================================================
244                    "
245                );
246            }
247        }
248
249        info!("Node is running");
250
251        if self.role.is_timer_expired() {
252            self.role.reset_timer();
253        }
254
255        loop {
256            // Note: next_deadline wil be reset in each role's tick function
257            let tick = sleep_until(self.role.next_deadline());
258
259            tokio::select! {
260                // Use biased to ensure branch order
261                biased;
262                // P0: shutdown received;
263                _ = self.shutdown_signal.changed() => {
264                    info!("[Raft:{}] shutdown signal received.", self.node_id);
265                    // Close IO thread BEFORE returning (before runtime shutdown)
266                    // This ensures RocksDB file lock is released before tokio runtime shuts down
267                    self.ctx.storage.raft_log.close().await;
268                    // Unblock any tasks stuck in event_tx.send().await (e.g. gRPC stream handlers).
269                    // Without this, serve_with_shutdown never completes and Arc<Node> is never
270                    // released, keeping Arc<DB> alive and the RocksDB LOCK held indefinitely.
271                    self.event_rx.close();
272                    return Ok(());
273                }
274                // P1: Tick: start Heartbeat(replication) or start Election
275                _ = tick => {
276
277                    trace!("receive tick");
278                    let role_tx = &self.role_tx;
279                    let event_tx = &self.event_tx;
280
281                    if let Err(e) = self.role.tick(role_tx, event_tx, &self.ctx).await {
282                        error!("tick failed: {:?}", e);
283                    } else {
284                        trace!("tick success");
285                    }
286                }
287
288                // P2: Role events — handle first, drain rest after select
289                Some(role_event) = self.role_rx.recv() => {
290                    debug!(%self.node_id, ?role_event, "receive role event");
291
292                    if let Err(e) = self.handle_role_event(role_event).await {
293                        error!(%self.node_id, ?e, "handle_role_event error");
294                    }
295                }
296
297                // P3: Client commands — push first, drain rest after select
298                Some(first_cmd) = self.cmd_rx.recv() => {
299                    trace!(%self.node_id, "receive first client command");
300                    self.role.push_client_cmd(first_cmd, &self.ctx);
301                }
302
303                // P4: Other events — handle first, drain rest after select
304                Some(raft_event) = self.event_rx.recv() => {
305                    trace!(%self.node_id, ?raft_event, "receive raft event");
306
307                    #[cfg(test)]
308                    let event = raft_event_to_test_event(&raft_event);
309
310                    if let Err(e) = self.role.handle_raft_event(raft_event, &self.ctx, self.role_tx.clone()).await {
311                        if e.is_fatal() {
312                            error!(%self.node_id, ?e, "Fatal error in handle_raft_event, shutting down");
313                            return Err(e);
314                        }
315                        warn!(%self.node_id, ?e, "Non-fatal error in handle_raft_event, continuing");
316                    }
317
318                    #[cfg(test)]
319                    self.notify_raft_event(event);
320                }
321
322            }
323
324            // After any arm fires: drain all channels in order.
325            // role_rx first: processes ACKs/commits and sends responses to clients,
326            // naturally yielding at .await points so client tasks can enqueue their
327            // next writes into cmd_rx before drain_client_cmds runs.
328            tokio::task::yield_now().await;
329            self.drain_role_events().await?;
330            // Yield after role events so woken client tasks can enqueue their next
331            // writes into cmd_rx before drain_client_cmds runs.
332            tokio::task::yield_now().await;
333            self.drain_client_cmds().await?;
334            self.drain_raft_events().await?;
335        }
336    }
337
338    /// Drain all pending role events (up to max_batch_size).
339    async fn drain_role_events(&mut self) -> Result<()> {
340        let max = self.ctx.node_config.raft.batching.max_batch_size;
341        let mut count = 0;
342        while count < max {
343            match self.role_rx.try_recv() {
344                Ok(role_event) => {
345                    if let Err(e) = self.handle_role_event(role_event).await {
346                        error!(%self.node_id, ?e, "drain_role_events: handle_role_event error");
347                    }
348                    count += 1;
349                }
350                Err(_) => break,
351            }
352        }
353        Ok(())
354    }
355
356    /// Drain all pending client commands (up to max_batch_size) and flush.
357    async fn drain_client_cmds(&mut self) -> Result<()> {
358        let max = self.ctx.node_config.raft.batching.max_batch_size;
359        let mut count = 0;
360        while count < max {
361            match self.cmd_rx.try_recv() {
362                Ok(cmd) => {
363                    self.role.push_client_cmd(cmd, &self.ctx);
364                    count += 1;
365                }
366                Err(_) => break,
367            }
368        }
369        if count > 0 {
370            trace!("Drained {} client commands", count);
371        }
372        // Always flush: the P3 select arm may have pushed a command before this drain ran.
373        // flush_cmd_buffers checks is_empty() internally and is a no-op when nothing is buffered.
374        if let Err(e) = self.role.flush_cmd_buffers(&self.ctx, &self.role_tx).await {
375            error!(%self.node_id, ?e, "drain_client_cmds: flush_cmd_buffers error");
376            return Err(e);
377        }
378        Ok(())
379    }
380
381    /// Drain all pending raft events (up to max_batch_size).
382    async fn drain_raft_events(&mut self) -> Result<()> {
383        let max = self.ctx.node_config.raft.batching.max_batch_size;
384        let mut count = 0;
385        while count < max {
386            match self.event_rx.try_recv() {
387                Ok(raft_event) => {
388                    if let Err(e) = self
389                        .role
390                        .handle_raft_event(raft_event, &self.ctx, self.role_tx.clone())
391                        .await
392                    {
393                        if e.is_fatal() {
394                            error!(%self.node_id, ?e, "Fatal error in drain_raft_events, shutting down");
395                            return Err(e);
396                        }
397                        warn!(%self.node_id, ?e, "Non-fatal error in drain_raft_events, continuing");
398                    }
399                    count += 1;
400                }
401                Err(_) => break,
402            }
403        }
404        Ok(())
405    }
406
407    /// `handle_role_event` will be responsbile to process role trasnsition and
408    /// role state events.
409    pub async fn handle_role_event(
410        &mut self,
411        role_event: RoleEvent,
412    ) -> Result<()> {
413        // All inbound and outbound raft event
414
415        match role_event {
416            RoleEvent::BecomeFollower(leader_id_option) => {
417                // Drain read buffer when stepping down from Leader; skip otherwise.
418                let _ = self.role.drain_read_buffer();
419
420                debug!("BecomeFollower");
421                self.role = self.role.become_follower()?;
422
423                // Reset vote when stepping down (new term, no vote yet)
424                self.role.state_mut().reset_voted_for()?;
425
426                // Notify leader change listeners
427                let current_term = self.role.current_term();
428                self.notify_leader_change(leader_id_option, current_term);
429
430                #[cfg(test)]
431                self.notify_role_transition();
432
433                //TODO: update membership
434            }
435            RoleEvent::BecomeCandidate => {
436                // Drain read buffer when stepping down from Leader; skip otherwise.
437                let _ = self.role.drain_read_buffer();
438
439                debug!("BecomeCandidate");
440                self.role = self.role.become_candidate()?;
441
442                // No leader during candidate state
443                let current_term = self.role.current_term();
444                self.notify_leader_change(None, current_term);
445
446                #[cfg(test)]
447                self.notify_role_transition();
448            }
449            RoleEvent::BecomeLeader => {
450                debug!("BecomeLeader");
451                self.role = self.role.become_leader()?;
452
453                // Mark vote as committed (candidate → leader transition)
454                let current_term = self.role.current_term();
455                let _ = self.role.state_mut().update_voted_for(VotedFor {
456                    voted_for_id: self.node_id,
457                    voted_for_term: current_term,
458                    committed: true,
459                })?;
460
461                let peer_ids = self.ctx.membership().get_peers_id_with_condition(|_| true).await;
462
463                self.role.init_peers_next_index_and_match_index(
464                    self.ctx.raft_log().last_entry_id(),
465                    peer_ids,
466                )?;
467
468                // Initialize cluster metadata cache for hot path optimization
469                self.role.state_mut().init_cluster_metadata(&self.ctx.membership()).await?;
470
471                // Fire-and-forget noop to confirm quorum.
472                // Completion is delivered asynchronously via RoleEvent::NoopCommitted.
473                if let Err(e) = self.role.initiate_noop_commit(&self.ctx, &self.role_tx).await {
474                    warn!(?e, "initiate_noop_commit failed — stepping down");
475                    self.role_tx.send(RoleEvent::BecomeFollower(None)).map_err(|e| {
476                        let error_str = format!("{e:?}");
477                        error!("Failed to send: {}", error_str);
478                        NetworkError::SingalSendFailed(error_str)
479                    })?;
480                }
481
482                #[cfg(test)]
483                self.notify_role_transition();
484            }
485            RoleEvent::BecomeLearner => {
486                // Drain read buffer when stepping down from Leader; skip otherwise.
487                let _ = self.role.drain_read_buffer();
488
489                debug!("BecomeLearner");
490                self.role = self.role.become_learner()?;
491
492                // Learner has no leader initially
493                let current_term = self.role.current_term();
494                self.notify_leader_change(None, current_term);
495
496                #[cfg(test)]
497                self.notify_role_transition();
498            }
499            RoleEvent::NotifyNewCommitIndex(mut new_commit_data) => {
500                // Drain all pending NotifyNewCommitIndex events (max_batch_size limit)
501                // This batches multiple committed entries into a single notification
502                let max_batch = self.ctx.node_config.raft.batching.max_batch_size;
503                let mut count = 1;
504
505                while count < max_batch {
506                    match self.role_rx.try_recv() {
507                        Ok(RoleEvent::NotifyNewCommitIndex(next)) => {
508                            // Only keep the largest commit_index
509                            if next.new_commit_index > new_commit_data.new_commit_index {
510                                new_commit_data = next;
511                            }
512                            count += 1;
513                        }
514                        Ok(other) => {
515                            self.role_tx.send(other).map_err(|e| {
516                                let error_str = format!("{e:?}");
517                                error!("Failed to resend role event: {}", error_str);
518                                crate::Error::Fatal(error_str)
519                            })?;
520                            break;
521                        }
522                        Err(_) => break,
523                    }
524                }
525
526                debug!(
527                    "[{}] NotifyNewCommitIndex drained: {} events, max_commit_index={}",
528                    self.node_id, count, new_commit_data.new_commit_index
529                );
530
531                self.notify_new_commit(new_commit_data);
532            }
533
534            RoleEvent::LeaderDiscovered(leader_id, term) => {
535                debug!("LeaderDiscovered: leader_id={}, term={}", leader_id, term);
536                // Notify leader change listeners - no state transition
537                // Note: mpsc channels do not deduplicate; consumers handle dedup if needed
538                self.notify_leader_change(Some(leader_id), term);
539            }
540
541            RoleEvent::ReprocessEvent(raft_event) => {
542                info!("Replay the RaftEvent: {:?}", &raft_event);
543                self.event_tx.send(*raft_event).await.map_err(|e| {
544                    let error_str = format!("{e:?}");
545                    error!("Failed to send: {}", error_str);
546                    NetworkError::SingalSendFailed(error_str)
547                })?;
548            }
549
550            RoleEvent::LogFlushed { durable_index } => {
551                debug!("LogFlushed: durable_index={}", durable_index);
552                self.role.handle_log_flushed(durable_index, &self.ctx, &self.role_tx).await;
553            }
554
555            RoleEvent::AppendResult {
556                follower_id,
557                result,
558            } => {
559                debug!("AppendResult: follower_id={}", follower_id);
560                if let Err(e) = self
561                    .role
562                    .handle_append_result(follower_id, result, &self.ctx, &self.role_tx)
563                    .await
564                {
565                    error!("handle_append_result failed: {:?}", e);
566                }
567            }
568
569            RoleEvent::NoopCommitted { term } => {
570                debug!("NoopCommitted: term={}", term);
571                // on_noop_committed already called directly in drain_commit_actions.
572                // Only notify leader change listeners here (requires Raft<T> access).
573                self.notify_leader_change(Some(self.node_id), term);
574            }
575
576            RoleEvent::FatalError { source, error } => {
577                error!(%self.node_id, %source, %error, "Fatal error from SM worker — shutting down");
578                return Err(crate::Error::Fatal(format!("{source}: {error}")));
579            }
580
581            RoleEvent::ApplyCompleted {
582                last_index,
583                results,
584            } => {
585                // Routed via role_tx (P2) to avoid priority inversion against AppendEntries at P4.
586                if let Err(e) = self
587                    .role
588                    .handle_apply_completed(last_index, results, &self.ctx, &self.role_tx)
589                    .await
590                {
591                    if e.is_fatal() {
592                        error!(%self.node_id, ?e, "Fatal error in ApplyCompleted handler");
593                        return Err(e);
594                    }
595                    warn!(%self.node_id, ?e, "Non-fatal error in ApplyCompleted handler");
596                }
597            }
598
599            RoleEvent::PeerStreamError { peer_id } => {
600                debug!(%peer_id, "PeerStreamError: bidi stream disconnected, resetting next_index");
601                self.role.handle_peer_stream_error(peer_id);
602            }
603
604            RoleEvent::ZombieDetected(node_id) => {
605                debug!(%node_id, "ZombieDetected: forwarding to leader for BatchRemove");
606                if let Err(e) =
607                    self.role.handle_zombie_detected(node_id, &self.role_tx, &self.ctx).await
608                {
609                    error!(%node_id, ?e, "handle_zombie_detected failed");
610                }
611            }
612
613            RoleEvent::SnapshotPushCompleted { peer_id, success } => {
614                debug!(%peer_id, %success, "SnapshotPushCompleted");
615                if success {
616                    // Reset next_index to last_entry_id + 1 so the peer is no longer below
617                    // the purge boundary and resumes AppendEntries on the next heartbeat.
618                    // Using last_entry_id follows Raft convention (nextIndex = leader last + 1);
619                    // if the peer is behind, the conflict response will walk it back correctly.
620                    let last_entry_id = self.ctx.raft_log().last_entry_id();
621                    let _ = self
622                        .role
623                        .init_peers_next_index_and_match_index(last_entry_id, vec![peer_id]);
624                }
625                // Update per-peer backoff state; emit error alert + metrics when consecutive
626                // failures reach the configured threshold (leader protection highest priority).
627                let policy = &self.ctx.node_config.retry.install_snapshot;
628                self.role.handle_snapshot_push_completed(peer_id, success, policy, self.node_id);
629            }
630        };
631
632        Ok(())
633    }
634
635    pub fn register_new_commit_listener(
636        &mut self,
637        tx: mpsc::UnboundedSender<NewCommitData>,
638    ) {
639        self.new_commit_listener.push(tx);
640    }
641
642    pub fn notify_new_commit(
643        &self,
644        new_commit_data: NewCommitData,
645    ) {
646        debug!(?new_commit_data, "notify_new_commit",);
647
648        for tx in &self.new_commit_listener {
649            if let Err(e) = tx.send(new_commit_data.clone()) {
650                error!("notify_new_commit failed: {:?}", e);
651            }
652        }
653    }
654
655    #[cfg(test)]
656    pub fn register_role_transition_listener(
657        &mut self,
658        tx: mpsc::UnboundedSender<i32>,
659    ) {
660        self.test_role_transition_listener.push(tx);
661    }
662
663    #[cfg(test)]
664    pub fn notify_role_transition(&self) {
665        let new_role_i32 = self.role.as_i32();
666        for tx in &self.test_role_transition_listener {
667            tx.send(new_role_i32).expect("should succeed");
668        }
669    }
670
671    #[cfg(test)]
672    pub fn register_raft_event_listener(
673        &mut self,
674        tx: mpsc::UnboundedSender<super::TestEvent>,
675    ) {
676        self.test_raft_event_listener.push(tx);
677    }
678
679    #[cfg(test)]
680    pub fn notify_raft_event(
681        &self,
682        event: super::TestEvent,
683    ) {
684        debug!("unit test:: notify new raft event: {:?}", &event);
685
686        for tx in &self.test_raft_event_listener {
687            assert!(tx.send(event.clone()).is_ok(), "should succeed");
688        }
689    }
690
691    #[cfg(test)]
692    pub fn set_role(
693        &mut self,
694        role: RaftRole<T>,
695    ) {
696        self.role = role
697    }
698
699    /// Returns a cloned event sender for external use.
700    ///
701    /// This provides controlled access to send validated RaftEvents to the Raft core.
702    /// Events sent through this sender are still processed through the normal validation
703    /// pipeline in the main event loop.
704    ///
705    /// # Security Note
706    /// While this provides access to the event channel, all events are still validated
707    /// by the Raft state machine before being applied. The event handler in `handle_raft_event`
708    /// performs necessary checks based on current term, role, and state.
709    pub fn event_sender(&self) -> mpsc::Sender<RaftEvent> {
710        self.event_tx.clone()
711    }
712
713    pub fn cmd_sender(&self) -> mpsc::Sender<super::ClientCmd> {
714        self.cmd_tx.clone()
715    }
716
717    /// Returns a cloned role event sender for internal use.
718    ///
719    /// # Warning
720    /// This is primarily for internal components that need to trigger role transitions.
721    /// External callers should not use this unless they understand the Raft protocol deeply.
722    #[doc(hidden)]
723    pub fn role_event_sender(&self) -> mpsc::UnboundedSender<RoleEvent> {
724        self.role_tx.clone()
725    }
726}
727
728impl<T> Drop for Raft<T>
729where
730    T: TypeConfig,
731{
732    fn drop(&mut self) {
733        info!("Raft been dropped.");
734
735        if let Err(e) = self
736            .ctx
737            .raft_log()
738            .save_hard_state(&self.role.state().shared_state().hard_state)
739        {
740            error!(?e, "State storage persist node hard state failed.");
741        }
742
743        info!("Graceful shutdown node state ...");
744    }
745}