use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use d_engine_core::ClusterConfig;
use d_engine_core::CommitHandler;
use d_engine_core::CommitHandlerDependencies;
use d_engine_core::DefaultCommitHandler;
use d_engine_core::DefaultPurgeExecutor;
use d_engine_core::DefaultStateMachineHandler;
use d_engine_core::ElectionHandler;
use d_engine_core::LogSizePolicy;
use d_engine_core::NewCommitData;
use d_engine_core::Raft;
use d_engine_core::RaftConfig;
use d_engine_core::RaftCoreHandlers;
use d_engine_core::RaftLog;
use d_engine_core::RaftNodeConfig;
use d_engine_core::RaftRole;
use d_engine_core::RaftStorageHandles;
use d_engine_core::ReplicationHandler;
use d_engine_core::Result;
use d_engine_core::SignalParams;
use d_engine_core::StateMachine;
use d_engine_core::StateMachineWorker;
use d_engine_core::StorageEngine;
use d_engine_core::SystemError;
use d_engine_core::alias::MOF;
use d_engine_core::alias::SMHOF;
use d_engine_core::alias::SNP;
use d_engine_core::alias::TROF;
use d_engine_core::follower_state::FollowerState;
use d_engine_core::learner_state::LearnerState;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchDispatcher;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchRegistry;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::debug;
use tracing::error;
use tracing::info;

use super::LeaderNotifier;
use super::RaftTypeConfig;
use crate::Node;
use crate::membership::RaftMembership;
use crate::network::grpc;
use crate::network::grpc::grpc_transport::GrpcTransport;
use crate::storage::BufferedRaftLog;

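/// Builder that wires together storage, state machine, transport, and
/// background workers to produce a running [`Node`].
///
/// A minimal usage sketch (`MyEngine` and `MyStateMachine` are hypothetical
/// placeholders for your own [`StorageEngine`] and [`StateMachine`]
/// implementations):
///
/// ```ignore
/// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
/// let node = NodeBuilder::<MyEngine, MyStateMachine>::new(None, shutdown_rx)
///     .storage_engine(Arc::new(MyEngine::new()))
///     .state_machine(Arc::new(MyStateMachine::new()))
///     .start()
///     .await?;
/// ```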
pub struct NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    node_id: u32,

    pub(super) node_config: RaftNodeConfig,
    pub(super) storage_engine: Option<Arc<SE>>,
    pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine: Option<Arc<SM>>,
    pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
    pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
    pub(super) shutdown_signal: watch::Receiver<()>,

    pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
}

impl<SE, SM> NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
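    /// Creates a builder from the default node configuration, optionally
    /// layering an override config file from `cluster_path` on top.
    ///
    /// Panics if the configuration cannot be loaded, overridden, or validated.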
    pub fn new(
        cluster_path: Option<&str>,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let node_config = if let Some(p) = cluster_path {
            info!("loading override config from: {}", p);
            RaftNodeConfig::new()
                .expect("failed to load node_config")
                .with_override_config(p)
                .expect("failed to apply override config")
                .validate()
                .expect("node_config validation failed")
        } else {
            RaftNodeConfig::new()
                .expect("failed to load node_config")
                .validate()
                .expect("node_config validation failed")
        };

        Self::init(node_config, shutdown_signal)
    }

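    /// Creates a builder from an explicit [`ClusterConfig`], replacing the
    /// cluster section of the freshly loaded default node configuration.
    ///
    /// A sketch, assuming `cluster_config` is a [`ClusterConfig`] you have
    /// already constructed and `MyEngine`/`MyStateMachine` are the same
    /// hypothetical implementations as in the type-level example:
    ///
    /// ```ignore
    /// let builder = NodeBuilder::<MyEngine, MyStateMachine>::from_cluster_config(
    ///     cluster_config,
    ///     shutdown_rx,
    /// );
    /// ```
    ///
    /// Panics if the configuration cannot be loaded or validated.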
    pub fn from_cluster_config(
        cluster_config: ClusterConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        node_config.cluster = cluster_config;
        let node_config = node_config.validate().expect("node_config validation failed");
        Self::init(node_config, shutdown_signal)
    }

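    /// Creates a builder with all optional components unset, taking the node
    /// id from `node_config.cluster.node_id`.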
    pub fn init(
        node_config: RaftNodeConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        Self {
            node_id: node_config.cluster.node_id,
            storage_engine: None,
            state_machine: None,
            transport: None,
            membership: None,
            node_config,
            shutdown_signal,
            state_machine_handler: None,
            snapshot_policy: None,
            node: None,
        }
    }

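    /// Sets the storage engine that backs the buffered Raft log.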
    pub fn storage_engine(
        mut self,
        storage_engine: Arc<SE>,
    ) -> Self {
        self.storage_engine = Some(storage_engine);
        self
    }

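    /// Sets the state machine that committed entries are applied to.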
    pub fn state_machine(
        mut self,
        state_machine: Arc<SM>,
    ) -> Self {
        self.state_machine = Some(state_machine);
        self
    }

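    /// Replaces the entire node configuration.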
    pub fn node_config(
        mut self,
        node_config: RaftNodeConfig,
    ) -> Self {
        self.node_config = node_config;
        self
    }

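    /// Overrides only the Raft protocol section of the node configuration.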
    pub fn raft_config(
        mut self,
        config: RaftConfig,
    ) -> Self {
        self.node_config.raft = config;
        self
    }

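    /// Assembles the node: starts the state machine, constructs the buffered
    /// Raft log, transport, snapshot policy, membership, and Raft core, and
    /// spawns the background workers (state-machine apply worker, commit
    /// listener, and optional lease cleanup and watch dispatcher tasks).
    ///
    /// Returns an error if the state machine or storage engine has not been
    /// set via [`Self::state_machine`] / [`Self::storage_engine`].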
    pub async fn build(mut self) -> Result<Self> {
        let node_id = self.node_id;
        let node_config = self.node_config.clone();

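        // Channel through which the Raft core announces newly committed
        // entries to the commit handler spawned further below.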
        let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();

        let state_machine = self.state_machine.take().ok_or_else(|| {
            SystemError::NodeStartFailed(
                "State machine must be set before calling build()".to_string(),
            )
        })?;

        state_machine.start().await?;

        let lease_cleanup_handle = if node_config.raft.state_machine.lease.enabled {
            info!(
                "Starting lease background cleanup worker (interval: {}ms)",
                node_config.raft.state_machine.lease.interval_ms
            );
            Some(Self::spawn_background_cleanup_worker(
                Arc::clone(&state_machine),
                node_config.raft.state_machine.lease.interval_ms,
                self.shutdown_signal.clone(),
            ))
        } else {
            debug!("Lease feature disabled: no background cleanup worker");
            None
        };

        let storage_engine = self.storage_engine.take().ok_or_else(|| {
            SystemError::NodeStartFailed(
                "Storage engine must be set before calling build()".to_string(),
            )
        })?;

        let last_applied_index = state_machine.last_applied().index;
        info!("Node startup, last applied index: {}", last_applied_index);
        let raft_log = {
            let (log, receiver) = BufferedRaftLog::new(
                node_id,
                node_config.raft.persistence.clone(),
                storage_engine.clone(),
            );

            log.start(receiver)
        };

        let transport = self.transport.take().unwrap_or_else(|| GrpcTransport::new(node_id));

        let snapshot_policy = self.snapshot_policy.take().unwrap_or_else(|| {
            LogSizePolicy::new(
                node_config.raft.snapshot.max_log_entries_before_snapshot,
                node_config.raft.snapshot.snapshot_cool_down_since_last_check,
            )
        });

        let shutdown_signal = self.shutdown_signal.clone();

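        // Optional watch subsystem: state-machine events are broadcast to a
        // dispatcher task that fans them out to registered watchers.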
        #[cfg(feature = "watch")]
        let watch_system = {
            let (broadcast_tx, broadcast_rx) =
                tokio::sync::broadcast::channel(node_config.raft.watch.event_queue_size);

            let (unregister_tx, unregister_rx) = mpsc::unbounded_channel();

            let registry = Arc::new(WatchRegistry::new(
                node_config.raft.watch.watcher_buffer_size,
                unregister_tx,
            ));

            let dispatcher =
                WatchDispatcher::new(Arc::clone(&registry), broadcast_rx, unregister_rx);

            let dispatcher_handle = tokio::spawn(async move {
                dispatcher.run().await;
            });

            Some((broadcast_tx, registry, dispatcher_handle))
        };

        let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
            #[cfg(feature = "watch")]
            let watch_event_tx = watch_system.as_ref().map(|(tx, _, _)| tx.clone());
            #[cfg(not(feature = "watch"))]
            let watch_event_tx = None;

            Arc::new(DefaultStateMachineHandler::new(
                node_id,
                last_applied_index,
                state_machine.clone(),
                node_config.raft.snapshot.clone(),
                snapshot_policy,
                watch_event_tx,
            ))
        });
        let membership = Arc::new(self.membership.take().unwrap_or_else(|| {
            RaftMembership::new(
                node_id,
                node_config.cluster.initial_cluster.clone(),
                node_config.clone(),
            )
        }));

        let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());

        let (role_tx, role_rx) = mpsc::unbounded_channel();
        let (event_tx, event_rx) = mpsc::channel(10240);
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let event_tx_clone = event_tx.clone();
        let node_config_arc = Arc::new(node_config);

        let last_applied_index = Some(state_machine.last_applied().index);
        let my_role = if node_config_arc.is_learner() {
            RaftRole::Learner(Box::new(LearnerState::new(
                node_id,
                node_config_arc.clone(),
            )))
        } else {
            RaftRole::Follower(Box::new(FollowerState::new(
                node_id,
                node_config_arc.clone(),
                raft_log.load_hard_state().expect("Failed to load hard state"),
                last_applied_index,
            )))
        };
        let my_role_i32 = my_role.as_i32();
        let my_current_term = my_role.current_term();
        info!(
            "Starting node with role: {} and term: {}",
            my_role_i32, my_current_term
        );

        let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role,
            RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
                raft_log,
                state_machine: state_machine.clone(),
            },
            transport,
            RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
                election_handler: ElectionHandler::new(node_id),
                replication_handler: ReplicationHandler::new(node_id),
                state_machine_handler: state_machine_handler.clone(),
                purge_executor: Arc::new(purge_executor),
            },
            membership.clone(),
            SignalParams::new(
                role_tx,
                role_rx,
                event_tx,
                event_rx,
                cmd_tx.clone(),
                cmd_rx,
                shutdown_signal.clone(),
            ),
            node_config_arc.clone(),
        );

        raft_core.register_new_commit_listener(new_commit_event_tx);

        let leader_notifier = LeaderNotifier::new();
        raft_core.register_leader_change_listener(leader_notifier.sender());

        let (sm_apply_tx, sm_apply_rx) = mpsc::unbounded_channel();

        let sm_worker = StateMachineWorker::new(
            node_id,
            state_machine_handler.clone(),
            sm_apply_rx,
            event_tx_clone.clone(),
            self.shutdown_signal.clone(),
        );
        let sm_worker_handle = Self::spawn_state_machine_worker(sm_worker);

        let deps = CommitHandlerDependencies {
            state_machine_handler,
            raft_log: raft_core.ctx.storage.raft_log.clone(),
            membership: membership.clone(),
            event_tx: event_tx_clone,
            sm_apply_tx,
            shutdown_signal,
            max_batch_size: raft_core.ctx.node_config.raft.batching.max_batch_size,
        };

        let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role_i32,
            my_current_term,
            deps,
            new_commit_event_rx,
        );
        let commit_handler_handle = Self::spawn_state_machine_commit_listener(commit_handler);

        let event_tx = raft_core.event_sender();
        let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);

        let node = Node::<RaftTypeConfig<SE, SM>> {
            node_id,
            raft_core: Arc::new(Mutex::new(raft_core)),
            membership,
            event_tx: event_tx.clone(),
            cmd_tx,
            ready: AtomicBool::new(false),
            rpc_ready_tx,
            leader_notifier,
            node_config: node_config_arc,
            #[cfg(feature = "watch")]
            watch_registry: watch_system.as_ref().map(|(_, reg, _)| Arc::clone(reg)),
            #[cfg(feature = "watch")]
            _watch_dispatcher_handle: watch_system.map(|(_, _, handle)| handle),
            _sm_worker_handle: Some(sm_worker_handle),
            _commit_handler_handle: Some(commit_handler_handle),
            _lease_cleanup_handle: lease_cleanup_handle,
            shutdown_signal: self.shutdown_signal.clone(),
        };

        self.node = Some(Arc::new(node));
        Ok(self)
    }

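    /// Spawns the commit handler loop on a background task, logging when it
    /// exits normally or with an error.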
    fn spawn_state_machine_commit_listener(
        mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            match commit_handler.run().await {
                Ok(_) => {
                    info!("commit_handler exited");
                }
                Err(e) => {
                    error!("commit_handler exited with unexpected error: {:?}", e);
                    eprintln!("commit_handler exited with unexpected error: {e:?}");
                }
            }
        })
    }

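    /// Spawns the state-machine apply worker on a background task, logging
    /// when it exits normally or with an error.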
    fn spawn_state_machine_worker(
        sm_worker: StateMachineWorker<RaftTypeConfig<SE, SM>>
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            match sm_worker.run().await {
                Ok(_) => {
                    info!("state_machine_worker exited");
                }
                Err(e) => {
                    error!(
                        "state_machine_worker exited with unexpected error: {:?}",
                        e
                    );
                    eprintln!("state_machine_worker exited with unexpected error: {e:?}");
                }
            }
        })
    }

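    /// Spawns a background task that periodically asks the state machine to
    /// remove expired lease keys, stopping when the shutdown signal fires.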
    fn spawn_background_cleanup_worker(
        state_machine: Arc<SM>,
        interval_ms: u64,
        mut shutdown_signal: watch::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_millis(interval_ms));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        match state_machine.lease_background_cleanup().await {
                            Ok(deleted_keys) => {
                                if !deleted_keys.is_empty() {
                                    debug!(
                                        "Lease background cleanup: deleted {} expired keys",
                                        deleted_keys.len()
                                    );
                                }
                            }
                            Err(e) => {
                                error!("Lease background cleanup failed: {:?}", e);
                            }
                        }
                    }
                    _ = shutdown_signal.changed() => {
                        info!("Lease background cleanup received shutdown signal");
                        break;
                    }
                }
            }

            debug!("Lease background cleanup worker stopped");
        })
    }

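    /// Installs a custom state machine handler, replacing the
    /// [`DefaultStateMachineHandler`] that [`Self::build`] would otherwise
    /// construct.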
    pub fn with_custom_state_machine_handler(
        mut self,
        handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
    ) -> Self {
        self.state_machine_handler = Some(handler);
        self
    }

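    /// Spawns the gRPC server for this node on a background task.
    ///
    /// Panics if called before [`Self::build`] has populated `self.node`.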
    async fn start_rpc_server(self) -> Self {
        debug!("--- start RPC server ---");
        if let Some(ref node) = self.node {
            let node_clone = node.clone();
            let shutdown = self.shutdown_signal.clone();
            let listen_address = self.node_config.cluster.listen_address;
            let node_config = self.node_config.clone();
            tokio::spawn(async move {
                if let Err(e) =
                    grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
                {
                    eprintln!("RPC server stopped: {e:?}");
                    error!("RPC server stopped: {:?}", e);
                }
            });
            self
        } else {
            panic!("start_rpc_server called before build(): node is not initialized");
        }
    }

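    /// Builds the node and spawns its gRPC server, returning the running
    /// [`Node`] handle. This is the usual entry point; see the type-level
    /// example on [`NodeBuilder`].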
    pub async fn start(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
        let builder = self.build().await?;
        let builder = builder.start_rpc_server().await;
        builder.node.ok_or_else(|| {
            SystemError::NodeStartFailed("Node build failed unexpectedly".to_string()).into()
        })
    }

    #[cfg(test)]
    pub(crate) fn new_from_db_path(
        db_path: &str,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        use std::path::PathBuf;

        let mut node_config = RaftNodeConfig::new().expect("failed to load node_config");
        node_config.cluster.db_root_dir = PathBuf::from(db_path);
        let node_config = node_config.validate().expect("node_config validation failed");

        Self::init(node_config, shutdown_signal)
    }
}