// d_engine_server/node/builder.rs
//! A builder pattern implementation for constructing a [`Node`] instance in a
//! Raft cluster.
//!
//! The [`NodeBuilder`] provides a fluent interface to configure and assemble
//! components required by the Raft node, including storage engines, state machines,
//! transport layers, membership management, and asynchronous handlers.
//!
//! ## Key Design Points
//! - **Explicit Component Initialization**: Requires explicit configuration of storage engines and
//!   state machines (no implicit defaults).
//! - **Customization**: Allows overriding components via setter methods (e.g., `storage_engine()`,
//!   `state_machine()`, `transport()`).
//! - **Simple API**: Single `start().await?` method to build and launch the node.
//!
//! ## Example
//! ```ignore
//! let (shutdown_tx, shutdown_rx) = watch::channel(());
//! let node = NodeBuilder::init(config, shutdown_rx)
//!     .storage_engine(custom_storage_engine)  // Required component
//!     .state_machine(custom_state_machine)    // Required component
//!     .start().await?;
//! ```
//!
//! ## Notes
//! - **Thread Safety**: All components wrapped in `Arc`/`Mutex` for shared ownership and thread
//!   safety.
//! - **Resource Cleanup**: Uses `watch::Receiver` for cooperative shutdown signaling.
//! - **Configuration Loading**: Supports loading cluster configuration from file or in-memory
//!   config.

use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use d_engine_core::ClusterConfig;
use d_engine_core::CommitHandler;
use d_engine_core::CommitHandlerDependencies;
use d_engine_core::DefaultCommitHandler;
use d_engine_core::DefaultPurgeExecutor;
use d_engine_core::DefaultStateMachineHandler;
use d_engine_core::ElectionHandler;
use d_engine_core::LogSizePolicy;
use d_engine_core::NewCommitData;
use d_engine_core::Raft;
use d_engine_core::RaftConfig;
use d_engine_core::RaftCoreHandlers;
use d_engine_core::RaftLog;
use d_engine_core::RaftNodeConfig;
use d_engine_core::RaftRole;
use d_engine_core::RaftStorageHandles;
use d_engine_core::ReplicationHandler;
use d_engine_core::Result;
use d_engine_core::SignalParams;
use d_engine_core::StateMachine;
use d_engine_core::StateMachineWorker;
use d_engine_core::StorageEngine;
use d_engine_core::SystemError;
use d_engine_core::alias::MOF;
use d_engine_core::alias::SMHOF;
use d_engine_core::alias::SNP;
use d_engine_core::alias::TROF;
use d_engine_core::follower_state::FollowerState;
use d_engine_core::learner_state::LearnerState;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchDispatcher;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchRegistry;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::debug;
use tracing::error;
use tracing::info;

use super::LeaderNotifier;
use super::RaftTypeConfig;
use crate::Node;
use crate::membership::RaftMembership;
use crate::network::grpc;
use crate::network::grpc::grpc_transport::GrpcTransport;
use crate::storage::BufferedRaftLog;

/// Builder for creating a Raft node
///
/// Provides a fluent API for configuring and constructing a [`Node`].
///
/// `storage_engine` and `state_machine` are required and must be set before
/// `build()`/`start()`; all other components fall back to defaults.
///
/// # Example
///
/// ```rust,ignore
/// use d_engine_server::{NodeBuilder, FileStorageEngine, FileStateMachine};
///
/// let node = NodeBuilder::new(None, shutdown_rx)
///     .storage_engine(Arc::new(FileStorageEngine::new(...)?))
///     .state_machine(Arc::new(FileStateMachine::new(...).await?))
///     .start().await?;
/// ```
pub struct NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    // ID of this node, copied from `node_config.cluster.node_id` at init time.
    node_id: u32,

    /// Full node configuration (cluster topology + Raft settings).
    pub(super) node_config: RaftNodeConfig,
    /// Required: storage engine backing the Raft log. `build()` errors if unset.
    pub(super) storage_engine: Option<Arc<SE>>,
    /// Optional membership manager; `build()` defaults to `RaftMembership`.
    pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
    /// Required: state machine that applies committed entries. `build()` errors if unset.
    pub(super) state_machine: Option<Arc<SM>>,
    /// Optional transport; `build()` defaults to `GrpcTransport`.
    pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
    /// Optional state machine handler; `build()` defaults to `DefaultStateMachineHandler`.
    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
    /// Optional snapshot trigger policy; `build()` defaults to `LogSizePolicy`.
    pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
    /// Watch channel used to signal graceful shutdown to all spawned tasks.
    pub(super) shutdown_signal: watch::Receiver<()>,

    /// The assembled node; populated by `build()` and consumed by `start()`.
    pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
}
115
116impl<SE, SM> NodeBuilder<SE, SM>
117where
118    SE: StorageEngine + Debug,
119    SM: StateMachine + Debug,
120{
121    /// Creates a new NodeBuilder with cluster configuration loaded from file
122    ///
123    /// # Arguments
124    /// * `cluster_path` - Optional path to node-specific cluster configuration
125    /// * `shutdown_signal` - Watch channel for graceful shutdown signaling
126    ///
127    /// # Panics
128    /// Will panic if configuration loading fails (consider returning Result
129    /// instead)
130    pub fn new(
131        cluster_path: Option<&str>,
132        shutdown_signal: watch::Receiver<()>,
133    ) -> Self {
134        let node_config = if let Some(p) = cluster_path {
135            info!("with_override_config from: {}", &p);
136            RaftNodeConfig::new()
137                .expect("Load node_config successfully")
138                .with_override_config(p)
139                .expect("Overwrite node_config successfully")
140                .validate()
141                .expect("Validate node_config successfully")
142        } else {
143            RaftNodeConfig::new()
144                .expect("Load node_config successfully")
145                .validate()
146                .expect("Validate node_config successfully")
147        };
148
149        Self::init(node_config, shutdown_signal)
150    }
151
152    /// Constructs NodeBuilder from in-memory cluster configuration
153    ///
154    /// # Arguments
155    /// * `cluster_config` - Pre-built cluster configuration
156    /// * `shutdown_signal` - Graceful shutdown notification channel
157    ///
158    /// # Usage
159    /// ```ignore
160    /// let builder = NodeBuilder::from_cluster_config(my_config, shutdown_rx);
161    /// ```
162    pub fn from_cluster_config(
163        cluster_config: ClusterConfig,
164        shutdown_signal: watch::Receiver<()>,
165    ) -> Self {
166        let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
167        node_config.cluster = cluster_config;
168        let node_config = node_config.validate().expect("Validate node_config successfully");
169        Self::init(node_config, shutdown_signal)
170    }
171
172    /// Core initialization logic shared by all construction paths
173    pub fn init(
174        node_config: RaftNodeConfig,
175        shutdown_signal: watch::Receiver<()>,
176    ) -> Self {
177        Self {
178            node_id: node_config.cluster.node_id,
179            storage_engine: None,
180            state_machine: None,
181            transport: None,
182            membership: None,
183            node_config,
184            shutdown_signal,
185            state_machine_handler: None,
186            snapshot_policy: None,
187            node: None,
188        }
189    }
190
191    /// Sets a custom storage engine implementation
192    pub fn storage_engine(
193        mut self,
194        storage_engine: Arc<SE>,
195    ) -> Self {
196        self.storage_engine = Some(storage_engine);
197        self
198    }
199
200    /// Sets a custom state machine implementation
201    pub fn state_machine(
202        mut self,
203        state_machine: Arc<SM>,
204    ) -> Self {
205        self.state_machine = Some(state_machine);
206        self
207    }
208
209    /// Replaces the entire node configuration
210    pub fn node_config(
211        mut self,
212        node_config: RaftNodeConfig,
213    ) -> Self {
214        self.node_config = node_config;
215        self
216    }
217
218    /// Replaces the raft  configuration
219    pub fn raft_config(
220        mut self,
221        config: RaftConfig,
222    ) -> Self {
223        self.node_config.raft = config;
224        self
225    }
226
227    /// Finalizes the builder and constructs the Raft node instance.
228    ///
229    /// Initializes default implementations for any unconfigured components:
230    /// - Creates file-based databases for state machine and logs
231    /// - Sets up default gRPC transport
232    /// - Initializes commit handling subsystem
233    /// - Configures membership management
234    ///
235    /// # Panics
236    /// Panics if essential components cannot be initialized
237    pub async fn build(mut self) -> Result<Self> {
238        let node_id = self.node_id;
239        let node_config = self.node_config.clone();
240
241        // Init CommitHandler
242        let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();
243
244        // Handle state machine initialization
245        let state_machine = self.state_machine.take().ok_or_else(|| {
246            SystemError::NodeStartFailed(
247                "State machine must be set before calling build()".to_string(),
248            )
249        })?;
250
251        // Note: Lease configuration should be injected BEFORE wrapping in Arc.
252        // See StandaloneServer::start() or EmbeddedEngine::with_rocksdb() for correct pattern.
253        // If state_machine is passed from user code, they are responsible for lease injection.
254
255        // Start state machine: flip flags and load persisted data
256        state_machine.start().await?;
257
258        // Spawn lease background cleanup task (if TTL feature is enabled)
259        // Framework-level feature: completely transparent to developers
260        let lease_cleanup_handle = if node_config.raft.state_machine.lease.enabled {
261            info!(
262                "Starting lease background cleanup worker (interval: {}ms)",
263                node_config.raft.state_machine.lease.interval_ms
264            );
265            Some(Self::spawn_background_cleanup_worker(
266                Arc::clone(&state_machine),
267                node_config.raft.state_machine.lease.interval_ms,
268                self.shutdown_signal.clone(),
269            ))
270        } else {
271            debug!("Lease feature disabled: no background cleanup worker");
272            None
273        };
274
275        // Handle storage engine initialization
276        let storage_engine = self.storage_engine.take().ok_or_else(|| {
277            SystemError::NodeStartFailed(
278                "Storage engine must be set before calling build()".to_string(),
279            )
280        })?;
281
282        //Retrieve last applied index from state machine
283        let last_applied_index = state_machine.last_applied().index;
284        info!("Node startup, Last applied index: {}", last_applied_index);
285        let raft_log = {
286            let (log, receiver) = BufferedRaftLog::new(
287                node_id,
288                node_config.raft.persistence.clone(),
289                storage_engine.clone(),
290            );
291
292            // Start processor and get Arc-wrapped instance
293            log.start(receiver)
294        };
295
296        let transport = self.transport.take().unwrap_or(GrpcTransport::new(node_id));
297
298        let snapshot_policy = self.snapshot_policy.take().unwrap_or(LogSizePolicy::new(
299            node_config.raft.snapshot.max_log_entries_before_snapshot,
300            node_config.raft.snapshot.snapshot_cool_down_since_last_check,
301        ));
302
303        let shutdown_signal = self.shutdown_signal.clone();
304
305        // Initialize watch system (controlled by feature flag at compile time)
306        // All resource allocation is explicit and visible here (no hidden spawns)
307        #[cfg(feature = "watch")]
308        let watch_system = {
309            let (broadcast_tx, broadcast_rx) =
310                tokio::sync::broadcast::channel(node_config.raft.watch.event_queue_size);
311
312            // Create unregister channel
313            let (unregister_tx, unregister_rx) = mpsc::unbounded_channel();
314
315            // Create shared registry
316            let registry = Arc::new(WatchRegistry::new(
317                node_config.raft.watch.watcher_buffer_size,
318                unregister_tx,
319            ));
320
321            // Create dispatcher
322            let dispatcher =
323                WatchDispatcher::new(Arc::clone(&registry), broadcast_rx, unregister_rx);
324
325            // Explicitly spawn dispatcher task (resource allocation visible)
326            let dispatcher_handle = tokio::spawn(async move {
327                dispatcher.run().await;
328            });
329
330            Some((broadcast_tx, registry, dispatcher_handle))
331        };
332
333        let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
334            #[cfg(feature = "watch")]
335            let watch_event_tx = watch_system.as_ref().map(|(tx, _, _)| tx.clone());
336            #[cfg(not(feature = "watch"))]
337            let watch_event_tx = None;
338
339            Arc::new(DefaultStateMachineHandler::new(
340                node_id,
341                last_applied_index,
342                state_machine.clone(),
343                node_config.raft.snapshot.clone(),
344                snapshot_policy,
345                watch_event_tx,
346            ))
347        });
348        let membership = Arc::new(self.membership.take().unwrap_or_else(|| {
349            RaftMembership::new(
350                node_id,
351                node_config.cluster.initial_cluster.clone(),
352                node_config.clone(),
353            )
354        }));
355
356        let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());
357
358        let (role_tx, role_rx) = mpsc::unbounded_channel();
359        let (event_tx, event_rx) = mpsc::channel(10240);
360        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
361        let event_tx_clone = event_tx.clone(); // used in commit handler
362
363        let node_config_arc = Arc::new(node_config);
364
365        // Construct my role
366        // Role initialization flow:
367        // 1. Check if node is learner from config
368        // 2. Load persisted hard state from storage
369        // 3. Determine initial role based on cluster state
370        // 4. Inject dependencies to role state
371        let last_applied_index = Some(state_machine.last_applied().index);
372        let my_role = if node_config_arc.is_learner() {
373            RaftRole::Learner(Box::new(LearnerState::new(
374                node_id,
375                node_config_arc.clone(),
376            )))
377        } else {
378            RaftRole::Follower(Box::new(FollowerState::new(
379                node_id,
380                node_config_arc.clone(),
381                raft_log.load_hard_state().expect("Failed to load hard state"),
382                last_applied_index,
383            )))
384        };
385        let my_role_i32 = my_role.as_i32();
386        let my_current_term = my_role.current_term();
387        info!(
388            "Start node with role: {} and term: {}",
389            my_role_i32, my_current_term
390        );
391
392        // Construct raft core
393        let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
394            node_id,
395            my_role,
396            RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
397                raft_log,
398                state_machine: state_machine.clone(),
399            },
400            transport,
401            RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
402                election_handler: ElectionHandler::new(node_id),
403                replication_handler: ReplicationHandler::new(node_id),
404                state_machine_handler: state_machine_handler.clone(),
405                purge_executor: Arc::new(purge_executor),
406            },
407            membership.clone(),
408            SignalParams::new(
409                role_tx,
410                role_rx,
411                event_tx,
412                event_rx,
413                cmd_tx.clone(),
414                cmd_rx,
415                shutdown_signal.clone(),
416            ),
417            node_config_arc.clone(),
418        );
419
420        // Register commit event listener
421        raft_core.register_new_commit_listener(new_commit_event_tx);
422
423        // Create leader notification channel and register with Raft core
424        let leader_notifier = LeaderNotifier::new();
425        raft_core.register_leader_change_listener(leader_notifier.sender());
426
427        // Create SM Worker channel for decoupled apply operations
428        let (sm_apply_tx, sm_apply_rx) = mpsc::unbounded_channel();
429
430        // Spawn SM Worker task
431        let sm_worker = StateMachineWorker::new(
432            node_id,
433            state_machine_handler.clone(),
434            sm_apply_rx,
435            event_tx_clone.clone(),
436            self.shutdown_signal.clone(),
437        );
438        let sm_worker_handle = Self::spawn_state_machine_worker(sm_worker);
439
440        // Start CommitHandler in a single thread
441        let deps = CommitHandlerDependencies {
442            state_machine_handler,
443            raft_log: raft_core.ctx.storage.raft_log.clone(),
444            membership: membership.clone(),
445            event_tx: event_tx_clone,
446            sm_apply_tx,
447            shutdown_signal,
448            max_batch_size: raft_core.ctx.node_config.raft.batching.max_batch_size,
449        };
450
451        let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
452            node_id,
453            my_role_i32,
454            my_current_term,
455            deps,
456            new_commit_event_rx,
457        );
458        // Spawn commit listener via Builder method
459        // This ensures all tokio::spawn calls are visible in one place (Builder impl)
460        // Following the "one page visible" Builder pattern principle
461        let commit_handler_handle = Self::spawn_state_machine_commit_listener(commit_handler);
462
463        let event_tx = raft_core.event_sender();
464        let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);
465
466        let node = Node::<RaftTypeConfig<SE, SM>> {
467            node_id,
468            raft_core: Arc::new(Mutex::new(raft_core)),
469            membership,
470            event_tx: event_tx.clone(),
471            cmd_tx,
472            ready: AtomicBool::new(false),
473            rpc_ready_tx,
474            leader_notifier,
475            node_config: node_config_arc,
476            #[cfg(feature = "watch")]
477            watch_registry: watch_system.as_ref().map(|(_, reg, _)| Arc::clone(reg)),
478            #[cfg(feature = "watch")]
479            _watch_dispatcher_handle: watch_system.map(|(_, _, handle)| handle),
480            _sm_worker_handle: Some(sm_worker_handle),
481            _commit_handler_handle: Some(commit_handler_handle),
482            _lease_cleanup_handle: lease_cleanup_handle,
483            shutdown_signal: self.shutdown_signal.clone(),
484        };
485
486        self.node = Some(Arc::new(node));
487        Ok(self)
488    }
489
490    /// Spawn state machine commit listener as background task.
491    ///
492    /// This method is called during node build() to start the commit handler thread.
493    /// All spawn_* methods are centralized in NodeBuilder so developers can see
494    /// all resource consumption (threads/tasks) in one place.
495    ///
496    /// # Returns
497    /// * `JoinHandle` - Task handle for lifecycle management
498    fn spawn_state_machine_commit_listener(
499        mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>
500    ) -> tokio::task::JoinHandle<()> {
501        tokio::spawn(async move {
502            match commit_handler.run().await {
503                Ok(_) => {
504                    info!("commit_handler exit program");
505                }
506                Err(e) => {
507                    error!("commit_handler exit program with unexpected error: {:?}", e);
508                    println!("commit_handler exit program");
509                }
510            }
511        })
512    }
513
514    /// Spawn state machine worker task as background task.
515    ///
516    /// This task applies state machine entries decoupled from CommitHandler,
517    /// enabling non-blocking entry sending from the critical path. Implements
518    /// graceful drain on shutdown to ensure all pending entries are applied
519    /// before task termination, maintaining Raft protocol compliance.
520    ///
521    /// # Returns
522    /// * `JoinHandle` - Task handle for lifecycle management
523    fn spawn_state_machine_worker(
524        sm_worker: StateMachineWorker<RaftTypeConfig<SE, SM>>
525    ) -> tokio::task::JoinHandle<()> {
526        tokio::spawn(async move {
527            match sm_worker.run().await {
528                Ok(_) => {
529                    info!("state_machine_worker exit program");
530                }
531                Err(e) => {
532                    error!(
533                        "state_machine_worker exit program with unexpected error: {:?}",
534                        e
535                    );
536                    println!("state_machine_worker exit program");
537                }
538            }
539        })
540    }
541
542    /// Spawn lease background cleanup task (if enabled).
543    ///
544    /// This task runs independently from Raft apply pipeline, avoiding any blocking.
545    /// Only spawns when cleanup_strategy = "background" (not "disabled" or "piggyback").
546    ///
547    /// # Arguments
548    /// * `state_machine` - Arc reference to state machine for accessing lease manager
549    /// * `lease_config` - Lease configuration determining cleanup behavior
550    /// * `shutdown_signal` - Watch channel for graceful shutdown notification
551    ///
552    /// # Returns
553    /// * `Option<JoinHandle>` - Task handle if background cleanup is enabled, None otherwise
554    ///
555    /// # Design Principles
556    /// - **Zero overhead**: If disabled, returns None immediately (no task spawned)
557    /// - **One page visible**: All long-running tasks spawned here in builder.rs
558    /// - **Graceful shutdown**: Monitors shutdown signal for clean termination
559    fn spawn_background_cleanup_worker(
560        state_machine: Arc<SM>,
561        interval_ms: u64,
562        mut shutdown_signal: watch::Receiver<()>,
563    ) -> tokio::task::JoinHandle<()> {
564        tokio::spawn(async move {
565            let mut interval = tokio::time::interval(std::time::Duration::from_millis(interval_ms));
566
567            loop {
568                tokio::select! {
569                    _ = interval.tick() => {
570                        // Call state machine's lease background cleanup
571                        match state_machine.lease_background_cleanup().await {
572                            Ok(deleted_keys) => {
573                                if !deleted_keys.is_empty() {
574                                    debug!(
575                                        "Lease background cleanup: deleted {} expired keys",
576                                        deleted_keys.len()
577                                    );
578                                }
579                            }
580                            Err(e) => {
581                                error!("Lease background cleanup failed: {:?}", e);
582                            }
583                        }
584                    }
585                    _ = shutdown_signal.changed() => {
586                        info!("Lease background cleanup received shutdown signal");
587                        break;
588                    }
589                }
590            }
591
592            debug!("Lease background cleanup worker stopped");
593        })
594    }
595
596    /// Spawn watch dispatcher as background task.
597    ///
598    /// The dispatcher manages all watch streams for the lifetime of the node.
599    /// Sets a custom state machine handler implementation.
600    ///
601    /// This allows developers to provide their own implementation of the state machine handler
602    /// which processes committed log entries and applies them to the state machine.
603    ///
604    /// # Arguments
605    /// * `handler` - custom state machine handler that must implement the `StateMachineHandler`
606    ///   trait
607    ///
608    /// # Notes
609    /// - The handler must be thread-safe as it will be shared across multiple threads
610    /// - If not set, a default implementation will be used during `build()`
611    /// - The handler should properly handle snapshot creation and restoration
612    pub fn with_custom_state_machine_handler(
613        mut self,
614        handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
615    ) -> Self {
616        self.state_machine_handler = Some(handler);
617        self
618    }
619
620    /// Starts the gRPC server for cluster communication.
621    ///
622    /// # Panics
623    /// Panics if node hasn't been built or address binding fails
624    async fn start_rpc_server(self) -> Self {
625        debug!("1. --- start RPC server --- ");
626        if let Some(ref node) = self.node {
627            let node_clone = node.clone();
628            let shutdown = self.shutdown_signal.clone();
629            let listen_address = self.node_config.cluster.listen_address;
630            let node_config = self.node_config.clone();
631            tokio::spawn(async move {
632                if let Err(e) =
633                    grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
634                {
635                    eprintln!("RPC server stops. {e:?}");
636                    error!("RPC server stops. {:?}", e);
637                }
638            });
639            self
640        } else {
641            panic!("failed to start RPC server");
642        }
643    }
644
645    /// Builds and starts the Raft node.
646    ///
647    /// This is the primary method to initialize and start a node. It performs:
648    /// 1. State machine initialization (including lease injection if applicable)
649    /// 2. Raft core construction
650    /// 3. Background task spawning (commit handler, replication, election)
651    /// 4. gRPC server startup for cluster communication
652    ///
653    /// # Returns
654    /// An `Arc<Node>` ready for operation
655    ///
656    /// # Errors
657    /// Returns an error if any initialization step fails
658    ///
659    /// # Example
660    /// ```ignore
661    /// let node = NodeBuilder::init(config, shutdown_rx)
662    ///     .storage_engine(storage)
663    ///     .state_machine(state_machine)
664    ///     .start().await?;
665    /// ```
666    pub async fn start(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
667        let builder = self.build().await?;
668        let builder = builder.start_rpc_server().await;
669        builder.node.ok_or_else(|| {
670            SystemError::NodeStartFailed("Node build failed unexpectedly".to_string()).into()
671        })
672    }
673
674    /// Test constructor with custom database path
675    ///
676    /// # Safety
677    /// Bypasses normal configuration validation - use for testing only
678    #[cfg(test)]
679    pub(crate) fn new_from_db_path(
680        db_path: &str,
681        shutdown_signal: watch::Receiver<()>,
682    ) -> Self {
683        use std::path::PathBuf;
684
685        let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
686        node_config.cluster.db_root_dir = PathBuf::from(db_path);
687        let node_config = node_config.validate().expect("Validate node_config successfully");
688
689        Self::init(node_config, shutdown_signal)
690    }
691}