d_engine_server/node/
builder.rs

//! A builder pattern implementation for constructing a [`Node`] instance in a
//! Raft cluster.
//!
//! The [`NodeBuilder`] provides a fluent interface to configure and assemble
//! components required by the Raft node, including storage engines, state machines,
//! transport layers, membership management, and asynchronous handlers.
//!
//! ## Key Design Points
//! - **Explicit Component Initialization**: Requires explicit configuration of storage engines and
//!   state machines (no implicit defaults).
//! - **Customization**: Allows overriding components via setter methods (e.g., `storage_engine()`,
//!   `state_machine()`, `transport()`).
//! - **Simple API**: Single `start().await?` method to build and launch the node.
//!
//! ## Example
//! ```ignore
//! let (shutdown_tx, shutdown_rx) = watch::channel(());
//! let node = NodeBuilder::init(config, shutdown_rx)
//!     .storage_engine(custom_storage_engine)  // Required component
//!     .state_machine(custom_state_machine)    // Required component
//!     .start().await?;
//! ```
//!
//! ## Notes
//! - **Thread Safety**: All components wrapped in `Arc`/`Mutex` for shared ownership and thread
//!   safety.
//! - **Resource Cleanup**: Uses `watch::Receiver` for cooperative shutdown signaling.
//! - **Configuration Loading**: Supports loading cluster configuration from file or in-memory
//!   config.
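//!
//! Alternatively, a sketch using an in-memory cluster configuration (see
//! [`NodeBuilder::from_cluster_config`]):
//! ```ignore
//! let node = NodeBuilder::from_cluster_config(cluster_config, shutdown_rx)
//!     .storage_engine(custom_storage_engine)
//!     .state_machine(custom_state_machine)
//!     .start().await?;
//! ```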

use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use d_engine_core::ClusterConfig;
use d_engine_core::CommitHandler;
use d_engine_core::CommitHandlerDependencies;
use d_engine_core::DefaultCommitHandler;
use d_engine_core::DefaultPurgeExecutor;
use d_engine_core::DefaultStateMachineHandler;
use d_engine_core::ElectionHandler;
use d_engine_core::LogSizePolicy;
use d_engine_core::NewCommitData;
use d_engine_core::Raft;
use d_engine_core::RaftConfig;
use d_engine_core::RaftCoreHandlers;
use d_engine_core::RaftLog;
use d_engine_core::RaftNodeConfig;
use d_engine_core::RaftRole;
use d_engine_core::RaftStorageHandles;
use d_engine_core::ReplicationHandler;
use d_engine_core::Result;
use d_engine_core::SignalParams;
use d_engine_core::StateMachine;
use d_engine_core::StorageEngine;
use d_engine_core::SystemError;
use d_engine_core::alias::MOF;
use d_engine_core::alias::SMHOF;
use d_engine_core::alias::SNP;
use d_engine_core::alias::TROF;
use d_engine_core::follower_state::FollowerState;
use d_engine_core::learner_state::LearnerState;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchDispatcher;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchRegistry;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::debug;
use tracing::error;
use tracing::info;

use super::LeaderNotifier;
use super::RaftTypeConfig;
use crate::Node;
use crate::membership::RaftMembership;
use crate::network::grpc;
use crate::network::grpc::grpc_transport::GrpcTransport;
use crate::storage::BufferedRaftLog;

/// Builder for creating a Raft node
///
/// Provides a fluent API for configuring and constructing a [`Node`].
///
/// # Example
///
/// ```rust,ignore
/// use d_engine_server::{NodeBuilder, FileStorageEngine, FileStateMachine};
///
/// let node = NodeBuilder::new(None, shutdown_rx)
///     .storage_engine(Arc::new(FileStorageEngine::new(...)?))
///     .state_machine(Arc::new(FileStateMachine::new(...).await?))
///     .start().await?;
/// ```
pub struct NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    node_id: u32,

    pub(super) node_config: RaftNodeConfig,
    pub(super) storage_engine: Option<Arc<SE>>,
    pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine: Option<Arc<SM>>,
    pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
    pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
    pub(super) shutdown_signal: watch::Receiver<()>,

    pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
}

impl<SE, SM> NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    /// Creates a new NodeBuilder with cluster configuration loaded from file
    ///
    /// # Arguments
    /// * `cluster_path` - Optional path to node-specific cluster configuration
    /// * `shutdown_signal` - Watch channel for graceful shutdown signaling
    ///
    /// # Panics
    /// Will panic if configuration loading fails (consider returning Result
    /// instead)
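    ///
    /// # Example
    /// ```ignore
    /// // A minimal sketch; the override path is illustrative.
    /// let (_shutdown_tx, shutdown_rx) = watch::channel(());
    /// let builder = NodeBuilder::new(Some("config/n1.toml"), shutdown_rx);
    /// ```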
    pub fn new(
        cluster_path: Option<&str>,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let node_config = if let Some(p) = cluster_path {
            info!("with_override_config from: {}", &p);
            RaftNodeConfig::new()
                .expect("failed to load node_config")
                .with_override_config(p)
                .expect("failed to apply override config")
                .validate()
                .expect("node_config validation failed")
        } else {
            RaftNodeConfig::new()
                .expect("failed to load node_config")
                .validate()
                .expect("node_config validation failed")
        };

        Self::init(node_config, shutdown_signal)
    }

    /// Constructs NodeBuilder from in-memory cluster configuration
    ///
    /// # Arguments
    /// * `cluster_config` - Pre-built cluster configuration
    /// * `shutdown_signal` - Graceful shutdown notification channel
    ///
    /// # Usage
    /// ```ignore
    /// let builder = NodeBuilder::from_cluster_config(my_config, shutdown_rx);
    /// ```
    pub fn from_cluster_config(
        cluster_config: ClusterConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        node_config.cluster = cluster_config;
        let node_config = node_config.validate().expect("node_config validation failed");
        Self::init(node_config, shutdown_signal)
    }

    /// Core initialization logic shared by all construction paths
    pub fn init(
        node_config: RaftNodeConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        Self {
            node_id: node_config.cluster.node_id,
            storage_engine: None,
            state_machine: None,
            transport: None,
            membership: None,
            node_config,
            shutdown_signal,
            state_machine_handler: None,
            snapshot_policy: None,
            node: None,
        }
    }

    /// Sets a custom storage engine implementation
    pub fn storage_engine(
        mut self,
        storage_engine: Arc<SE>,
    ) -> Self {
        self.storage_engine = Some(storage_engine);
        self
    }

    /// Sets a custom state machine implementation
    pub fn state_machine(
        mut self,
        state_machine: Arc<SM>,
    ) -> Self {
        self.state_machine = Some(state_machine);
        self
    }

    /// Replaces the entire node configuration
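    ///
    /// # Example
    /// ```ignore
    /// // Sketch: build and validate a config up front, then hand it to the builder.
    /// let node_config = RaftNodeConfig::new()?.validate()?;
    /// let builder = builder.node_config(node_config);
    /// ```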
    pub fn node_config(
        mut self,
        node_config: RaftNodeConfig,
    ) -> Self {
        self.node_config = node_config;
        self
    }

    /// Replaces the Raft configuration
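    ///
    /// # Example
    /// ```ignore
    /// // Sketch: start from a loaded config, adjust the raft section, then inject it.
    /// let node_config = RaftNodeConfig::new()?;
    /// let raft = node_config.raft; // tune fields here as needed
    /// let builder = builder.raft_config(raft);
    /// ```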
    pub fn raft_config(
        mut self,
        config: RaftConfig,
    ) -> Self {
        self.node_config.raft = config;
        self
    }

    /// Finalizes the builder and constructs the Raft node instance.
    ///
    /// The storage engine and state machine are required and must be set
    /// beforehand; `build()` returns an error if either is missing.
    /// Default implementations are initialized for any unconfigured optional
    /// components:
    /// - Sets up default gRPC transport
    /// - Initializes the commit handling subsystem
    /// - Configures membership management
    ///
    /// # Panics
    /// Panics if essential components cannot be initialized
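    ///
    /// # Example
    /// ```ignore
    /// // Sketch: `start()` is the usual entry point; `build()` alone constructs
    /// // the node without starting the gRPC server.
    /// let builder = NodeBuilder::init(config, shutdown_rx)
    ///     .storage_engine(storage)
    ///     .state_machine(state_machine)
    ///     .build().await?;
    /// ```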
    pub async fn build(mut self) -> Result<Self> {
        let node_id = self.node_id;
        let node_config = self.node_config.clone();

        // Init CommitHandler
        let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();
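        // The Raft core publishes `NewCommitData` on this channel (registered below via
        // `register_new_commit_listener`); the `DefaultCommitHandler` consumes it and
        // drives the apply path into the state machine.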

        // Handle state machine initialization
        let state_machine = self.state_machine.take().ok_or_else(|| {
            SystemError::NodeStartFailed(
                "State machine must be set before calling build()".to_string(),
            )
        })?;

        // Note: Lease configuration should be injected BEFORE wrapping in Arc.
        // See StandaloneServer::start() or EmbeddedEngine::with_rocksdb() for the correct pattern.
        // If state_machine is passed from user code, they are responsible for lease injection.

        // Start state machine: flip flags and load persisted data
        state_machine.start().await?;

        // Spawn lease background cleanup task (if TTL feature is enabled)
        // Framework-level feature: completely transparent to developers
        let lease_cleanup_handle = if node_config.raft.state_machine.lease.enabled {
            info!(
                "Starting lease background cleanup worker (interval: {}ms)",
                node_config.raft.state_machine.lease.interval_ms
            );
            Some(Self::spawn_background_cleanup_worker(
                Arc::clone(&state_machine),
                node_config.raft.state_machine.lease.interval_ms,
                self.shutdown_signal.clone(),
            ))
        } else {
            debug!("Lease feature disabled: no background cleanup worker");
            None
        };

        // Handle storage engine initialization
        let storage_engine = self.storage_engine.take().ok_or_else(|| {
            SystemError::NodeStartFailed(
                "Storage engine must be set before calling build()".to_string(),
            )
        })?;

        // Retrieve last applied index from state machine
        let last_applied_index = state_machine.last_applied().index;
        info!("Node startup, last applied index: {}", last_applied_index);
        let raft_log = {
            let (log, receiver) = BufferedRaftLog::new(
                node_id,
                node_config.raft.persistence.clone(),
                storage_engine.clone(),
            );

            // Start processor and get Arc-wrapped instance
            log.start(receiver)
        };

        let transport = self.transport.take().unwrap_or_else(|| GrpcTransport::new(node_id));

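        // LogSizePolicy (the default snapshot policy): trigger a snapshot once the log
        // grows past `max_log_entries_before_snapshot`, rate-limited by the cool-down
        // window since the last check.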
        let snapshot_policy = self.snapshot_policy.take().unwrap_or_else(|| {
            LogSizePolicy::new(
                node_config.raft.snapshot.max_log_entries_before_snapshot,
                node_config.raft.snapshot.snapshot_cool_down_since_last_check,
            )
        });

        let shutdown_signal = self.shutdown_signal.clone();

        // Initialize watch system (controlled by feature flag at compile time)
        // All resource allocation is explicit and visible here (no hidden spawns)
        #[cfg(feature = "watch")]
        let watch_system = {
            let (broadcast_tx, broadcast_rx) =
                tokio::sync::broadcast::channel(node_config.raft.watch.event_queue_size);

            // Create unregister channel
            let (unregister_tx, unregister_rx) = mpsc::unbounded_channel();

            // Create shared registry
            let registry = Arc::new(WatchRegistry::new(
                node_config.raft.watch.watcher_buffer_size,
                unregister_tx,
            ));

            // Create dispatcher
            let dispatcher =
                WatchDispatcher::new(Arc::clone(&registry), broadcast_rx, unregister_rx);

            // Explicitly spawn dispatcher task (resource allocation visible)
            let dispatcher_handle = tokio::spawn(async move {
                dispatcher.run().await;
            });

            Some((broadcast_tx, registry, dispatcher_handle))
        };

        let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
            #[cfg(feature = "watch")]
            let watch_event_tx = watch_system.as_ref().map(|(tx, _, _)| tx.clone());
            #[cfg(not(feature = "watch"))]
            let watch_event_tx = None;

            Arc::new(DefaultStateMachineHandler::new(
                node_id,
                last_applied_index,
                node_config.raft.commit_handler.max_entries_per_chunk,
                state_machine.clone(),
                node_config.raft.snapshot.clone(),
                snapshot_policy,
                watch_event_tx,
            ))
        });
        let membership = Arc::new(self.membership.take().unwrap_or_else(|| {
            RaftMembership::new(
                node_id,
                node_config.cluster.initial_cluster.clone(),
                node_config.clone(),
            )
        }));

        let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());

        let (role_tx, role_rx) = mpsc::unbounded_channel();
        let (event_tx, event_rx) = mpsc::channel(10240);
        let event_tx_clone = event_tx.clone(); // used in commit handler

        let node_config_arc = Arc::new(node_config);

        // Determine the initial role.
        // Role initialization flow:
        // 1. Check if node is learner from config
        // 2. Load persisted hard state from storage
        // 3. Determine initial role based on cluster state
        // 4. Inject dependencies into role state
        let last_applied_index = Some(state_machine.last_applied().index);
        let my_role = if node_config_arc.is_learner() {
            RaftRole::Learner(Box::new(LearnerState::new(
                node_id,
                node_config_arc.clone(),
            )))
        } else {
            RaftRole::Follower(Box::new(FollowerState::new(
                node_id,
                node_config_arc.clone(),
                raft_log.load_hard_state().expect("failed to load hard state"),
                last_applied_index,
            )))
        };
        let my_role_i32 = my_role.as_i32();
        let my_current_term = my_role.current_term();
        info!(
            "Starting node with role: {} and term: {}",
            my_role_i32, my_current_term
        );

        // Construct raft core
        let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role,
            RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
                raft_log,
                state_machine: state_machine.clone(),
            },
            transport,
            RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
                election_handler: ElectionHandler::new(node_id),
                replication_handler: ReplicationHandler::new(node_id),
                state_machine_handler: state_machine_handler.clone(),
                purge_executor: Arc::new(purge_executor),
            },
            membership.clone(),
            SignalParams::new(
                role_tx,
                role_rx,
                event_tx,
                event_rx,
                shutdown_signal.clone(),
            ),
            node_config_arc.clone(),
        );

        // Register commit event listener
        raft_core.register_new_commit_listener(new_commit_event_tx);

        // Create leader notification channel and register with Raft core
        let leader_notifier = LeaderNotifier::new();
        raft_core.register_leader_change_listener(leader_notifier.sender());

        // Run the CommitHandler in a dedicated background task
        let deps = CommitHandlerDependencies {
            state_machine_handler,
            raft_log: raft_core.ctx.storage.raft_log.clone(),
            membership: membership.clone(),
            event_tx: event_tx_clone,
            shutdown_signal,
        };

        let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role_i32,
            my_current_term,
            deps,
            node_config_arc.clone(),
            new_commit_event_rx,
        );
        // Spawn commit listener via Builder method
        // This ensures all tokio::spawn calls are visible in one place (Builder impl)
        // Following the "one page visible" Builder pattern principle
        let commit_handler_handle = Self::spawn_state_machine_commit_listener(commit_handler);

        let event_tx = raft_core.event_sender();
        let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);

        let node = Node::<RaftTypeConfig<SE, SM>> {
            node_id,
            raft_core: Arc::new(Mutex::new(raft_core)),
            membership,
            event_tx: event_tx.clone(),
            ready: AtomicBool::new(false),
            rpc_ready_tx,
            leader_notifier,
            node_config: node_config_arc,
            #[cfg(feature = "watch")]
            watch_registry: watch_system.as_ref().map(|(_, reg, _)| Arc::clone(reg)),
            #[cfg(feature = "watch")]
            _watch_dispatcher_handle: watch_system.map(|(_, _, handle)| handle),
            _commit_handler_handle: Some(commit_handler_handle),
            _lease_cleanup_handle: lease_cleanup_handle,
            shutdown_signal: self.shutdown_signal.clone(),
        };

        self.node = Some(Arc::new(node));
        Ok(self)
    }

    /// Spawn state machine commit listener as background task.
    ///
    /// This method is called during node build() to start the commit handler task.
    /// All spawn_* methods are centralized in NodeBuilder so developers can see
    /// all resource consumption (threads/tasks) in one place.
    ///
    /// # Returns
    /// * `JoinHandle` - Task handle for lifecycle management
    fn spawn_state_machine_commit_listener(
        mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            match commit_handler.run().await {
                Ok(_) => {
                    info!("commit_handler exited");
                }
                Err(e) => {
                    error!("commit_handler exited with unexpected error: {:?}", e);
                }
            }
        })
    }

    /// Spawn lease background cleanup task (if enabled).
    ///
    /// This task runs independently from the Raft apply pipeline, avoiding any blocking.
    /// Only spawned when the lease feature is enabled in the configuration
    /// (`raft.state_machine.lease.enabled`); otherwise the caller skips the spawn entirely.
    ///
    /// # Arguments
    /// * `state_machine` - Arc reference to the state machine for accessing the lease manager
    /// * `interval_ms` - Cleanup interval in milliseconds
    /// * `shutdown_signal` - Watch channel for graceful shutdown notification
    ///
    /// # Returns
    /// * `JoinHandle` - Task handle for lifecycle management
    ///
    /// # Design Principles
    /// - **Zero overhead**: If disabled, no task is spawned at the call site
    /// - **One page visible**: All long-running tasks are spawned here in builder.rs
    /// - **Graceful shutdown**: Monitors the shutdown signal for clean termination
    fn spawn_background_cleanup_worker(
        state_machine: Arc<SM>,
        interval_ms: u64,
        mut shutdown_signal: watch::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_millis(interval_ms));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Call state machine's lease background cleanup
                        match state_machine.lease_background_cleanup().await {
                            Ok(deleted_keys) => {
                                if !deleted_keys.is_empty() {
                                    debug!(
                                        "Lease background cleanup: deleted {} expired keys",
                                        deleted_keys.len()
                                    );
                                }
                            }
                            Err(e) => {
                                error!("Lease background cleanup failed: {:?}", e);
                            }
                        }
                    }
                    _ = shutdown_signal.changed() => {
                        info!("Lease background cleanup received shutdown signal");
                        break;
                    }
                }
            }

            debug!("Lease background cleanup worker stopped");
        })
    }

    /// Sets a custom state machine handler implementation.
    ///
    /// This allows developers to provide their own implementation of the state machine handler,
    /// which processes committed log entries and applies them to the state machine.
    ///
    /// # Arguments
    /// * `handler` - custom state machine handler that must implement the `StateMachineHandler`
    ///   trait
    ///
    /// # Notes
    /// - The handler must be thread-safe as it will be shared across multiple threads
    /// - If not set, a default implementation will be used during `build()`
    /// - The handler should properly handle snapshot creation and restoration
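    ///
    /// # Example
    /// ```ignore
    /// // Sketch: `handler` is a pre-built, Arc-wrapped state machine handler.
    /// let builder = builder.with_custom_state_machine_handler(handler);
    /// ```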
    pub fn with_custom_state_machine_handler(
        mut self,
        handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
    ) -> Self {
        self.state_machine_handler = Some(handler);
        self
    }

    /// Starts the gRPC server for cluster communication.
    ///
    /// # Panics
    /// Panics if node hasn't been built or address binding fails
    async fn start_rpc_server(self) -> Self {
        debug!("--- start RPC server ---");
        if let Some(ref node) = self.node {
            let node_clone = node.clone();
            let shutdown = self.shutdown_signal.clone();
            let listen_address = self.node_config.cluster.listen_address;
            let node_config = self.node_config.clone();
            tokio::spawn(async move {
                if let Err(e) =
                    grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
                {
                    eprintln!("RPC server stopped: {e:?}");
                    error!("RPC server stopped: {:?}", e);
                }
            });
            self
        } else {
            panic!("start_rpc_server() called before build()");
        }
    }

    /// Builds and starts the Raft node.
    ///
    /// This is the primary method to initialize and start a node. It performs:
    /// 1. State machine initialization (including lease injection if applicable)
    /// 2. Raft core construction
    /// 3. Background task spawning (commit handler, lease cleanup, watch dispatcher)
    /// 4. gRPC server startup for cluster communication
    ///
    /// # Returns
    /// An `Arc<Node>` ready for operation
    ///
    /// # Errors
    /// Returns an error if any initialization step fails
    ///
    /// # Example
    /// ```ignore
    /// let node = NodeBuilder::init(config, shutdown_rx)
    ///     .storage_engine(storage)
    ///     .state_machine(state_machine)
    ///     .start().await?;
    /// ```
    pub async fn start(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
        let builder = self.build().await?;
        let builder = builder.start_rpc_server().await;
        builder.node.ok_or_else(|| {
            SystemError::NodeStartFailed("Node build failed unexpectedly".to_string()).into()
        })
    }

    /// Test constructor with custom database path
    ///
    /// # Safety
    /// Bypasses normal configuration loading - use for testing only
    #[cfg(test)]
    pub(crate) fn new_from_db_path(
        db_path: &str,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        use std::path::PathBuf;

        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        node_config.cluster.db_root_dir = PathBuf::from(db_path);
        let node_config = node_config.validate().expect("node_config validation failed");

        Self::init(node_config, shutdown_signal)
    }
}