d_engine/node/builder.rs

//! A builder pattern implementation for constructing a [`Node`] instance in a
//! Raft cluster.
//!
//! The [`NodeBuilder`] provides a fluent interface to configure and assemble
//! the components required by a Raft node, including storage engines, state machines,
//! transport layers, membership management, and asynchronous handlers.
//!
//! ## Key Design Points
//! - **Explicit Component Initialization**: Requires explicit configuration of storage engines and
//!   state machines (no implicit defaults).
//! - **Customization**: Allows overriding components via setter methods (e.g., `storage_engine()`,
//!   `state_machine()`, `transport()`).
//! - **Lifecycle Management**:
//!   - `build()`: Assembles the [`Node`] and initializes background tasks (e.g., [`CommitHandler`],
//!     replication, election).
//!   - `ready()`: Finalizes construction and returns the initialized [`Node`].
//!   - `start_rpc_server()`: Spawns the gRPC server for cluster communication.
//!
//! ## Example
//! ```ignore
//! let (shutdown_tx, shutdown_rx) = watch::channel(());
//! let node = NodeBuilder::new(Some("cluster_config.yaml"), shutdown_rx)
//!     .storage_engine(custom_storage_engine)  // Required component
//!     .state_machine(custom_state_machine)    // Required component
//!     .build()
//!     .start_rpc_server().await
//!     .ready()
//!     .unwrap();
//! ```
//!
//! ## Notes
//! - **Thread Safety**: All components are wrapped in `Arc`/`Mutex` for shared ownership and
//!   thread safety.
//! - **Resource Cleanup**: Uses `watch::Receiver` for cooperative shutdown signaling.
//! - **Configuration Loading**: Supports loading the cluster configuration from a file or an
//!   in-memory config.

use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::error;
use tracing::info;

use super::RaftTypeConfig;
use crate::alias::MOF;
use crate::alias::SMHOF;
use crate::alias::SNP;
use crate::alias::TROF;
use crate::follower_state::FollowerState;
use crate::grpc;
use crate::grpc::grpc_transport::GrpcTransport;
use crate::learner_state::LearnerState;
use crate::BufferedRaftLog;
use crate::ClusterConfig;
use crate::CommitHandler;
use crate::CommitHandlerDependencies;
use crate::DefaultCommitHandler;
use crate::DefaultPurgeExecutor;
use crate::DefaultStateMachineHandler;
use crate::ElectionHandler;
use crate::LogSizePolicy;
use crate::NewCommitData;
use crate::Node;
use crate::Raft;
use crate::RaftConfig;
use crate::RaftCoreHandlers;
use crate::RaftLog;
use crate::RaftMembership;
use crate::RaftNodeConfig;
use crate::RaftRole;
use crate::RaftStorageHandles;
use crate::ReplicationHandler;
use crate::Result;
use crate::SignalParams;
use crate::StateMachine;
use crate::StorageEngine;
use crate::SystemError;

/// Operating mode of a node within the cluster.
pub enum NodeMode {
    Joiner,
    FullMember,
}

/// Builder pattern implementation for constructing a Raft node with configurable components.
/// Provides a fluent interface to set up node configuration, storage, transport, and other
/// dependencies.
pub struct NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    node_id: u32,

    pub(super) node_config: RaftNodeConfig,
    pub(super) storage_engine: Option<Arc<SE>>,
    pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine: Option<Arc<SM>>,
    pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
    pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
    pub(super) shutdown_signal: watch::Receiver<()>,

    pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
}

impl<SE, SM> NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    /// Creates a new `NodeBuilder` with the cluster configuration loaded from file.
    ///
    /// # Arguments
    /// * `cluster_path` - Optional path to a node-specific cluster configuration
    /// * `shutdown_signal` - Watch channel for graceful shutdown signaling
    ///
    /// # Panics
    /// Panics if configuration loading fails (a future revision may return a
    /// `Result` instead).
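    ///
    /// # Example
    /// A minimal sketch, assuming a `tokio::sync::watch` channel is used for
    /// shutdown signaling (as in the module-level example):
    /// ```ignore
    /// let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
    /// let builder = NodeBuilder::new(Some("cluster_config.yaml"), shutdown_rx);
    /// ```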
    pub fn new(
        cluster_path: Option<&str>,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        if let Some(p) = cluster_path {
            info!("with_override_config from: {}", p);
            node_config = node_config
                .with_override_config(p)
                .expect("failed to apply override config");
        }

        Self::init(node_config, shutdown_signal)
    }

    /// Constructs a `NodeBuilder` from an in-memory cluster configuration.
    ///
    /// # Arguments
    /// * `cluster_config` - Pre-built cluster configuration
    /// * `shutdown_signal` - Graceful shutdown notification channel
    ///
    /// # Usage
    /// ```ignore
    /// let builder = NodeBuilder::from_cluster_config(my_config, shutdown_rx);
    /// ```
    pub fn from_cluster_config(
        cluster_config: ClusterConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        node_config.cluster = cluster_config;
        Self::init(node_config, shutdown_signal)
    }

    /// Core initialization logic shared by all construction paths.
    pub fn init(
        node_config: RaftNodeConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        Self {
            node_id: node_config.cluster.node_id,
            storage_engine: None,
            state_machine: None,
            transport: None,
            membership: None,
            node_config,
            shutdown_signal,
            state_machine_handler: None,
            snapshot_policy: None,
            node: None,
        }
    }

    /// Sets a custom storage engine implementation.
    pub fn storage_engine(
        mut self,
        storage_engine: Arc<SE>,
    ) -> Self {
        self.storage_engine = Some(storage_engine);
        self
    }

    /// Sets a custom state machine implementation.
    pub fn state_machine(
        mut self,
        state_machine: Arc<SM>,
    ) -> Self {
        self.state_machine = Some(state_machine);
        self
    }

    /// Replaces the entire node configuration.
    pub fn node_config(
        mut self,
        node_config: RaftNodeConfig,
    ) -> Self {
        self.node_config = node_config;
        self
    }

    /// Replaces the Raft configuration.
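    ///
    /// # Example
    /// A minimal sketch, assuming `custom_raft_config` is a pre-built [`RaftConfig`]:
    /// ```ignore
    /// let builder = builder.raft_config(custom_raft_config);
    /// ```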
    pub fn raft_config(
        mut self,
        config: RaftConfig,
    ) -> Self {
        self.node_config.raft = config;
        self
    }

    /// Finalizes the builder and constructs the Raft node instance.
    ///
    /// Initializes default implementations for any unconfigured components:
    /// - Creates the buffered Raft log on top of the configured storage engine
    /// - Sets up the default gRPC transport
    /// - Initializes the commit handling subsystem
    /// - Configures membership management
    ///
    /// # Panics
    /// Panics if essential components (storage engine, state machine) are missing
    /// or cannot be initialized.
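    ///
    /// # Example
    /// A sketch of the full construction chain, mirroring the module-level example:
    /// ```ignore
    /// let node = NodeBuilder::new(None, shutdown_rx)
    ///     .storage_engine(my_storage_engine)
    ///     .state_machine(my_state_machine)
    ///     .build()
    ///     .start_rpc_server().await
    ///     .ready()?;
    /// ```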
    pub fn build(mut self) -> Self {
        let node_id = self.node_id;
        let node_config = self.node_config.clone();

        // Init CommitHandler
        let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();

        // Handle state machine initialization
        let state_machine = self.state_machine.take().expect("State machine must be set");

        // Handle storage engine initialization
        let storage_engine = self.storage_engine.take().expect("Storage engine must be set");

        // Retrieve the last applied index from the state machine
        let last_applied_index = state_machine.last_applied().index;
        let raft_log = {
            let (log, receiver) = BufferedRaftLog::new(
                node_id,
                node_config.raft.persistence.clone(),
                storage_engine.clone(),
            );

            // Start the processor and get the Arc-wrapped instance
            log.start(receiver)
        };

        let transport = self.transport.take().unwrap_or_else(|| GrpcTransport::new(node_id));

        let snapshot_policy = self.snapshot_policy.take().unwrap_or_else(|| {
            LogSizePolicy::new(
                node_config.raft.snapshot.max_log_entries_before_snapshot,
                node_config.raft.snapshot.snapshot_cool_down_since_last_check,
            )
        });

        let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
            Arc::new(DefaultStateMachineHandler::new(
                node_id,
                last_applied_index,
                node_config.raft.commit_handler.max_entries_per_chunk,
                state_machine.clone(),
                node_config.raft.snapshot.clone(),
                snapshot_policy,
            ))
        });
        let membership = Arc::new(self.membership.take().unwrap_or_else(|| {
            RaftMembership::new(
                node_id,
                node_config.cluster.initial_cluster.clone(),
                node_config.clone(),
            )
        }));

        let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());

        let (role_tx, role_rx) = mpsc::unbounded_channel();
        let (event_tx, event_rx) = mpsc::channel(10240);
        let event_tx_clone = event_tx.clone(); // used by the commit handler

        let shutdown_signal = self.shutdown_signal.clone();
        let node_config_arc = Arc::new(node_config);

        // Construct my role
        // Role initialization flow:
        // 1. Check joining status from node config
        // 2. Load persisted hard state from storage
        // 3. Determine initial role based on cluster state
        // 4. Inject dependencies into the role state
        let last_applied_index = Some(state_machine.last_applied().index);
        let my_role = if node_config_arc.is_joining() {
            RaftRole::Learner(Box::new(LearnerState::new(
                node_id,
                node_config_arc.clone(),
            )))
        } else {
            RaftRole::Follower(Box::new(FollowerState::new(
                node_id,
                node_config_arc.clone(),
                raft_log.load_hard_state().expect("Failed to load hard state"),
                last_applied_index,
            )))
        };
        let my_role_i32 = my_role.as_i32();
        let my_current_term = my_role.current_term();
        info!(
            "Start node with role: {} and term: {}",
            my_role_i32, my_current_term
        );

        // Construct the raft core
        let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role,
            RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
                raft_log,
                state_machine: state_machine.clone(),
            },
            transport,
            RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
                election_handler: ElectionHandler::new(node_id),
                replication_handler: ReplicationHandler::new(node_id),
                state_machine_handler: state_machine_handler.clone(),
                purge_executor: Arc::new(purge_executor),
            },
            membership.clone(),
            SignalParams {
                role_tx,
                role_rx,
                event_tx,
                event_rx,
                shutdown_signal: shutdown_signal.clone(),
            },
            node_config_arc.clone(),
        );

        // Register the commit event listener
        raft_core.register_new_commit_listener(new_commit_event_tx);

        // Start the CommitHandler on a dedicated task
        let deps = CommitHandlerDependencies {
            state_machine_handler,
            raft_log: raft_core.ctx.storage.raft_log.clone(),
            membership: membership.clone(),
            event_tx: event_tx_clone,
            shutdown_signal,
        };

        let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role_i32,
            my_current_term,
            deps,
            node_config_arc.clone(),
            new_commit_event_rx,
        );
        self.enable_state_machine_commit_listener(commit_handler);

        let event_tx = raft_core.event_tx.clone();
        let node = Node::<RaftTypeConfig<SE, SM>> {
            node_id,
            raft_core: Arc::new(Mutex::new(raft_core)),
            membership,
            event_tx: event_tx.clone(),
            ready: AtomicBool::new(false),
            node_config: node_config_arc,
        };

        self.node = Some(Arc::new(node));
        self
    }

    /// When a new commit is detected, applies the committed log entries to the state machine.
    fn enable_state_machine_commit_listener(
        &self,
        mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>,
    ) {
        tokio::spawn(async move {
            match commit_handler.run().await {
                Ok(_) => {
                    info!("commit_handler exited");
                }
                Err(e) => {
                    error!("commit_handler exited with unexpected error: {:?}", e);
                }
            }
        });
    }

    /// Sets a custom state machine handler implementation.
    ///
    /// This allows developers to provide their own implementation of the state machine handler,
    /// which processes committed log entries and applies them to the state machine.
    ///
    /// # Arguments
    /// * `handler` - custom state machine handler that must implement the `StateMachineHandler`
    ///   trait
    ///
    /// # Notes
    /// - The handler must be thread-safe, as it will be shared across multiple threads
    /// - If not set, a default implementation is used during `build()`
    /// - The handler should properly handle snapshot creation and restoration
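    ///
    /// # Example
    /// A minimal sketch, assuming `my_handler` is a value implementing the required
    /// handler trait:
    /// ```ignore
    /// let builder = builder.with_custom_state_machine_handler(Arc::new(my_handler));
    /// ```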
    pub fn with_custom_state_machine_handler(
        mut self,
        handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
    ) -> Self {
        self.state_machine_handler = Some(handler);
        self
    }

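    /// Sets a custom snapshot policy implementation.
    ///
    /// If not set, a default [`LogSizePolicy`] derived from the snapshot settings in the
    /// node configuration is used during `build()`.
    ///
    /// # Example
    /// A minimal sketch, assuming `my_policy` implements the snapshot policy trait:
    /// ```ignore
    /// let builder = builder.set_snapshot_policy(my_policy);
    /// ```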
    pub fn set_snapshot_policy(
        mut self,
        snapshot_policy: SNP<RaftTypeConfig<SE, SM>>,
    ) -> Self {
        self.snapshot_policy = Some(snapshot_policy);
        self
    }

    /// Starts the gRPC server for cluster communication.
    ///
    /// # Panics
    /// Panics if the node hasn't been built yet or if address binding fails.
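    ///
    /// # Example
    /// A minimal sketch; `build()` must have been called first:
    /// ```ignore
    /// let builder = builder.build().start_rpc_server().await;
    /// ```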
    pub async fn start_rpc_server(self) -> Self {
        debug!("--- start RPC server ---");
        if let Some(ref node) = self.node {
            let node_clone = node.clone();
            let shutdown = self.shutdown_signal.clone();
            let listen_address = self.node_config.cluster.listen_address;
            let node_config = self.node_config.clone();
            tokio::spawn(async move {
                if let Err(e) =
                    grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
                {
                    eprintln!("RPC server stopped. {e:?}");
                    error!("RPC server stopped. {:?}", e);
                }
            });
            self
        } else {
            panic!("failed to start RPC server: node has not been built (call build() first)");
        }
    }

    /// Returns the built node instance after successful construction.
    ///
    /// # Errors
    /// Returns `SystemError::NodeStartFailed` if `build()` hasn't completed.
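    ///
    /// # Example
    /// A minimal sketch of retrieving the built node:
    /// ```ignore
    /// let node = builder.build().ready()?;
    /// ```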
    pub fn ready(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
        self.node.ok_or_else(|| {
            SystemError::NodeStartFailed("node is not ready: build() was not called".to_string())
                .into()
        })
    }

    /// Test constructor with a custom database path.
    ///
    /// # Safety
    /// Bypasses normal configuration validation - use for testing only.
    #[cfg(test)]
    pub fn new_from_db_path(
        db_path: &str,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        use std::path::PathBuf;

        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        node_config.cluster.db_root_dir = PathBuf::from(db_path);

        Self::init(node_config, shutdown_signal)
    }
}