use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::error;
use tracing::info;

use super::RaftTypeConfig;
use crate::alias::MOF;
use crate::alias::SMHOF;
use crate::alias::SNP;
use crate::alias::TROF;
use crate::follower_state::FollowerState;
use crate::grpc;
use crate::grpc::grpc_transport::GrpcTransport;
use crate::learner_state::LearnerState;
use crate::BufferedRaftLog;
use crate::ClusterConfig;
use crate::CommitHandler;
use crate::CommitHandlerDependencies;
use crate::DefaultCommitHandler;
use crate::DefaultPurgeExecutor;
use crate::DefaultStateMachineHandler;
use crate::ElectionHandler;
use crate::LogSizePolicy;
use crate::NewCommitData;
use crate::Node;
use crate::Raft;
use crate::RaftConfig;
use crate::RaftCoreHandlers;
use crate::RaftLog;
use crate::RaftMembership;
use crate::RaftNodeConfig;
use crate::RaftRole;
use crate::RaftStorageHandles;
use crate::ReplicationHandler;
use crate::Result;
use crate::SignalParams;
use crate::StateMachine;
use crate::StorageEngine;
use crate::SystemError;

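/// Mode in which a node is started: as a `Joiner` that is still in the
/// process of joining the cluster, or as a `FullMember`.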
pub enum NodeMode {
    Joiner,
    FullMember,
}

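/// Builder that wires together the storage engine, state machine, transport,
/// membership, and configuration needed to produce a running [`Node`].
///
/// A rough usage sketch (`MyEngine`, `MySm`, and the config path below are
/// placeholders for caller-provided implementations and files, not part of
/// this crate):
///
/// ```ignore
/// let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
/// let node = NodeBuilder::<MyEngine, MySm>::new(Some("cluster.toml"), shutdown_rx)
///     .storage_engine(Arc::new(engine))
///     .state_machine(Arc::new(state_machine))
///     .build()
///     .start_rpc_server()
///     .await
///     .ready()?;
/// ```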
pub struct NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    node_id: u32,

    pub(super) node_config: RaftNodeConfig,
    pub(super) storage_engine: Option<Arc<SE>>,
    pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine: Option<Arc<SM>>,
    pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
    pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
    pub(super) shutdown_signal: watch::Receiver<()>,

    pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
}

impl<SE, SM> NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
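    /// Creates a builder from the default `RaftNodeConfig`, optionally applying
    /// an override config file from `cluster_path`.
    ///
    /// Panics if the configuration cannot be loaded or the override cannot be
    /// applied.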
    pub fn new(
        cluster_path: Option<&str>,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        if let Some(p) = cluster_path {
            info!("with_override_config from: {}", p);
            node_config = node_config
                .with_override_config(p)
                .expect("failed to apply override config");
        }

        Self::init(node_config, shutdown_signal)
    }

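    /// Creates a builder from the default `RaftNodeConfig`, replacing its
    /// cluster section with the provided `ClusterConfig`.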
    pub fn from_cluster_config(
        cluster_config: ClusterConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        node_config.cluster = cluster_config;
        Self::init(node_config, shutdown_signal)
    }

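    /// Creates an empty builder for the given `RaftNodeConfig`; all components
    /// (storage engine, state machine, transport, ...) start out unset.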
    pub fn init(
        node_config: RaftNodeConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        Self {
            node_id: node_config.cluster.node_id,
            storage_engine: None,
            state_machine: None,
            transport: None,
            membership: None,
            node_config,
            shutdown_signal,
            state_machine_handler: None,
            snapshot_policy: None,
            node: None,
        }
    }

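    /// Sets the storage engine backing the Raft log. Required before `build()`.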
    pub fn storage_engine(
        mut self,
        storage_engine: Arc<SE>,
    ) -> Self {
        self.storage_engine = Some(storage_engine);
        self
    }

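    /// Sets the state machine that applies committed entries. Required before `build()`.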
    pub fn state_machine(
        mut self,
        state_machine: Arc<SM>,
    ) -> Self {
        self.state_machine = Some(state_machine);
        self
    }

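    /// Replaces the entire node configuration. Note that the `node_id` captured
    /// when the builder was created is not updated.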
    pub fn node_config(
        mut self,
        node_config: RaftNodeConfig,
    ) -> Self {
        self.node_config = node_config;
        self
    }

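    /// Replaces only the `raft` section of the node configuration.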
    pub fn raft_config(
        mut self,
        config: RaftConfig,
    ) -> Self {
        self.node_config.raft = config;
        self
    }

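    /// Assembles the node: starts the buffered Raft log, constructs the
    /// transport, snapshot policy, state-machine handler, and membership
    /// (falling back to defaults where none were supplied), creates the Raft
    /// core with an initial Learner or Follower role, and spawns the commit
    /// handler. The result is stored in `self.node`.
    ///
    /// Panics if the storage engine or state machine has not been set.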
    pub fn build(mut self) -> Self {
        let node_id = self.node_id;
        let node_config = self.node_config.clone();
        let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();

        let state_machine = self.state_machine.take().expect("State machine must be set");

        let storage_engine = self.storage_engine.take().expect("Storage engine must be set");

        let last_applied_index = state_machine.last_applied().index;

        // Start the buffered Raft log on top of the storage engine.
        let raft_log = {
            let (log, receiver) = BufferedRaftLog::new(
                node_id,
                node_config.raft.persistence.clone(),
                storage_engine.clone(),
            );

            log.start(receiver)
        };

        let transport = self.transport.take().unwrap_or_else(|| GrpcTransport::new(node_id));

        let snapshot_policy = self.snapshot_policy.take().unwrap_or_else(|| {
            LogSizePolicy::new(
                node_config.raft.snapshot.max_log_entries_before_snapshot,
                node_config.raft.snapshot.snapshot_cool_down_since_last_check,
            )
        });

        let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
            Arc::new(DefaultStateMachineHandler::new(
                node_id,
                last_applied_index,
                node_config.raft.commit_handler.max_entries_per_chunk,
                state_machine.clone(),
                node_config.raft.snapshot.clone(),
                snapshot_policy,
            ))
        });
        let membership = Arc::new(self.membership.take().unwrap_or_else(|| {
            RaftMembership::new(
                node_id,
                node_config.cluster.initial_cluster.clone(),
                node_config.clone(),
            )
        }));

        let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());

        let (role_tx, role_rx) = mpsc::unbounded_channel();
        let (event_tx, event_rx) = mpsc::channel(10240);
        let event_tx_clone = event_tx.clone();
        let shutdown_signal = self.shutdown_signal.clone();
        let node_config_arc = Arc::new(node_config);

        let last_applied_index = Some(state_machine.last_applied().index);
        // A node that is still joining the cluster starts as a Learner;
        // otherwise it starts as a Follower restored from persisted hard state.
        let my_role = if node_config_arc.is_joining() {
            RaftRole::Learner(Box::new(LearnerState::new(
                node_id,
                node_config_arc.clone(),
            )))
        } else {
            RaftRole::Follower(Box::new(FollowerState::new(
                node_id,
                node_config_arc.clone(),
                raft_log.load_hard_state().expect("Failed to load hard state"),
                last_applied_index,
            )))
        };
        let my_role_i32 = my_role.as_i32();
        let my_current_term = my_role.current_term();
        info!(
            "Starting node with role: {} and term: {}",
            my_role_i32, my_current_term
        );

        let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role,
            RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
                raft_log,
                state_machine: state_machine.clone(),
            },
            transport,
            RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
                election_handler: ElectionHandler::new(node_id),
                replication_handler: ReplicationHandler::new(node_id),
                state_machine_handler: state_machine_handler.clone(),
                purge_executor: Arc::new(purge_executor),
            },
            membership.clone(),
            SignalParams {
                role_tx,
                role_rx,
                event_tx,
                event_rx,
                shutdown_signal: shutdown_signal.clone(),
            },
            node_config_arc.clone(),
        );

        raft_core.register_new_commit_listener(new_commit_event_tx);

        let deps = CommitHandlerDependencies {
            state_machine_handler,
            raft_log: raft_core.ctx.storage.raft_log.clone(),
            membership: membership.clone(),
            event_tx: event_tx_clone,
            shutdown_signal,
        };

        let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role_i32,
            my_current_term,
            deps,
            node_config_arc.clone(),
            new_commit_event_rx,
        );
        self.enable_state_machine_commit_listener(commit_handler);

        let event_tx = raft_core.event_tx.clone();
        let node = Node::<RaftTypeConfig<SE, SM>> {
            node_id,
            raft_core: Arc::new(Mutex::new(raft_core)),
            membership,
            event_tx: event_tx.clone(),
            ready: AtomicBool::new(false),
            node_config: node_config_arc,
        };

        self.node = Some(Arc::new(node));
        self
    }

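    /// Spawns a background task that drives the commit handler until it exits.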
    fn enable_state_machine_commit_listener(
        &self,
        mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>,
    ) {
        tokio::spawn(async move {
            match commit_handler.run().await {
                Ok(_) => {
                    info!("commit_handler exited");
                }
                Err(e) => {
                    error!("commit_handler exited with unexpected error: {:?}", e);
                    eprintln!("commit_handler exited with unexpected error: {e:?}");
                }
            }
        });
    }

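    /// Overrides the default state machine handler with a custom implementation.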
    pub fn with_custom_state_machine_handler(
        mut self,
        handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
    ) -> Self {
        self.state_machine_handler = Some(handler);
        self
    }

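    /// Overrides the default log-size based snapshot policy.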
    pub fn set_snapshot_policy(
        mut self,
        snapshot_policy: SNP<RaftTypeConfig<SE, SM>>,
    ) -> Self {
        self.snapshot_policy = Some(snapshot_policy);
        self
    }

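    /// Spawns the gRPC server for the built node on the configured listen
    /// address. Must be called after `build()`; panics otherwise.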
    pub async fn start_rpc_server(self) -> Self {
        debug!("--- start RPC server ---");
        if let Some(ref node) = self.node {
            let node_clone = node.clone();
            let shutdown = self.shutdown_signal.clone();
            let listen_address = self.node_config.cluster.listen_address;
            let node_config = self.node_config.clone();
            tokio::spawn(async move {
                if let Err(e) =
                    grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
                {
                    eprintln!("RPC server stopped: {e:?}");
                    error!("RPC server stopped: {:?}", e);
                }
            });
            self
        } else {
            panic!("failed to start RPC server: node has not been built; call build() first");
        }
    }

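    /// Returns the built node, or an error if `build()` has not produced one.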
    pub fn ready(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
        self.node.ok_or_else(|| {
            SystemError::NodeStartFailed("node is not ready: build() was not called".to_string())
                .into()
        })
    }

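    /// Test-only constructor that loads the default config and points the
    /// database root directory at `db_path`.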
    #[cfg(test)]
    pub fn new_from_db_path(
        db_path: &str,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        use std::path::PathBuf;

        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        node_config.cluster.db_root_dir = PathBuf::from(db_path);

        Self::init(node_config, shutdown_signal)
    }
}