Skip to main content

d_engine_server/node/
builder.rs

1//! A builder pattern implementation for constructing a [`Node`] instance in a
2//! Raft cluster.
3//!
4//! The [`NodeBuilder`] provides a fluent interface to configure and assemble
5//! components required by the Raft node, including storage engines, state machines,
6//! transport layers, membership management, and asynchronous handlers.
7//!
8//! ## Key Design Points
9//! - **Explicit Component Initialization**: Requires explicit configuration of storage engines and
10//!   state machines (no implicit defaults).
11//! - **Customization**: Allows overriding components via setter methods (e.g., `storage_engine()`,
12//!   `state_machine()`, `transport()`).
13//! - **Simple API**: Single `start().await?` method to build and launch the node.
14//!
15//! ## Example
16//! ```ignore
17//! let (shutdown_tx, shutdown_rx) = watch::channel(());
18//! let node = NodeBuilder::from_node_config(config, shutdown_rx)
19//!     .storage_engine(custom_storage_engine)  // Required component
20//!     .state_machine(custom_state_machine)    // Required component
21//!     .start().await?;
22//! ```
23//!
24//! ## Notes
25//! - **Thread Safety**: All components wrapped in `Arc`/`Mutex` for shared ownership and thread
26//!   safety.
27//! - **Resource Cleanup**: Uses `watch::Receiver` for cooperative shutdown signaling.
28//! - **Configuration Loading**: Supports loading cluster configuration from file or in-memory
29//!   config.
30
31use std::fmt::Debug;
32use std::sync::Arc;
33use std::sync::atomic::AtomicBool;
34
35use d_engine_core::ClusterConfig;
36use d_engine_core::CommitHandler;
37use d_engine_core::CommitHandlerDependencies;
38use d_engine_core::DefaultCommitHandler;
39use d_engine_core::DefaultPurgeExecutor;
40use d_engine_core::DefaultStateMachineHandler;
41use d_engine_core::ElectionHandler;
42use d_engine_core::LogSizePolicy;
43use d_engine_core::NewCommitData;
44use d_engine_core::Raft;
45use d_engine_core::RaftConfig;
46use d_engine_core::RaftCoreHandlers;
47use d_engine_core::RaftLog;
48use d_engine_core::RaftNodeConfig;
49use d_engine_core::RaftRole;
50use d_engine_core::RaftStorageHandles;
51use d_engine_core::ReplicationHandler;
52use d_engine_core::Result;
53use d_engine_core::RoleEvent;
54use d_engine_core::SignalParams;
55use d_engine_core::StateMachine;
56use d_engine_core::StateMachineWorker;
57use d_engine_core::StorageEngine;
58use d_engine_core::SystemError;
59use d_engine_core::alias::MOF;
60use d_engine_core::alias::SMHOF;
61use d_engine_core::alias::SNP;
62use d_engine_core::alias::TROF;
63use d_engine_core::follower_state::FollowerState;
64use d_engine_core::learner_state::LearnerState;
65#[cfg(feature = "watch")]
66use d_engine_core::watch::WatchDispatcher;
67#[cfg(feature = "watch")]
68use d_engine_core::watch::WatchRegistry;
69use tokio::sync::Mutex;
70use tokio::sync::mpsc;
71use tokio::sync::watch;
72use tracing::debug;
73use tracing::error;
74use tracing::info;
75
76use super::LeaderNotifier;
77use super::RaftTypeConfig;
78use crate::Node;
79use crate::membership::RaftMembership;
80use crate::network::grpc;
81use crate::network::grpc::grpc_transport::GrpcTransport;
82use crate::storage::BufferedRaftLog;
83
84/// Builder for creating a Raft node
85///
86/// Provides a fluent API for configuring and constructing a [`Node`].
87///
88/// # Example
89///
90/// ```rust,ignore
91/// use d_engine_server::{NodeBuilder, FileStorageEngine, FileStateMachine};
92///
93/// let node = NodeBuilder::new(None, shutdown_rx)
94///     .storage_engine(Arc::new(FileStorageEngine::new(...)?))
95///     .state_machine(Arc::new(FileStateMachine::new(...).await?))
96///     .start().await?;
97/// ```
98pub struct NodeBuilder<SE, SM>
99where
100    SE: StorageEngine + Debug,
101    SM: StateMachine + Debug,
102{
103    node_id: u32,
104
105    pub(super) node_config: RaftNodeConfig,
106    pub(super) storage_engine: Option<Arc<SE>>,
107    pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
108    pub(super) state_machine: Option<Arc<SM>>,
109    pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
110    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
111    pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
112    pub(super) shutdown_signal: watch::Receiver<()>,
113
114    pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
115}
116
117impl<SE, SM> NodeBuilder<SE, SM>
118where
119    SE: StorageEngine + Debug,
120    SM: StateMachine + Debug,
121{
122    /// Creates a new NodeBuilder with cluster configuration loaded from file
123    ///
124    /// # Arguments
125    /// * `cluster_path` - Optional path to node-specific cluster configuration
126    /// * `shutdown_signal` - Watch channel for graceful shutdown signaling
127    ///
128    /// # Panics
129    /// Will panic if configuration loading fails (consider returning Result
130    /// instead)
131    pub fn new(
132        cluster_path: Option<&str>,
133        shutdown_signal: watch::Receiver<()>,
134    ) -> Self {
135        let node_config = if let Some(p) = cluster_path {
136            info!("with_override_config from: {}", &p);
137            RaftNodeConfig::new()
138                .expect("Load node_config successfully")
139                .with_override_config(p)
140                .expect("Overwrite node_config successfully")
141                .validate()
142                .expect("Validate node_config successfully")
143        } else {
144            RaftNodeConfig::new()
145                .expect("Load node_config successfully")
146                .validate()
147                .expect("Validate node_config successfully")
148        };
149
150        Self::init(node_config, shutdown_signal)
151    }
152
153    /// Constructs NodeBuilder from in-memory cluster configuration
154    ///
155    /// # Arguments
156    /// * `cluster_config` - Pre-built cluster configuration
157    /// * `shutdown_signal` - Graceful shutdown notification channel
158    ///
159    /// # Usage
160    /// ```ignore
161    /// let builder = NodeBuilder::from_cluster_config(my_config, shutdown_rx);
162    /// ```
163    pub fn from_cluster_config(
164        cluster_config: ClusterConfig,
165        shutdown_signal: watch::Receiver<()>,
166    ) -> Self {
167        let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
168        node_config.cluster = cluster_config;
169        let node_config = node_config.validate().expect("Validate node_config successfully");
170        Self::init(node_config, shutdown_signal)
171    }
172
173    /// Constructs NodeBuilder from a fully-built [`RaftNodeConfig`].
174    ///
175    /// Use this when you have already assembled a complete `RaftNodeConfig` and
176    /// want to avoid the implicit `RaftNodeConfig::new()` + `validate()` call
177    /// that [`new`](NodeBuilder::new) performs internally before applying your
178    /// config. That detour can panic in environments where no default config
179    /// file or environment variables are present.
180    ///
181    /// # Arguments
182    /// * `node_config` - Fully assembled and validated node configuration
183    /// * `shutdown_signal` - Watch channel for graceful shutdown signaling
184    ///
185    /// # Usage
186    /// ```ignore
187    /// let config: RaftNodeConfig = /* build and validate your config */;
188    /// let builder = NodeBuilder::from_node_config(config, shutdown_rx);
189    /// ```
190    pub fn from_node_config(
191        node_config: RaftNodeConfig,
192        shutdown_signal: watch::Receiver<()>,
193    ) -> Self {
194        Self::init(node_config, shutdown_signal)
195    }
196
197    /// Core initialization logic shared by all construction paths
198    pub(crate) fn init(
199        node_config: RaftNodeConfig,
200        shutdown_signal: watch::Receiver<()>,
201    ) -> Self {
202        Self {
203            node_id: node_config.cluster.node_id,
204            storage_engine: None,
205            state_machine: None,
206            transport: None,
207            membership: None,
208            node_config,
209            shutdown_signal,
210            state_machine_handler: None,
211            snapshot_policy: None,
212            node: None,
213        }
214    }
215
216    /// Sets a custom storage engine implementation
217    pub fn storage_engine(
218        mut self,
219        storage_engine: Arc<SE>,
220    ) -> Self {
221        self.storage_engine = Some(storage_engine);
222        self
223    }
224
225    /// Sets a custom state machine implementation
226    pub fn state_machine(
227        mut self,
228        state_machine: Arc<SM>,
229    ) -> Self {
230        self.state_machine = Some(state_machine);
231        self
232    }
233
234    /// Replaces the entire node configuration
235    pub fn node_config(
236        mut self,
237        node_config: RaftNodeConfig,
238    ) -> Self {
239        self.node_id = node_config.cluster.node_id;
240        self.node_config = node_config;
241        self
242    }
243
244    /// Replaces the raft  configuration
245    pub fn raft_config(
246        mut self,
247        config: RaftConfig,
248    ) -> Self {
249        self.node_config.raft = config;
250        self
251    }
252
253    /// Finalizes the builder and constructs the Raft node instance.
254    ///
255    /// Initializes default implementations for any unconfigured components:
256    /// - Creates file-based databases for state machine and logs
257    /// - Sets up default gRPC transport
258    /// - Initializes commit handling subsystem
259    /// - Configures membership management
260    ///
261    /// # Panics
262    /// Panics if essential components cannot be initialized
263    pub async fn build(mut self) -> Result<Self> {
264        let node_id = self.node_id;
265        let node_config = self.node_config.clone();
266
267        // Init CommitHandler
268        let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();
269
270        // Handle state machine initialization
271        let state_machine = self.state_machine.take().ok_or_else(|| {
272            SystemError::NodeStartFailed(
273                "State machine must be set before calling build()".to_string(),
274            )
275        })?;
276
277        // Note: Lease configuration should be injected BEFORE wrapping in Arc.
278        // See StandaloneServer::start() or EmbeddedEngine::with_rocksdb() for correct pattern.
279        // If state_machine is passed from user code, they are responsible for lease injection.
280
281        // Start state machine: flip flags and load persisted data
282        state_machine.start().await?;
283
284        // Spawn lease background cleanup task (if TTL feature is enabled)
285        // Framework-level feature: completely transparent to developers
286        let lease_cleanup_handle = if node_config.raft.state_machine.lease.enabled {
287            info!(
288                "Starting lease background cleanup worker (interval: {}ms)",
289                node_config.raft.state_machine.lease.cleanup_interval_ms
290            );
291            Some(Self::spawn_background_cleanup_worker(
292                Arc::clone(&state_machine),
293                node_config.raft.state_machine.lease.cleanup_interval_ms,
294                self.shutdown_signal.clone(),
295            ))
296        } else {
297            debug!("Lease feature disabled: no background cleanup worker");
298            None
299        };
300
301        // Handle storage engine initialization
302        let storage_engine = self.storage_engine.take().ok_or_else(|| {
303            SystemError::NodeStartFailed(
304                "Storage engine must be set before calling build()".to_string(),
305            )
306        })?;
307
308        //Retrieve last applied index from state machine
309        let last_applied_index = state_machine.last_applied().index;
310        info!("Node startup, Last applied index: {}", last_applied_index);
311
312        // Create role channel before raft_log so log_flush_tx can be passed to start().
313        // role_rx is passed to Raft::new() below; only the creation order changes.
314        let (role_tx, role_rx) = mpsc::unbounded_channel();
315
316        let raft_log = {
317            let (log, receiver) = BufferedRaftLog::new(
318                node_id,
319                node_config.raft.persistence.clone(),
320                storage_engine.clone(),
321            );
322
323            // Start processor and get Arc-wrapped instance.
324            // Pass role_tx so batch_processor sends RoleEvent::LogFlushed after each fsync.
325            log.start(receiver, Some(role_tx.clone()))
326        };
327
328        // Peer health channels: transport fires try_send(peer_id) on stream failure/success.
329        // Bridge tasks below relay to membership so zombie detection and health recovery both work.
330        let (peer_failure_tx, peer_failure_rx) = mpsc::channel::<u32>(64);
331        let (peer_success_tx, peer_success_rx) = mpsc::channel::<u32>(64);
332        let transport = self.transport.take().unwrap_or_else(|| {
333            GrpcTransport::new_with_channels(node_id, peer_failure_tx, peer_success_tx)
334        });
335
336        let snapshot_policy = self.snapshot_policy.take().unwrap_or(LogSizePolicy::new(
337            node_config.raft.snapshot.max_log_entries_before_snapshot,
338            node_config.raft.snapshot.snapshot_cool_down_since_last_check,
339        ));
340
341        let shutdown_signal = self.shutdown_signal.clone();
342
343        // Initialize watch system (controlled by feature flag at compile time)
344        // All resource allocation is explicit and visible here (no hidden spawns)
345        #[cfg(feature = "watch")]
346        let watch_system = {
347            let (broadcast_tx, broadcast_rx) =
348                tokio::sync::broadcast::channel(node_config.raft.watch.event_queue_size);
349
350            // Create unregister channel
351            let (unregister_tx, unregister_rx) = mpsc::unbounded_channel();
352
353            // Create shared registry
354            let registry = Arc::new(WatchRegistry::new_with_limits(
355                node_config.raft.watch.watcher_buffer_size,
356                node_config.raft.watch.max_watcher_count,
357                unregister_tx,
358            ));
359
360            // Shared last_applied for Progress event revision; written by handler, read by dispatcher.
361            let last_applied_ref = Arc::new(std::sync::atomic::AtomicU64::new(last_applied_index));
362
363            // Create dispatcher
364            let dispatcher = WatchDispatcher::new(
365                Arc::clone(&registry),
366                broadcast_rx,
367                unregister_rx,
368                Arc::clone(&last_applied_ref),
369                node_config.raft.watch.heartbeat_interval_ms,
370            );
371
372            // Explicitly spawn dispatcher task (resource allocation visible)
373            let dispatcher_handle = tokio::spawn(async move {
374                dispatcher.run().await;
375            });
376
377            Some((broadcast_tx, registry, dispatcher_handle, last_applied_ref))
378        };
379
380        let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
381            #[cfg(feature = "watch")]
382            let watch_event_tx = watch_system.as_ref().map(|(tx, _, _, _)| tx.clone());
383
384            // Share the registry's live prev_kv counter Arc so the handler sees real-time updates.
385            #[cfg(feature = "watch")]
386            let prev_kv_count = watch_system
387                .as_ref()
388                .map(|(_, registry, _, _)| registry.prev_kv_watcher_count_arc())
389                .unwrap_or_else(|| Arc::new(std::sync::atomic::AtomicUsize::new(0)));
390
391            #[cfg(not(feature = "watch"))]
392            let watch_event_tx: Option<
393                tokio::sync::broadcast::Sender<d_engine_proto::client::WatchResponse>,
394            > = None;
395
396            Arc::new(DefaultStateMachineHandler::new(
397                node_id,
398                last_applied_index,
399                state_machine.clone(),
400                node_config.raft.snapshot.clone(),
401                snapshot_policy,
402                #[cfg(feature = "watch")]
403                watch_event_tx,
404                #[cfg(not(feature = "watch"))]
405                watch_event_tx,
406                #[cfg(feature = "watch")]
407                prev_kv_count,
408                #[cfg(not(feature = "watch"))]
409                Arc::new(std::sync::atomic::AtomicUsize::new(0)),
410            ))
411        });
412        let (membership_inner, zombie_rx) = self.membership.take().map_or_else(
413            || {
414                RaftMembership::new(
415                    node_id,
416                    node_config.cluster.initial_cluster.clone(),
417                    node_config.clone(),
418                )
419            },
420            // Pre-built membership has no zombie_rx; create a dummy closed channel.
421            |m| {
422                let (_tx, rx) = tokio::sync::mpsc::channel(1);
423                (m, rx)
424            },
425        );
426        let membership = Arc::new(membership_inner);
427        // Capture the membership watch receiver before membership is moved into Node.
428        let membership_rx = membership.subscribe_membership();
429
430        let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());
431
432        // role_tx / role_rx created earlier (before raft_log) to pass log_flush_tx to start().
433        let (event_tx, event_rx) = mpsc::channel(10240);
434        let (cmd_tx, cmd_rx) = mpsc::channel(node_config.raft.cmd_channel_capacity);
435        let event_tx_clone = event_tx.clone(); // used in commit handler
436        let role_tx_for_sm = role_tx.clone(); // used in SM worker (ApplyCompleted → P2)
437
438        // Bridge zombie signals from health monitor → role event loop.
439        // Exits naturally when zombie_tx drops with RaftMembership (channel closed → recv None).
440        // Validates each signal against the health monitor before forwarding: if the peer
441        // recovered (record_success called) after the signal was queued, drop the stale signal
442        // to prevent BatchRemove for a healthy node.
443        let role_tx_for_zombie = role_tx.clone();
444        let membership_for_zombie = Arc::clone(&membership);
445        tokio::spawn(async move {
446            let mut zombie_rx = zombie_rx;
447            while let Some(node_id) = zombie_rx.recv().await {
448                if membership_for_zombie.is_zombie_valid(node_id) {
449                    let _ = role_tx_for_zombie.send(RoleEvent::ZombieDetected(node_id));
450                }
451            }
452        });
453
454        // Bridge peer stream failures from transport → health monitor.
455        // Exits naturally when transport (and its peer_failure_tx) drops (channel closed → recv None).
456        let membership_for_peer_failure = Arc::clone(&membership);
457        tokio::spawn(async move {
458            let mut peer_failure_rx = peer_failure_rx;
459            while let Some(failed_peer_id) = peer_failure_rx.recv().await {
460                membership_for_peer_failure.on_peer_stream_failed(failed_peer_id).await;
461            }
462        });
463
464        // Bridge peer stream successes from transport → health monitor.
465        // Resets failure counter so transient errors during failover don't accumulate into false zombies.
466        let membership_for_peer_success = Arc::clone(&membership);
467        tokio::spawn(async move {
468            let mut peer_success_rx = peer_success_rx;
469            while let Some(succeeded_peer_id) = peer_success_rx.recv().await {
470                membership_for_peer_success.on_peer_stream_success(succeeded_peer_id).await;
471            }
472        });
473
474        let node_config_arc = Arc::new(node_config);
475
476        // Construct my role
477        // Role initialization flow:
478        // 1. Check if node is learner from config
479        // 2. Load persisted hard state from storage
480        // 3. Determine initial role based on cluster state
481        // 4. Inject dependencies to role state
482        let last_applied_index = Some(state_machine.last_applied().index);
483        let my_role = if node_config_arc.is_learner() {
484            RaftRole::Learner(Box::new(LearnerState::new(
485                node_id,
486                node_config_arc.clone(),
487            )))
488        } else {
489            RaftRole::Follower(Box::new(FollowerState::new(
490                node_id,
491                node_config_arc.clone(),
492                raft_log.load_hard_state().expect("Failed to load hard state"),
493                last_applied_index,
494            )))
495        };
496        let my_role_i32 = my_role.as_i32();
497        let my_current_term = my_role.current_term();
498        info!(
499            "Start node with role: {} and term: {}",
500            my_role_i32, my_current_term
501        );
502
503        // Construct raft core
504        let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
505            node_id,
506            my_role,
507            RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
508                raft_log,
509                state_machine: state_machine.clone(),
510            },
511            transport,
512            RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
513                election_handler: ElectionHandler::new(node_id),
514                replication_handler: ReplicationHandler::new(node_id),
515                state_machine_handler: state_machine_handler.clone(),
516                purge_executor: Arc::new(purge_executor),
517            },
518            membership.clone(),
519            SignalParams::new(
520                role_tx,
521                role_rx,
522                event_tx,
523                event_rx,
524                cmd_tx.clone(),
525                cmd_rx,
526                shutdown_signal.clone(),
527            ),
528            node_config_arc.clone(),
529        );
530
531        // Register commit event listener
532        raft_core.register_new_commit_listener(new_commit_event_tx);
533
534        // Create leader notification channel and register with Raft core
535        let leader_notifier = LeaderNotifier::new();
536        raft_core.register_leader_change_listener(leader_notifier.sender());
537
538        // Create SM Worker channel for decoupled apply operations
539        let (sm_apply_tx, sm_apply_rx) = mpsc::unbounded_channel();
540
541        // Spawn SM Worker task
542        let sm_worker = StateMachineWorker::new(
543            node_id,
544            state_machine_handler.clone(),
545            sm_apply_rx,
546            role_tx_for_sm,
547            self.shutdown_signal.clone(),
548        );
549        let sm_worker_handle = Self::spawn_state_machine_worker(sm_worker);
550
551        // Start CommitHandler in a single thread
552        let deps = CommitHandlerDependencies {
553            state_machine_handler,
554            raft_log: raft_core.ctx.storage.raft_log.clone(),
555            membership: membership.clone(),
556            event_tx: event_tx_clone,
557            sm_apply_tx,
558            shutdown_signal,
559            max_batch_size: raft_core.ctx.node_config.raft.batching.max_batch_size,
560        };
561
562        let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
563            node_id,
564            my_role_i32,
565            my_current_term,
566            deps,
567            new_commit_event_rx,
568        );
569        // Spawn commit listener via Builder method
570        // This ensures all tokio::spawn calls are visible in one place (Builder impl)
571        // Following the "one page visible" Builder pattern principle
572        let commit_handler_handle = Self::spawn_state_machine_commit_listener(commit_handler);
573
574        let event_tx = raft_core.event_sender();
575        let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);
576
577        let node = Node::<RaftTypeConfig<SE, SM>> {
578            node_id,
579            raft_core: Arc::new(Mutex::new(raft_core)),
580            membership,
581            event_tx: event_tx.clone(),
582            cmd_tx,
583            ready: AtomicBool::new(false),
584            rpc_ready_tx,
585            leader_notifier,
586            membership_rx,
587            node_config: node_config_arc,
588            #[cfg(feature = "watch")]
589            watch_registry: watch_system.as_ref().map(|(_, reg, _, _)| Arc::clone(reg)),
590            #[cfg(feature = "watch")]
591            _watch_dispatcher_handle: watch_system.map(|(_, _, handle, _)| handle),
592            sm_worker_handle: std::sync::Mutex::new(Some(sm_worker_handle)),
593            _commit_handler_handle: Some(commit_handler_handle),
594            _lease_cleanup_handle: lease_cleanup_handle,
595            shutdown_signal: self.shutdown_signal.clone(),
596        };
597
598        self.node = Some(Arc::new(node));
599        Ok(self)
600    }
601
602    /// Spawn state machine commit listener as background task.
603    ///
604    /// This method is called during node build() to start the commit handler thread.
605    /// All spawn_* methods are centralized in NodeBuilder so developers can see
606    /// all resource consumption (threads/tasks) in one place.
607    ///
608    /// # Returns
609    /// * `JoinHandle` - Task handle for lifecycle management
610    fn spawn_state_machine_commit_listener(
611        mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>
612    ) -> tokio::task::JoinHandle<()> {
613        tokio::spawn(async move {
614            match commit_handler.run().await {
615                Ok(_) => {
616                    info!("commit_handler exit program");
617                }
618                Err(e) => {
619                    error!("commit_handler exit program with unexpected error: {:?}", e);
620                    println!("commit_handler exit program");
621                }
622            }
623        })
624    }
625
626    /// Spawn state machine worker on a dedicated OS thread.
627    ///
628    /// Runs on `sm-apply-{node_id}` OS thread (not a tokio worker) so that
629    /// synchronous RocksDB writes inside `apply_chunk` never block the async runtime
630    /// or compete with tokio's blocking thread pool.
631    ///
632    /// # Returns
633    /// * `JoinHandle` - Thread handle for lifecycle management
634    fn spawn_state_machine_worker(
635        sm_worker: StateMachineWorker<RaftTypeConfig<SE, SM>>
636    ) -> std::thread::JoinHandle<()> {
637        let node_id = sm_worker.node_id();
638        let handle = tokio::runtime::Handle::current();
639        std::thread::Builder::new()
640            .name(format!("sm-apply-{}", node_id))
641            .spawn(move || {
642                handle.block_on(async move {
643                    match sm_worker.run().await {
644                        Ok(_) => info!("state_machine_worker exit"),
645                        Err(e) => error!("state_machine_worker exit with error: {:?}", e),
646                    }
647                });
648            })
649            .expect("failed to spawn sm-apply thread")
650    }
651
652    /// Spawn lease background cleanup task (if enabled).
653    ///
654    /// This task runs independently from Raft apply pipeline, avoiding any blocking.
655    /// Only spawns when cleanup_strategy = "background" (not "disabled" or "piggyback").
656    ///
657    /// # Arguments
658    /// * `state_machine` - Arc reference to state machine for accessing lease manager
659    /// * `lease_config` - Lease configuration determining cleanup behavior
660    /// * `shutdown_signal` - Watch channel for graceful shutdown notification
661    ///
662    /// # Returns
663    /// * `Option<JoinHandle>` - Task handle if background cleanup is enabled, None otherwise
664    ///
665    /// # Design Principles
666    /// - **Zero overhead**: If disabled, returns None immediately (no task spawned)
667    /// - **One page visible**: All long-running tasks spawned here in builder.rs
668    /// - **Graceful shutdown**: Monitors shutdown signal for clean termination
669    fn spawn_background_cleanup_worker(
670        state_machine: Arc<SM>,
671        cleanup_interval_ms: u64,
672        mut shutdown_signal: watch::Receiver<()>,
673    ) -> tokio::task::JoinHandle<()> {
674        tokio::spawn(async move {
675            let mut interval =
676                tokio::time::interval(std::time::Duration::from_millis(cleanup_interval_ms));
677
678            loop {
679                tokio::select! {
680                    _ = interval.tick() => {
681                        // Call state machine's lease background cleanup
682                        match state_machine.lease_background_cleanup().await {
683                            Ok(deleted_keys) => {
684                                if !deleted_keys.is_empty() {
685                                    debug!(
686                                        "Lease background cleanup: deleted {} expired keys",
687                                        deleted_keys.len()
688                                    );
689                                }
690                            }
691                            Err(e) => {
692                                error!("Lease background cleanup failed: {:?}", e);
693                            }
694                        }
695                    }
696                    _ = shutdown_signal.changed() => {
697                        info!("Lease background cleanup received shutdown signal");
698                        break;
699                    }
700                }
701            }
702
703            debug!("Lease background cleanup worker stopped");
704        })
705    }
706
707    /// Sets a custom state machine handler implementation.
708    ///
709    /// Allows providing a custom implementation that processes committed log entries
710    /// and applies them to the state machine. If not set, a default implementation
711    /// is used during `build()`.
712    ///
713    /// # Arguments
714    /// * `handler` - custom handler implementing the `StateMachineHandler` trait
715    ///
716    /// # Notes
717    /// - The handler must be thread-safe (shared across threads via `Arc`)
718    /// - The handler must correctly handle snapshot creation and restoration
719    pub fn with_custom_state_machine_handler(
720        mut self,
721        handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
722    ) -> Self {
723        self.state_machine_handler = Some(handler);
724        self
725    }
726
727    /// Starts the gRPC server for cluster communication.
728    ///
729    /// # Panics
730    /// Panics if node hasn't been built or address binding fails
731    async fn start_rpc_server(self) -> Self {
732        debug!("1. --- start RPC server --- ");
733        if let Some(ref node) = self.node {
734            let node_clone = node.clone();
735            let shutdown = self.shutdown_signal.clone();
736            let listen_address = self.node_config.cluster.listen_address;
737            let node_config = self.node_config.clone();
738            tokio::spawn(async move {
739                if let Err(e) =
740                    grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
741                {
742                    eprintln!("RPC server stops. {e:?}");
743                    error!("RPC server stops. {:?}", e);
744                }
745            });
746            self
747        } else {
748            panic!("failed to start RPC server");
749        }
750    }
751
752    /// Builds and starts the Raft node.
753    ///
754    /// This is the primary method to initialize and start a node. It performs:
755    /// 1. State machine initialization (including lease injection if applicable)
756    /// 2. Raft core construction
757    /// 3. Background task spawning (commit handler, replication, election)
758    /// 4. gRPC server startup for cluster communication
759    ///
760    /// # Returns
761    /// An `Arc<Node>` ready for operation
762    ///
763    /// # Errors
764    /// Returns an error if any initialization step fails
765    ///
766    /// # Example
767    /// ```ignore
768    /// let node = NodeBuilder::from_node_config(config, shutdown_rx)
769    ///     .storage_engine(storage)
770    ///     .state_machine(state_machine)
771    ///     .start().await?;
772    /// ```
773    pub async fn start(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
774        let builder = self.build().await?;
775        let builder = builder.start_rpc_server().await;
776        builder.node.ok_or_else(|| {
777            SystemError::NodeStartFailed("Node build failed unexpectedly".to_string()).into()
778        })
779    }
780
781    /// Test constructor with custom database path
782    ///
783    /// # Safety
784    /// Bypasses normal configuration validation - use for testing only
785    #[cfg(test)]
786    pub(crate) fn new_from_db_path(
787        db_path: &str,
788        shutdown_signal: watch::Receiver<()>,
789    ) -> Self {
790        use std::path::PathBuf;
791
792        let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
793        node_config.cluster.db_root_dir = PathBuf::from(db_path);
794        let node_config = node_config.validate().expect("Validate node_config successfully");
795
796        Self::init(node_config, shutdown_signal)
797    }
798}