1use std::fmt::Debug;
34use std::sync::atomic::AtomicBool;
35use std::sync::Arc;
36
37use tokio::sync::mpsc;
38use tokio::sync::watch;
39use tokio::sync::Mutex;
40use tracing::debug;
41use tracing::error;
42use tracing::info;
43
44use super::RaftTypeConfig;
45use crate::alias::MOF;
46use crate::alias::SMHOF;
47use crate::alias::SNP;
48use crate::alias::TROF;
49use crate::follower_state::FollowerState;
50use crate::grpc;
51use crate::grpc::grpc_transport::GrpcTransport;
52use crate::learner_state::LearnerState;
53use crate::BufferedRaftLog;
54use crate::ClusterConfig;
55use crate::CommitHandler;
56use crate::CommitHandlerDependencies;
57use crate::DefaultCommitHandler;
58use crate::DefaultPurgeExecutor;
59use crate::DefaultStateMachineHandler;
60use crate::ElectionHandler;
61use crate::LogSizePolicy;
62use crate::NewCommitData;
63use crate::Node;
64use crate::Raft;
65use crate::RaftConfig;
66use crate::RaftCoreHandlers;
67use crate::RaftLog;
68use crate::RaftMembership;
69use crate::RaftNodeConfig;
70use crate::RaftRole;
71use crate::RaftStorageHandles;
72use crate::ReplicationHandler;
73use crate::Result;
74use crate::SignalParams;
75use crate::StateMachine;
76use crate::StorageEngine;
77use crate::SystemError;
78
/// Mode a node starts in.
///
/// The common value traits are derived so the mode can be logged, copied and
/// compared by callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeMode {
    /// NOTE(review): presumably a node that joins an already running cluster
    /// (compare `is_joining()` used by `NodeBuilder::build`) — confirm.
    Joiner,
    /// NOTE(review): presumably a regular member of the cluster — confirm.
    FullMember,
}
83
84pub struct NodeBuilder<SE, SM>
88where
89 SE: StorageEngine + Debug,
90 SM: StateMachine + Debug,
91{
92 node_id: u32,
93
94 pub(super) node_config: RaftNodeConfig,
95 pub(super) storage_engine: Option<Arc<SE>>,
96 pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
97 pub(super) state_machine: Option<Arc<SM>>,
98 pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
99 pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
100 pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
101 pub(super) shutdown_signal: watch::Receiver<()>,
102
103 pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
104}
105
106impl<SE, SM> NodeBuilder<SE, SM>
107where
108 SE: StorageEngine + Debug,
109 SM: StateMachine + Debug,
110{
111 pub fn new(
121 cluster_path: Option<&str>,
122 shutdown_signal: watch::Receiver<()>,
123 ) -> Self {
124 let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
125 if let Some(p) = cluster_path {
126 info!("with_override_config from: {}", &p);
127 node_config = node_config
128 .with_override_config(p)
129 .expect("Overwrite node_config successfully.");
130 }
131
132 Self::init(node_config, shutdown_signal)
133 }
134
135 pub fn from_cluster_config(
146 cluster_config: ClusterConfig,
147 shutdown_signal: watch::Receiver<()>,
148 ) -> Self {
149 let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully!");
150 node_config.cluster = cluster_config;
151 Self::init(node_config, shutdown_signal)
152 }
153
154 pub fn init(
156 node_config: RaftNodeConfig,
157 shutdown_signal: watch::Receiver<()>,
158 ) -> Self {
159 Self {
160 node_id: node_config.cluster.node_id,
161 storage_engine: None,
162 state_machine: None,
163 transport: None,
164 membership: None,
165 node_config,
166 shutdown_signal,
167 state_machine_handler: None,
168 snapshot_policy: None,
169 node: None,
170 }
171 }
172
173 pub fn storage_engine(
175 mut self,
176 storage_engine: Arc<SE>,
177 ) -> Self {
178 self.storage_engine = Some(storage_engine);
179 self
180 }
181
182 pub fn state_machine(
184 mut self,
185 state_machine: Arc<SM>,
186 ) -> Self {
187 self.state_machine = Some(state_machine);
188 self
189 }
190
191 pub fn node_config(
193 mut self,
194 node_config: RaftNodeConfig,
195 ) -> Self {
196 self.node_config = node_config;
197 self
198 }
199
200 pub fn raft_config(
202 mut self,
203 config: RaftConfig,
204 ) -> Self {
205 self.node_config.raft = config;
206 self
207 }
208
209 pub fn build(mut self) -> Self {
220 let node_id = self.node_id;
221 let node_config = self.node_config.clone();
222 let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();
226
227 let state_machine = self.state_machine.take().expect("State machine must be set");
229
230 let storage_engine = self.storage_engine.take().expect("Storage engine must be set");
232
233 let last_applied_index = state_machine.last_applied().index;
235 let raft_log = {
236 let (log, receiver) = BufferedRaftLog::new(
237 node_id,
238 node_config.raft.persistence.clone(),
239 storage_engine.clone(),
240 );
241
242 log.start(receiver)
244 };
245
246 let transport = self.transport.take().unwrap_or(GrpcTransport::new(node_id));
247
248 let snapshot_policy = self.snapshot_policy.take().unwrap_or(LogSizePolicy::new(
249 node_config.raft.snapshot.max_log_entries_before_snapshot,
250 node_config.raft.snapshot.snapshot_cool_down_since_last_check,
251 ));
252
253 let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
254 Arc::new(DefaultStateMachineHandler::new(
255 node_id,
256 last_applied_index,
257 node_config.raft.commit_handler.max_entries_per_chunk,
258 state_machine.clone(),
259 node_config.raft.snapshot.clone(),
260 snapshot_policy,
261 ))
262 });
263 let membership = Arc::new(self.membership.take().unwrap_or_else(|| {
264 RaftMembership::new(
265 node_id,
266 node_config.cluster.initial_cluster.clone(),
267 node_config.clone(),
268 )
269 }));
270
271 let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());
272
273 let (role_tx, role_rx) = mpsc::unbounded_channel();
274 let (event_tx, event_rx) = mpsc::channel(10240);
275 let event_tx_clone = event_tx.clone(); let shutdown_signal = self.shutdown_signal.clone();
278 let node_config_arc = Arc::new(node_config);
279
280 let last_applied_index = Some(state_machine.last_applied().index);
287 let my_role = if node_config_arc.is_joining() {
288 RaftRole::Learner(Box::new(LearnerState::new(
289 node_id,
290 node_config_arc.clone(),
291 )))
292 } else {
293 RaftRole::Follower(Box::new(FollowerState::new(
294 node_id,
295 node_config_arc.clone(),
296 raft_log.load_hard_state().expect("Failed to load hard state"),
297 last_applied_index,
298 )))
299 };
300 let my_role_i32 = my_role.as_i32();
301 let my_current_term = my_role.current_term();
302 info!(
303 "Start node with role: {} and term: {}",
304 my_role_i32, my_current_term
305 );
306
307 let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
309 node_id,
310 my_role,
311 RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
312 raft_log,
313 state_machine: state_machine.clone(),
314 },
315 transport,
316 RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
317 election_handler: ElectionHandler::new(node_id),
318 replication_handler: ReplicationHandler::new(node_id),
319 state_machine_handler: state_machine_handler.clone(),
320 purge_executor: Arc::new(purge_executor),
321 },
322 membership.clone(),
323 SignalParams {
324 role_tx,
325 role_rx,
326 event_tx,
327 event_rx,
328 shutdown_signal: shutdown_signal.clone(),
329 },
330 node_config_arc.clone(),
331 );
332
333 raft_core.register_new_commit_listener(new_commit_event_tx);
335
336 let deps = CommitHandlerDependencies {
338 state_machine_handler,
339 raft_log: raft_core.ctx.storage.raft_log.clone(),
340 membership: membership.clone(),
341 event_tx: event_tx_clone,
342 shutdown_signal,
343 };
344
345 let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
346 node_id,
347 my_role_i32,
348 my_current_term,
349 deps,
350 node_config_arc.clone(),
351 new_commit_event_rx,
352 );
353 self.enable_state_machine_commit_listener(commit_handler);
354
355 let event_tx = raft_core.event_tx.clone();
356 let node = Node::<RaftTypeConfig<SE, SM>> {
357 node_id,
358 raft_core: Arc::new(Mutex::new(raft_core)),
359 membership,
360 event_tx: event_tx.clone(),
361 ready: AtomicBool::new(false),
362 node_config: node_config_arc,
363 };
364
365 self.node = Some(Arc::new(node));
366 self
367 }
368
369 fn enable_state_machine_commit_listener(
371 &self,
372 mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>,
373 ) {
374 tokio::spawn(async move {
375 match commit_handler.run().await {
376 Ok(_) => {
377 info!("commit_handler exit program");
378 }
379 Err(e) => {
380 error!("commit_handler exit program with unpexected error: {:?}", e);
381 println!("commit_handler exit program");
382 }
383 }
384 });
385 }
386
387 pub fn with_custom_state_machine_handler(
401 mut self,
402 handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
403 ) -> Self {
404 self.state_machine_handler = Some(handler);
405 self
406 }
407
408 pub fn set_snapshot_policy(
409 mut self,
410 snapshot_policy: SNP<RaftTypeConfig<SE, SM>>,
411 ) -> Self {
412 self.snapshot_policy = Some(snapshot_policy);
413 self
414 }
415
416 pub async fn start_rpc_server(self) -> Self {
421 debug!("1. --- start RPC server --- ");
422 if let Some(ref node) = self.node {
423 let node_clone = node.clone();
424 let shutdown = self.shutdown_signal.clone();
425 let listen_address = self.node_config.cluster.listen_address;
426 let node_config = self.node_config.clone();
427 tokio::spawn(async move {
428 if let Err(e) =
429 grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
430 {
431 eprintln!("RPC server stops. {e:?}");
432 error!("RPC server stops. {:?}", e);
433 }
434 });
435 self
436 } else {
437 panic!("failed to start RPC server");
438 }
439 }
440
441 pub fn ready(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
446 self.node.ok_or_else(|| {
447 SystemError::NodeStartFailed("check node ready failed".to_string()).into()
448 })
449 }
450
/// Test-only constructor: loads the default node configuration and points
/// `cluster.db_root_dir` at `db_path`.
///
/// # Panics
/// Panics when the default node configuration cannot be loaded.
#[cfg(test)]
pub fn new_from_db_path(
    db_path: &str,
    shutdown_signal: watch::Receiver<()>,
) -> Self {
    let node_config = {
        let mut loaded = RaftNodeConfig::new().expect("Load node_config successfully!");
        loaded.cluster.db_root_dir = std::path::PathBuf::from(db_path);
        loaded
    };

    Self::init(node_config, shutdown_signal)
}
467}