d_engine/node/builder.rs

//! A builder pattern implementation for constructing a [`Node`] instance in a
//! Raft cluster.
//!
//! The [`NodeBuilder`] provides a fluent interface to configure and assemble
//! the components required by a Raft node, including storage engines, state machines,
//! transport layers, membership management, and asynchronous handlers.
//!
//! ## Key Design Points
//! - **Explicit Component Initialization**: Requires explicit configuration of storage engines and state machines (no implicit defaults).
//! - **Customization**: Allows overriding components via setter methods (e.g., `storage_engine()`, `state_machine()`, `transport()`).
//! - **Lifecycle Management**:
//!   - `build()`: Assembles the [`Node`] and initializes background tasks (e.g., [`CommitHandler`], replication, election).
//!   - `ready()`: Finalizes construction and returns the initialized [`Node`].
//!   - `start_rpc_server()`: Spawns the gRPC server for cluster communication.
//!
//! ## Example
//! ```ignore
//! let (shutdown_tx, shutdown_rx) = watch::channel(());
//! let node = NodeBuilder::new(Some("cluster_config.yaml"), shutdown_rx)
//!     .storage_engine(custom_storage_engine)  // Required component
//!     .state_machine(custom_state_machine)    // Required component
//!     .build()
//!     .start_rpc_server().await
//!     .ready()
//!     .unwrap();
//! ```
//!
//! ## Notes
//! - **Thread Safety**: All components are wrapped in `Arc`/`Mutex` for shared ownership and thread safety.
//! - **Resource Cleanup**: Uses a `watch::Receiver` for cooperative shutdown signaling.
//! - **Configuration Loading**: Supports loading the cluster configuration from a file or an in-memory config.

use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::error;
use tracing::info;

use super::RaftTypeConfig;
use crate::alias::MOF;
use crate::alias::SMHOF;
use crate::alias::SNP;
use crate::alias::TROF;
use crate::follower_state::FollowerState;
use crate::grpc;
use crate::grpc::grpc_transport::GrpcTransport;
use crate::learner_state::LearnerState;
use crate::BufferedRaftLog;
use crate::ClusterConfig;
use crate::CommitHandler;
use crate::CommitHandlerDependencies;
use crate::DefaultCommitHandler;
use crate::DefaultPurgeExecutor;
use crate::DefaultStateMachineHandler;
use crate::ElectionHandler;
use crate::LogSizePolicy;
use crate::NewCommitData;
use crate::Node;
use crate::Raft;
use crate::RaftConfig;
use crate::RaftCoreHandlers;
use crate::RaftLog;
use crate::RaftMembership;
use crate::RaftNodeConfig;
use crate::RaftRole;
use crate::RaftStorageHandles;
use crate::ReplicationHandler;
use crate::Result;
use crate::SignalParams;
use crate::StateMachine;
use crate::StorageEngine;
use crate::SystemError;

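/// Mode in which a node starts: a `Joiner` enters an existing cluster as a
/// non-voting learner, while a `FullMember` starts as a regular voting member
/// (see the role initialization in `build()`).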
pub enum NodeMode {
    Joiner,
    FullMember,
}

/// Builder pattern implementation for constructing a Raft node with configurable components.
/// Provides a fluent interface to set up node configuration, storage, transport, and other
/// dependencies.
pub struct NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    node_id: u32,

    pub(super) node_config: RaftNodeConfig,
    pub(super) storage_engine: Option<Arc<SE>>,
    pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine: Option<Arc<SM>>,
    pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
    pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
    pub(super) shutdown_signal: watch::Receiver<()>,

    pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
}

impl<SE, SM> NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    /// Creates a new `NodeBuilder` with the cluster configuration loaded from a file.
    ///
    /// # Arguments
    /// * `cluster_path` - Optional path to a node-specific cluster configuration
    /// * `shutdown_signal` - Watch channel for graceful shutdown signaling
    ///
    /// # Panics
    /// Panics if configuration loading fails (consider returning `Result`
    /// instead).
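    ///
    /// # Example
    /// A minimal sketch mirroring the module-level example (the config path is a placeholder):
    /// ```ignore
    /// let (_shutdown_tx, shutdown_rx) = watch::channel(());
    /// let builder = NodeBuilder::new(Some("cluster_config.yaml"), shutdown_rx);
    /// ```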
    pub fn new(
        cluster_path: Option<&str>,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let mut node_config = RaftNodeConfig::new().expect("Failed to load node_config");
        if let Some(p) = cluster_path {
            info!("with_override_config from: {}", &p);
            node_config = node_config
                .with_override_config(p)
                .expect("Failed to override node_config");
        }

        Self::init(node_config, shutdown_signal)
    }

    /// Constructs a `NodeBuilder` from an in-memory cluster configuration.
    ///
    /// # Arguments
    /// * `cluster_config` - Pre-built cluster configuration
    /// * `shutdown_signal` - Graceful shutdown notification channel
    ///
    /// # Usage
    /// ```ignore
    /// let builder = NodeBuilder::from_cluster_config(my_config, shutdown_rx);
    /// ```
    pub fn from_cluster_config(
        cluster_config: ClusterConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let mut node_config = RaftNodeConfig::new().expect("Failed to load node_config");
        node_config.cluster = cluster_config;
        Self::init(node_config, shutdown_signal)
    }

    /// Core initialization logic shared by all construction paths
    pub fn init(
        node_config: RaftNodeConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        Self {
            node_id: node_config.cluster.node_id,
            storage_engine: None,
            state_machine: None,
            transport: None,
            membership: None,
            node_config,
            shutdown_signal,
            state_machine_handler: None,
            snapshot_policy: None,
            node: None,
        }
    }

    /// Sets a custom storage engine implementation
    pub fn storage_engine(
        mut self,
        storage_engine: Arc<SE>,
    ) -> Self {
        self.storage_engine = Some(storage_engine);
        self
    }

    /// Sets a custom state machine implementation
    pub fn state_machine(
        mut self,
        state_machine: Arc<SM>,
    ) -> Self {
        self.state_machine = Some(state_machine);
        self
    }

    /// Replaces the entire node configuration
    pub fn node_config(
        mut self,
        node_config: RaftNodeConfig,
    ) -> Self {
        self.node_config = node_config;
        self
    }

    /// Replaces the Raft configuration
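    ///
    /// # Example
    /// A sketch assuming `custom_raft_config` is a `RaftConfig` value built elsewhere:
    /// ```ignore
    /// let builder = NodeBuilder::new(None, shutdown_rx)
    ///     .raft_config(custom_raft_config);
    /// ```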
    pub fn raft_config(
        mut self,
        config: RaftConfig,
    ) -> Self {
        self.node_config.raft = config;
        self
    }

    /// Finalizes the builder and constructs the Raft node instance.
    ///
    /// Initializes default implementations for any unconfigured components:
    /// - Creates file-based databases for state machine and logs
    /// - Sets up default gRPC transport
    /// - Initializes commit handling subsystem
    /// - Configures membership management
    ///
    /// # Panics
    /// Panics if a required component (storage engine, state machine) has not
    /// been set, or if essential components cannot be initialized.
    pub fn build(mut self) -> Self {
        let node_id = self.node_id;
        let node_config = self.node_config.clone();

        // Init CommitHandler
        let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();

        // Handle state machine initialization
        let state_machine = self.state_machine.take().expect("State machine must be set");

        // Handle storage engine initialization
        let storage_engine = self.storage_engine.take().expect("Storage engine must be set");

        // Retrieve the last applied index from the state machine
        let last_applied_index = state_machine.last_applied().index;
        let raft_log = {
            let (log, receiver) = BufferedRaftLog::new(
                node_id,
                node_config.raft.persistence.clone(),
                storage_engine.clone(),
            );

            // Start processor and get Arc-wrapped instance
            log.start(receiver)
        };

        let transport = self.transport.take().unwrap_or_else(|| GrpcTransport::new(node_id));

        let snapshot_policy = self.snapshot_policy.take().unwrap_or_else(|| {
            LogSizePolicy::new(
                node_config.raft.snapshot.max_log_entries_before_snapshot,
                node_config.raft.snapshot.snapshot_cool_down_since_last_check,
            )
        });

        let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
            Arc::new(DefaultStateMachineHandler::new(
                node_id,
                last_applied_index,
                node_config.raft.commit_handler.max_entries_per_chunk,
                state_machine.clone(),
                node_config.raft.snapshot.clone(),
                snapshot_policy,
            ))
        });
        let membership = Arc::new(self.membership.take().unwrap_or_else(|| {
            RaftMembership::new(
                node_id,
                node_config.cluster.initial_cluster.clone(),
                node_config.clone(),
            )
        }));

        let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());

        let (role_tx, role_rx) = mpsc::unbounded_channel();
        let (event_tx, event_rx) = mpsc::channel(10240);
        let event_tx_clone = event_tx.clone(); // used in commit handler

        let shutdown_signal = self.shutdown_signal.clone();
        let node_config_arc = Arc::new(node_config);

        // Role initialization flow:
        // 1. Check joining status from node config
        // 2. Load persisted hard state from storage
        // 3. Determine initial role based on cluster state
        // 4. Inject dependencies into the role state
        let last_applied_index = Some(state_machine.last_applied().index);
        let my_role = if node_config_arc.is_joining() {
            RaftRole::Learner(Box::new(LearnerState::new(
                node_id,
                node_config_arc.clone(),
            )))
        } else {
            RaftRole::Follower(Box::new(FollowerState::new(
                node_id,
                node_config_arc.clone(),
                raft_log.load_hard_state().expect("Failed to load hard state"),
                last_applied_index,
            )))
        };
        let my_role_i32 = my_role.as_i32();
        let my_current_term = my_role.current_term();
        info!(
            "Start node with role: {} and term: {}",
            my_role_i32, my_current_term
        );

        // Construct the raft core
        let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role,
            RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
                raft_log,
                state_machine: state_machine.clone(),
            },
            transport,
            RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
                election_handler: ElectionHandler::new(node_id),
                replication_handler: ReplicationHandler::new(node_id),
                state_machine_handler: state_machine_handler.clone(),
                purge_executor: Arc::new(purge_executor),
            },
            membership.clone(),
            SignalParams {
                role_tx,
                role_rx,
                event_tx,
                event_rx,
                shutdown_signal: shutdown_signal.clone(),
            },
            node_config_arc.clone(),
        );

        // Register commit event listener
        raft_core.register_new_commit_listener(new_commit_event_tx);

        // Start the CommitHandler on a dedicated task
        let deps = CommitHandlerDependencies {
            state_machine_handler,
            raft_log: raft_core.ctx.storage.raft_log.clone(),
            membership: membership.clone(),
            event_tx: event_tx_clone,
            shutdown_signal,
        };

        let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role_i32,
            my_current_term,
            deps,
            node_config_arc.clone(),
            new_commit_event_rx,
        );
        self.enable_state_machine_commit_listener(commit_handler);

        let event_tx = raft_core.event_tx.clone();
        let node = Node::<RaftTypeConfig<SE, SM>> {
            node_id,
            raft_core: Arc::new(Mutex::new(raft_core)),
            membership,
            event_tx: event_tx.clone(),
            ready: AtomicBool::new(false),
            node_config: node_config_arc,
        };

        self.node = Some(Arc::new(node));
        self
    }

    /// When a new commit is detected, applies the committed log entries to the state machine.
    fn enable_state_machine_commit_listener(
        &self,
        mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>,
    ) {
        tokio::spawn(async move {
            match commit_handler.run().await {
                Ok(_) => {
                    info!("commit_handler exited");
                }
                Err(e) => {
                    error!("commit_handler exited with unexpected error: {:?}", e);
                }
            }
        });
    }

    /// Sets a custom state machine handler implementation.
    ///
    /// This allows developers to provide their own implementation of the state machine handler,
    /// which processes committed log entries and applies them to the state machine.
    ///
    /// # Arguments
    /// * `handler` - A custom state machine handler that must implement the `StateMachineHandler`
    ///   trait
    ///
    /// # Notes
    /// - The handler must be thread-safe, as it will be shared across multiple threads
    /// - If not set, a default implementation will be used during `build()`
    /// - The handler should properly handle snapshot creation and restoration
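    ///
    /// # Example
    /// A sketch assuming `my_handler` is a custom handler implementing the required trait:
    /// ```ignore
    /// let builder = builder.with_custom_state_machine_handler(Arc::new(my_handler));
    /// ```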
    pub fn with_custom_state_machine_handler(
        mut self,
        handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
    ) -> Self {
        self.state_machine_handler = Some(handler);
        self
    }

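    /// Sets a custom snapshot policy that decides when a new snapshot should be taken.
    ///
    /// If not set, a [`LogSizePolicy`] derived from the snapshot settings in the
    /// node configuration is used during `build()`.
    ///
    /// # Example
    /// A sketch mirroring the default used by `build()` (the threshold values are placeholders):
    /// ```ignore
    /// let policy = LogSizePolicy::new(10_000, 60);
    /// let builder = builder.set_snapshot_policy(policy);
    /// ```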
    pub fn set_snapshot_policy(
        mut self,
        snapshot_policy: SNP<RaftTypeConfig<SE, SM>>,
    ) -> Self {
        self.snapshot_policy = Some(snapshot_policy);
        self
    }

    /// Starts the gRPC server for cluster communication.
    ///
    /// # Panics
    /// Panics if the node hasn't been built or address binding fails
    pub async fn start_rpc_server(self) -> Self {
        debug!("--- start RPC server ---");
        if let Some(ref node) = self.node {
            let node_clone = node.clone();
            let shutdown = self.shutdown_signal.clone();
            let listen_address = self.node_config.cluster.listen_address;
            let node_config = self.node_config.clone();
            tokio::spawn(async move {
                if let Err(e) =
                    grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
                {
                    error!("RPC server stopped: {:?}", e);
                }
            });
            self
        } else {
            panic!("build() must be called before start_rpc_server()");
        }
    }

    /// Returns the built node instance after successful construction.
    ///
    /// # Errors
    /// Returns `SystemError::NodeStartFailed` if `build()` hasn't completed
    pub fn ready(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
        self.node.ok_or_else(|| {
            SystemError::NodeStartFailed("check node ready failed".to_string()).into()
        })
    }

    /// Test constructor with a custom database path
    ///
    /// # Safety
    /// Bypasses normal configuration validation - use for testing only
    #[cfg(test)]
    pub fn new_from_db_path(
        db_path: &str,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        use std::path::PathBuf;

        let mut node_config = RaftNodeConfig::new().expect("Failed to load node_config");
        node_config.cluster.db_root_dir = PathBuf::from(db_path);

        Self::init(node_config, shutdown_signal)
    }
}