d_engine/node/
builder.rs

1//! A builder pattern implementation for constructing a [`Node`] instance in a
2//! Raft cluster.
3//!
4//! The [`NodeBuilder`] provides a fluent interface to configure and assemble
5//! components required by the Raft node, including storage layers (log, state
6//! machine, membership), transport, and asynchronous handlers.
7//!
8//! ## Key Design Points
9//! - **Default Components**: Initializes with production-ready defaults (Sled-based storage, gRPC
10//!   transport).
11//! - **Customization**: Allows overriding defaults via setter methods (e.g., `raft_log()`,
12//!   `transport()`).
13//! - **Lifecycle Management**:
14//!   - `build()`: Assembles the [`Node`] and spawns background tasks (e.g., [`CommitHandler`]).
15//!   - `start_metrics_server()`/`start_rpc_server()`: Launches auxiliary services.
16//!   - `ready()`: Finalizes construction and returns the initialized [`Node`].
17//!
18//! ## Example
19//! ```ignore
20//! 
21//! let (shutdown_tx, shutdown_rx) = watch::channel(());
22//! let node = NodeBuilder::new(node_config, shutdown_rx)
23//!     .raft_log(custom_raft_log)  // Optional override
24//!     .build()
25//!     .start_metrics_server(shutdown_tx.subscribe())
26//!     .start_rpc_server().await
27//!     .ready()
28//!     .unwrap();
29//! ```
30//!
31//! ## Notes
32//! - **Thread Safety**: All components wrapped in `Arc`/`Mutex` for shared ownership.
33//! - **Resource Cleanup**: Uses `watch::Receiver` for cooperative shutdown signaling.
34
35use std::sync::atomic::AtomicBool;
36use std::sync::Arc;
37
38use log::debug;
39use log::error;
40use log::info;
41use tokio::sync::mpsc;
42use tokio::sync::watch;
43use tokio::sync::Mutex;
44
45use super::RaftTypeConfig;
46use crate::alias::COF;
47use crate::alias::MOF;
48use crate::alias::ROF;
49use crate::alias::SMHOF;
50use crate::alias::SMOF;
51use crate::alias::SSOF;
52use crate::alias::TROF;
53use crate::grpc;
54use crate::grpc::grpc_transport::GrpcTransport;
55use crate::init_sled_raft_log_db;
56use crate::init_sled_state_machine_db;
57use crate::init_sled_state_storage_db;
58use crate::metrics;
59use crate::ClusterConfig;
60use crate::CommitHandler;
61use crate::DefaultCommitHandler;
62use crate::DefaultStateMachineHandler;
63use crate::ElectionHandler;
64use crate::Error;
65use crate::Node;
66use crate::Raft;
67use crate::RaftMembership;
68use crate::RaftNodeConfig;
69use crate::RaftStateMachine;
70use crate::ReplicationHandler;
71use crate::Result;
72use crate::SledRaftLog;
73use crate::SledStateStorage;
74use crate::StateMachine;
75
/// Builder pattern implementation for constructing a Raft node with configurable components.
/// Provides a fluent interface to set up node configuration, storage, transport, and other
/// dependencies.
pub struct NodeBuilder {
    // Cached copy of `node_config.cluster.node_id`; set once in `init()`.
    node_id: u32,
    // Full node configuration (cluster, raft, monitoring sections).
    pub(super) node_config: RaftNodeConfig,
    // Optional Raft log storage; `build()` falls back to a sled-backed log.
    pub(super) raft_log: Option<ROF<RaftTypeConfig>>,
    // Optional membership manager; defaults to `RaftMembership` in `build()`.
    pub(super) membership: Option<MOF<RaftTypeConfig>>,
    // Optional state machine; defaults to a sled-backed `RaftStateMachine`.
    pub(super) state_machine: Option<Arc<SMOF<RaftTypeConfig>>>,
    // Optional persistent state storage; defaults to `SledStateStorage`.
    pub(super) state_storage: Option<SSOF<RaftTypeConfig>>,
    // Optional network transport; defaults to `GrpcTransport` in `build()`.
    pub(super) transport: Option<TROF<RaftTypeConfig>>,
    // Optional commit handler override (default wired up inside `build()`).
    pub(super) commit_handler: Option<COF<RaftTypeConfig>>,
    // Optional state machine handler; defaults to `DefaultStateMachineHandler`.
    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig>>>,
    // Watch channel used for cooperative shutdown of spawned tasks.
    pub(super) shutdown_signal: watch::Receiver<()>,

    // The assembled node; populated by `build()`, consumed by `ready()`.
    pub(super) node: Option<Arc<Node<RaftTypeConfig>>>,
}
93
94impl NodeBuilder {
95    /// Creates a new NodeBuilder with cluster configuration loaded from file
96    ///
97    /// # Arguments
98    /// * `cluster_path` - Optional path to node-specific cluster configuration
99    /// * `shutdown_signal` - Watch channel for graceful shutdown signaling
100    ///
101    /// # Panics
102    /// Will panic if configuration loading fails (consider returning Result
103    /// instead)
104    pub fn new(
105        cluster_path: Option<&str>,
106        shutdown_signal: watch::Receiver<()>,
107    ) -> Self {
108        let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
109        if let Some(p) = cluster_path {
110            info!("with_override_config from: {}", &p);
111            node_config = node_config
112                .with_override_config(p)
113                .expect("Overwrite node_config successfully.");
114        }
115        Self::init(node_config, shutdown_signal)
116    }
117
118    /// Constructs NodeBuilder from in-memory cluster configuration
119    ///
120    /// # Arguments
121    /// * `cluster_config` - Pre-built cluster configuration
122    /// * `shutdown_signal` - Graceful shutdown notification channel
123    ///
124    /// # Usage
125    /// ```ignore
126    /// let builder = NodeBuilder::from_config(my_config, shutdown_rx);
127    /// ```
128    pub fn from_config(
129        cluster_config: ClusterConfig,
130        shutdown_signal: watch::Receiver<()>,
131    ) -> Self {
132        let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully!");
133        node_config.cluster = cluster_config;
134        Self::init(node_config, shutdown_signal)
135    }
136
137    /// Core initialization logic shared by all construction paths
138    pub fn init(
139        node_config: RaftNodeConfig,
140        shutdown_signal: watch::Receiver<()>,
141    ) -> Self {
142        Self {
143            node_id: node_config.cluster.node_id,
144            raft_log: None,
145            state_machine: None,
146            state_storage: None,
147            transport: None,
148            membership: None,
149            node_config,
150            shutdown_signal,
151            commit_handler: None,
152            state_machine_handler: None,
153            node: None,
154        }
155    }
156
157    /// Sets a custom Raft log storage implementation
158    pub fn raft_log(
159        mut self,
160        raft_log: ROF<RaftTypeConfig>,
161    ) -> Self {
162        self.raft_log = Some(raft_log);
163        self
164    }
165
166    /// Sets a custom state machine implementation
167    pub fn state_machine(
168        mut self,
169        state_machine: Arc<SMOF<RaftTypeConfig>>,
170    ) -> Self {
171        self.state_machine = Some(state_machine);
172        self
173    }
174
175    /// Sets a custom state storage implementation
176    pub fn state_storage(
177        mut self,
178        state_storage: SSOF<RaftTypeConfig>,
179    ) -> Self {
180        self.state_storage = Some(state_storage);
181        self
182    }
183
184    /// Sets a custom network transport implementation
185    pub fn transport(
186        mut self,
187        transport: TROF<RaftTypeConfig>,
188    ) -> Self {
189        self.transport = Some(transport);
190        self
191    }
192
193    /// Sets a custom commit handler implementation
194    pub fn commit_handler(
195        mut self,
196        commit_handler: COF<RaftTypeConfig>,
197    ) -> Self {
198        self.commit_handler = Some(commit_handler);
199        self
200    }
201
202    /// Sets a custom membership management implementation
203    pub fn membership(
204        mut self,
205        membership: MOF<RaftTypeConfig>,
206    ) -> Self {
207        self.membership = Some(membership);
208        self
209    }
210
211    /// Replaces the entire node configuration
212    pub fn node_config(
213        mut self,
214        node_config: RaftNodeConfig,
215    ) -> Self {
216        self.node_config = node_config;
217        self
218    }
219
    /// Finalizes the builder and constructs the Raft node instance.
    ///
    /// Initializes default implementations for any unconfigured components:
    /// - Creates sled-based databases for state machine, log, and state storage
    /// - Sets up default gRPC transport
    /// - Initializes commit handling subsystem (spawned as a background task)
    /// - Configures membership management
    ///
    /// # Panics
    /// Panics if essential components cannot be initialized
    pub fn build(mut self) -> Self {
        let node_id = self.node_id;
        let node_config = self.node_config.clone();
        // Each node keeps its databases under "<db_root_dir>/<node_id>".
        let db_root_dir = format!("{}/{}", node_config.cluster.db_root_dir.display(), node_id);

        // Init CommitHandler
        // Unbounded channel through which the raft core announces newly
        // committed log indexes to the commit handler task spawned below.
        let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<u64>();

        // Use the caller-provided state machine, or fall back to a sled-backed one.
        let state_machine = self.state_machine.take().unwrap_or_else(|| {
            let state_machine_db =
                init_sled_state_machine_db(&db_root_dir).expect("init_sled_state_machine_db successfully.");
            Arc::new(RaftStateMachine::new(node_id, Arc::new(state_machine_db)))
        });

        //Retrieve last applied index from state machine
        let last_applied_index = state_machine.last_entry_index();

        // The default raft log resumes from the state machine's last applied index.
        let raft_log = self.raft_log.take().unwrap_or_else(|| {
            let raft_log_db = init_sled_raft_log_db(&db_root_dir).expect("init_sled_raft_log_db successfully.");
            SledRaftLog::new(Arc::new(raft_log_db), last_applied_index)
        });

        let state_storage = self.state_storage.take().unwrap_or_else(|| {
            let state_storage_db =
                init_sled_state_storage_db(&db_root_dir).expect("init_sled_state_storage_db successfully.");
            SledStateStorage::new(Arc::new(state_storage_db))
        });

        // NOTE(review): `unwrap_or` builds the default eagerly even when a custom
        // transport was supplied; harmless here since `GrpcTransport` is a trivial
        // struct, but `unwrap_or_else` would be the lazy form.
        let transport = self.transport.take().unwrap_or(GrpcTransport { my_id: node_id });

        let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
            Arc::new(DefaultStateMachineHandler::new(
                last_applied_index,
                node_config.raft.commit_handler.max_entries_per_chunk,
                state_machine.clone(),
            ))
        });
        let membership = self
            .membership
            .take()
            .unwrap_or_else(|| RaftMembership::new(node_id, node_config.cluster.initial_cluster.clone()));

        // Internal channels: role transitions (unbounded) and raft events (bounded at 10240).
        let (role_tx, role_rx) = mpsc::unbounded_channel();
        let (event_tx, event_rx) = mpsc::channel(10240);

        let settings_arc = Arc::new(node_config);
        let shutdown_signal = self.shutdown_signal.clone();
        let mut raft_core = Raft::<RaftTypeConfig>::new(
            node_id,
            raft_log,
            state_machine.clone(),
            state_storage,
            transport,
            ElectionHandler::new(node_id, event_tx.clone()),
            ReplicationHandler::new(node_id),
            state_machine_handler.clone(),
            Arc::new(membership),
            settings_arc.clone(),
            role_tx,
            role_rx,
            event_tx,
            event_rx,
            shutdown_signal.clone(),
        );

        // Register commit event listener
        raft_core.register_new_commit_listener(new_commit_event_tx);

        // Start CommitHandler in a single thread
        let mut commit_handler = DefaultCommitHandler::<RaftTypeConfig>::new(
            state_machine_handler,
            raft_core.ctx.raft_log.clone(),
            new_commit_event_rx,
            settings_arc.raft.commit_handler.batch_size,
            settings_arc.raft.commit_handler.process_interval_ms,
            shutdown_signal,
        );
        // Drive the commit handler until shutdown. `Error::Exit` appears to be the
        // cooperative-shutdown path; any other error is logged as a failure.
        tokio::spawn(async move {
            match commit_handler.run().await {
                Ok(_) => {
                    info!("commit_handler exit program");
                }
                Err(Error::Exit) => {
                    info!("commit_handler exit program");
                    println!("commit_handler exit program");
                }
                Err(e) => {
                    error!("commit_handler exit program with error: {:?}", e);
                    // NOTE(review): this println repeats the shutdown message on the
                    // error path — looks like leftover debug output; consider removing.
                    println!("commit_handler exit program");
                }
            }
        });

        let event_tx = raft_core.event_tx.clone();
        // Wrap the core in Arc<Mutex<..>> for shared ownership across async handlers;
        // `ready` starts false and is flipped elsewhere once the node is serving.
        let node = Node::<RaftTypeConfig> {
            node_id,
            raft_core: Arc::new(Mutex::new(raft_core)),
            event_tx: event_tx.clone(),
            ready: AtomicBool::new(false),
            settings: settings_arc,
        };

        self.node = Some(Arc::new(node));
        self
    }
335
336    /// Starts the metrics server for monitoring node operations.
337    ///
338    /// Launches a Prometheus endpoint on the configured port.
339    pub fn start_metrics_server(
340        self,
341        shutdown_signal: watch::Receiver<()>,
342    ) -> Self {
343        println!("start metric server!");
344        let port = self.node_config.monitoring.prometheus_port;
345        tokio::spawn(async move {
346            metrics::start_server(port, shutdown_signal).await;
347        });
348        self
349    }
350
351    /// Starts the gRPC server for cluster communication.
352    ///
353    /// # Panics
354    /// Panics if node hasn't been built or address binding fails
355    pub async fn start_rpc_server(self) -> Self {
356        debug!("1. --- start RPC server --- ");
357        if let Some(ref node) = self.node {
358            let node_clone = node.clone();
359            let shutdown = self.shutdown_signal.clone();
360            let listen_address = self.node_config.cluster.listen_address;
361            let node_config = self.node_config.clone();
362            tokio::spawn(async move {
363                if let Err(e) = grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await {
364                    eprintln!("RPC server stops. {:?}", e);
365                    error!("RPC server stops. {:?}", e);
366                }
367            });
368            self
369        } else {
370            panic!("failed to start RPC server");
371        }
372    }
373
374    /// Returns the built node instance after successful construction.
375    ///
376    /// # Errors
377    /// Returns Error::NodeFailedToStartError if build hasn't completed
378    pub fn ready(self) -> Result<Arc<Node<RaftTypeConfig>>> {
379        self.node.ok_or_else(|| Error::NodeFailedToStartError)
380    }
381
382    /// Test constructor with custom database path
383    ///
384    /// # Safety
385    /// Bypasses normal configuration validation - use for testing only
386    #[cfg(test)]
387    pub fn new_from_db_path(
388        db_path: &str,
389        shutdown_signal: watch::Receiver<()>,
390    ) -> Self {
391        use std::path::PathBuf;
392
393        let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully!");
394        node_config.cluster.db_root_dir = PathBuf::from(db_path);
395
396        Self::init(node_config, shutdown_signal)
397    }
398}