1use std::sync::atomic::AtomicBool;
36use std::sync::Arc;
37
38use log::debug;
39use log::error;
40use log::info;
41use tokio::sync::mpsc;
42use tokio::sync::watch;
43use tokio::sync::Mutex;
44
45use super::RaftTypeConfig;
46use crate::alias::COF;
47use crate::alias::MOF;
48use crate::alias::ROF;
49use crate::alias::SMHOF;
50use crate::alias::SMOF;
51use crate::alias::SSOF;
52use crate::alias::TROF;
53use crate::grpc;
54use crate::grpc::grpc_transport::GrpcTransport;
55use crate::init_sled_raft_log_db;
56use crate::init_sled_state_machine_db;
57use crate::init_sled_state_storage_db;
58use crate::metrics;
59use crate::ClusterConfig;
60use crate::CommitHandler;
61use crate::DefaultCommitHandler;
62use crate::DefaultStateMachineHandler;
63use crate::ElectionHandler;
64use crate::Error;
65use crate::Node;
66use crate::Raft;
67use crate::RaftMembership;
68use crate::RaftNodeConfig;
69use crate::RaftStateMachine;
70use crate::ReplicationHandler;
71use crate::Result;
72use crate::SledRaftLog;
73use crate::SledStateStorage;
74use crate::StateMachine;
75
76pub struct NodeBuilder {
80 node_id: u32,
81 pub(super) node_config: RaftNodeConfig,
82 pub(super) raft_log: Option<ROF<RaftTypeConfig>>,
83 pub(super) membership: Option<MOF<RaftTypeConfig>>,
84 pub(super) state_machine: Option<Arc<SMOF<RaftTypeConfig>>>,
85 pub(super) state_storage: Option<SSOF<RaftTypeConfig>>,
86 pub(super) transport: Option<TROF<RaftTypeConfig>>,
87 pub(super) commit_handler: Option<COF<RaftTypeConfig>>,
88 pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig>>>,
89 pub(super) shutdown_signal: watch::Receiver<()>,
90
91 pub(super) node: Option<Arc<Node<RaftTypeConfig>>>,
92}
93
94impl NodeBuilder {
95 pub fn new(
105 cluster_path: Option<&str>,
106 shutdown_signal: watch::Receiver<()>,
107 ) -> Self {
108 let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
109 if let Some(p) = cluster_path {
110 info!("with_override_config from: {}", &p);
111 node_config = node_config
112 .with_override_config(p)
113 .expect("Overwrite node_config successfully.");
114 }
115 Self::init(node_config, shutdown_signal)
116 }
117
118 pub fn from_config(
129 cluster_config: ClusterConfig,
130 shutdown_signal: watch::Receiver<()>,
131 ) -> Self {
132 let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully!");
133 node_config.cluster = cluster_config;
134 Self::init(node_config, shutdown_signal)
135 }
136
137 pub fn init(
139 node_config: RaftNodeConfig,
140 shutdown_signal: watch::Receiver<()>,
141 ) -> Self {
142 Self {
143 node_id: node_config.cluster.node_id,
144 raft_log: None,
145 state_machine: None,
146 state_storage: None,
147 transport: None,
148 membership: None,
149 node_config,
150 shutdown_signal,
151 commit_handler: None,
152 state_machine_handler: None,
153 node: None,
154 }
155 }
156
157 pub fn raft_log(
159 mut self,
160 raft_log: ROF<RaftTypeConfig>,
161 ) -> Self {
162 self.raft_log = Some(raft_log);
163 self
164 }
165
166 pub fn state_machine(
168 mut self,
169 state_machine: Arc<SMOF<RaftTypeConfig>>,
170 ) -> Self {
171 self.state_machine = Some(state_machine);
172 self
173 }
174
175 pub fn state_storage(
177 mut self,
178 state_storage: SSOF<RaftTypeConfig>,
179 ) -> Self {
180 self.state_storage = Some(state_storage);
181 self
182 }
183
184 pub fn transport(
186 mut self,
187 transport: TROF<RaftTypeConfig>,
188 ) -> Self {
189 self.transport = Some(transport);
190 self
191 }
192
193 pub fn commit_handler(
195 mut self,
196 commit_handler: COF<RaftTypeConfig>,
197 ) -> Self {
198 self.commit_handler = Some(commit_handler);
199 self
200 }
201
202 pub fn membership(
204 mut self,
205 membership: MOF<RaftTypeConfig>,
206 ) -> Self {
207 self.membership = Some(membership);
208 self
209 }
210
211 pub fn node_config(
213 mut self,
214 node_config: RaftNodeConfig,
215 ) -> Self {
216 self.node_config = node_config;
217 self
218 }
219
220 pub fn build(mut self) -> Self {
231 let node_id = self.node_id;
232 let node_config = self.node_config.clone();
233 let db_root_dir = format!("{}/{}", node_config.cluster.db_root_dir.display(), node_id);
234
235 let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<u64>();
237
238 let state_machine = self.state_machine.take().unwrap_or_else(|| {
239 let state_machine_db =
240 init_sled_state_machine_db(&db_root_dir).expect("init_sled_state_machine_db successfully.");
241 Arc::new(RaftStateMachine::new(node_id, Arc::new(state_machine_db)))
242 });
243
244 let last_applied_index = state_machine.last_entry_index();
246
247 let raft_log = self.raft_log.take().unwrap_or_else(|| {
248 let raft_log_db = init_sled_raft_log_db(&db_root_dir).expect("init_sled_raft_log_db successfully.");
249 SledRaftLog::new(Arc::new(raft_log_db), last_applied_index)
250 });
251
252 let state_storage = self.state_storage.take().unwrap_or_else(|| {
253 let state_storage_db =
254 init_sled_state_storage_db(&db_root_dir).expect("init_sled_state_storage_db successfully.");
255 SledStateStorage::new(Arc::new(state_storage_db))
256 });
257
258 let transport = self.transport.take().unwrap_or(GrpcTransport { my_id: node_id });
259
260 let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
261 Arc::new(DefaultStateMachineHandler::new(
262 last_applied_index,
263 node_config.raft.commit_handler.max_entries_per_chunk,
264 state_machine.clone(),
265 ))
266 });
267 let membership = self
268 .membership
269 .take()
270 .unwrap_or_else(|| RaftMembership::new(node_id, node_config.cluster.initial_cluster.clone()));
271
272 let (role_tx, role_rx) = mpsc::unbounded_channel();
273 let (event_tx, event_rx) = mpsc::channel(10240);
274
275 let settings_arc = Arc::new(node_config);
276 let shutdown_signal = self.shutdown_signal.clone();
277 let mut raft_core = Raft::<RaftTypeConfig>::new(
278 node_id,
279 raft_log,
280 state_machine.clone(),
281 state_storage,
282 transport,
283 ElectionHandler::new(node_id, event_tx.clone()),
284 ReplicationHandler::new(node_id),
285 state_machine_handler.clone(),
286 Arc::new(membership),
287 settings_arc.clone(),
288 role_tx,
289 role_rx,
290 event_tx,
291 event_rx,
292 shutdown_signal.clone(),
293 );
294
295 raft_core.register_new_commit_listener(new_commit_event_tx);
297
298 let mut commit_handler = DefaultCommitHandler::<RaftTypeConfig>::new(
300 state_machine_handler,
301 raft_core.ctx.raft_log.clone(),
302 new_commit_event_rx,
303 settings_arc.raft.commit_handler.batch_size,
304 settings_arc.raft.commit_handler.process_interval_ms,
305 shutdown_signal,
306 );
307 tokio::spawn(async move {
308 match commit_handler.run().await {
309 Ok(_) => {
310 info!("commit_handler exit program");
311 }
312 Err(Error::Exit) => {
313 info!("commit_handler exit program");
314 println!("commit_handler exit program");
315 }
316 Err(e) => {
317 error!("commit_handler exit program with error: {:?}", e);
318 println!("commit_handler exit program");
319 }
320 }
321 });
322
323 let event_tx = raft_core.event_tx.clone();
324 let node = Node::<RaftTypeConfig> {
325 node_id,
326 raft_core: Arc::new(Mutex::new(raft_core)),
327 event_tx: event_tx.clone(),
328 ready: AtomicBool::new(false),
329 settings: settings_arc,
330 };
331
332 self.node = Some(Arc::new(node));
333 self
334 }
335
336 pub fn start_metrics_server(
340 self,
341 shutdown_signal: watch::Receiver<()>,
342 ) -> Self {
343 println!("start metric server!");
344 let port = self.node_config.monitoring.prometheus_port;
345 tokio::spawn(async move {
346 metrics::start_server(port, shutdown_signal).await;
347 });
348 self
349 }
350
351 pub async fn start_rpc_server(self) -> Self {
356 debug!("1. --- start RPC server --- ");
357 if let Some(ref node) = self.node {
358 let node_clone = node.clone();
359 let shutdown = self.shutdown_signal.clone();
360 let listen_address = self.node_config.cluster.listen_address;
361 let node_config = self.node_config.clone();
362 tokio::spawn(async move {
363 if let Err(e) = grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await {
364 eprintln!("RPC server stops. {:?}", e);
365 error!("RPC server stops. {:?}", e);
366 }
367 });
368 self
369 } else {
370 panic!("failed to start RPC server");
371 }
372 }
373
374 pub fn ready(self) -> Result<Arc<Node<RaftTypeConfig>>> {
379 self.node.ok_or_else(|| Error::NodeFailedToStartError)
380 }
381
/// Test-only constructor: loads the default node configuration and points
/// `cluster.db_root_dir` at `db_path`.
///
/// # Panics
///
/// Panics if the default node configuration cannot be loaded.
#[cfg(test)]
pub fn new_from_db_path(
    db_path: &str,
    shutdown_signal: watch::Receiver<()>,
) -> Self {
    let mut test_config = RaftNodeConfig::new().expect("Load node_config successfully!");
    test_config.cluster.db_root_dir = std::path::PathBuf::from(db_path);
    Self::init(test_config, shutdown_signal)
}
398}