1use std::fmt::Debug;
32use std::sync::Arc;
33use std::sync::atomic::AtomicBool;
34
35use d_engine_core::ClusterConfig;
36use d_engine_core::CommitHandler;
37use d_engine_core::CommitHandlerDependencies;
38use d_engine_core::DefaultCommitHandler;
39use d_engine_core::DefaultPurgeExecutor;
40use d_engine_core::DefaultStateMachineHandler;
41use d_engine_core::ElectionHandler;
42use d_engine_core::LogSizePolicy;
43use d_engine_core::NewCommitData;
44use d_engine_core::Raft;
45use d_engine_core::RaftConfig;
46use d_engine_core::RaftCoreHandlers;
47use d_engine_core::RaftLog;
48use d_engine_core::RaftNodeConfig;
49use d_engine_core::RaftRole;
50use d_engine_core::RaftStorageHandles;
51use d_engine_core::ReplicationHandler;
52use d_engine_core::Result;
53use d_engine_core::RoleEvent;
54use d_engine_core::SignalParams;
55use d_engine_core::StateMachine;
56use d_engine_core::StateMachineWorker;
57use d_engine_core::StorageEngine;
58use d_engine_core::SystemError;
59use d_engine_core::alias::MOF;
60use d_engine_core::alias::SMHOF;
61use d_engine_core::alias::SNP;
62use d_engine_core::alias::TROF;
63use d_engine_core::follower_state::FollowerState;
64use d_engine_core::learner_state::LearnerState;
65#[cfg(feature = "watch")]
66use d_engine_core::watch::WatchDispatcher;
67#[cfg(feature = "watch")]
68use d_engine_core::watch::WatchRegistry;
69use tokio::sync::Mutex;
70use tokio::sync::mpsc;
71use tokio::sync::watch;
72use tracing::debug;
73use tracing::error;
74use tracing::info;
75
76use super::LeaderNotifier;
77use super::RaftTypeConfig;
78use crate::Node;
79use crate::membership::RaftMembership;
80use crate::network::grpc;
81use crate::network::grpc::grpc_transport::GrpcTransport;
82use crate::storage::BufferedRaftLog;
83
/// Builder that assembles a Raft [`Node`] from a storage engine, a state machine and
/// optional custom components.
///
/// Create one with [`NodeBuilder::new`], [`NodeBuilder::from_cluster_config`] or
/// [`NodeBuilder::from_node_config`], supply the required components, then call
/// `build()` (assemble only) or `start()` (assemble and start the RPC server).
pub struct NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    /// Id of this node; set from `node_config.cluster.node_id` by `init` and by the
    /// `node_config()` setter.
    node_id: u32,

    /// Full node configuration consumed by `build()` and by the RPC server.
    pub(super) node_config: RaftNodeConfig,
    /// Required by `build()`: storage engine backing the buffered raft log.
    pub(super) storage_engine: Option<Arc<SE>>,
    /// Optional custom membership; when `None`, `build()` creates a `RaftMembership`
    /// from `cluster.initial_cluster`.
    pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
    /// Required by `build()`: the application state machine.
    pub(super) state_machine: Option<Arc<SM>>,
    /// Optional custom transport; when `None`, `build()` creates a `GrpcTransport`.
    pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
    /// Optional custom state-machine handler; when `None`, `build()` creates a
    /// `DefaultStateMachineHandler`.
    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
    /// Optional snapshot policy; when `None`, `build()` creates a `LogSizePolicy` from the
    /// `raft.snapshot` settings.
    pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
    /// Shutdown signal; clones are handed to background workers, the Raft core, the RPC
    /// server and the built node.
    pub(super) shutdown_signal: watch::Receiver<()>,

    /// The assembled node; `None` until `build()` succeeds.
    pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
}
116
117impl<SE, SM> NodeBuilder<SE, SM>
118where
119 SE: StorageEngine + Debug,
120 SM: StateMachine + Debug,
121{
122 pub fn new(
132 cluster_path: Option<&str>,
133 shutdown_signal: watch::Receiver<()>,
134 ) -> Self {
135 let node_config = if let Some(p) = cluster_path {
136 info!("with_override_config from: {}", &p);
137 RaftNodeConfig::new()
138 .expect("Load node_config successfully")
139 .with_override_config(p)
140 .expect("Overwrite node_config successfully")
141 .validate()
142 .expect("Validate node_config successfully")
143 } else {
144 RaftNodeConfig::new()
145 .expect("Load node_config successfully")
146 .validate()
147 .expect("Validate node_config successfully")
148 };
149
150 Self::init(node_config, shutdown_signal)
151 }
152
153 pub fn from_cluster_config(
164 cluster_config: ClusterConfig,
165 shutdown_signal: watch::Receiver<()>,
166 ) -> Self {
167 let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
168 node_config.cluster = cluster_config;
169 let node_config = node_config.validate().expect("Validate node_config successfully");
170 Self::init(node_config, shutdown_signal)
171 }
172
173 pub fn from_node_config(
191 node_config: RaftNodeConfig,
192 shutdown_signal: watch::Receiver<()>,
193 ) -> Self {
194 Self::init(node_config, shutdown_signal)
195 }
196
197 pub(crate) fn init(
199 node_config: RaftNodeConfig,
200 shutdown_signal: watch::Receiver<()>,
201 ) -> Self {
202 Self {
203 node_id: node_config.cluster.node_id,
204 storage_engine: None,
205 state_machine: None,
206 transport: None,
207 membership: None,
208 node_config,
209 shutdown_signal,
210 state_machine_handler: None,
211 snapshot_policy: None,
212 node: None,
213 }
214 }
215
216 pub fn storage_engine(
218 mut self,
219 storage_engine: Arc<SE>,
220 ) -> Self {
221 self.storage_engine = Some(storage_engine);
222 self
223 }
224
225 pub fn state_machine(
227 mut self,
228 state_machine: Arc<SM>,
229 ) -> Self {
230 self.state_machine = Some(state_machine);
231 self
232 }
233
234 pub fn node_config(
236 mut self,
237 node_config: RaftNodeConfig,
238 ) -> Self {
239 self.node_id = node_config.cluster.node_id;
240 self.node_config = node_config;
241 self
242 }
243
244 pub fn raft_config(
246 mut self,
247 config: RaftConfig,
248 ) -> Self {
249 self.node_config.raft = config;
250 self
251 }
252
253 pub async fn build(mut self) -> Result<Self> {
264 let node_id = self.node_id;
265 let node_config = self.node_config.clone();
266
267 let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();
269
270 let state_machine = self.state_machine.take().ok_or_else(|| {
272 SystemError::NodeStartFailed(
273 "State machine must be set before calling build()".to_string(),
274 )
275 })?;
276
277 state_machine.start().await?;
283
284 let lease_cleanup_handle = if node_config.raft.state_machine.lease.enabled {
287 info!(
288 "Starting lease background cleanup worker (interval: {}ms)",
289 node_config.raft.state_machine.lease.cleanup_interval_ms
290 );
291 Some(Self::spawn_background_cleanup_worker(
292 Arc::clone(&state_machine),
293 node_config.raft.state_machine.lease.cleanup_interval_ms,
294 self.shutdown_signal.clone(),
295 ))
296 } else {
297 debug!("Lease feature disabled: no background cleanup worker");
298 None
299 };
300
301 let storage_engine = self.storage_engine.take().ok_or_else(|| {
303 SystemError::NodeStartFailed(
304 "Storage engine must be set before calling build()".to_string(),
305 )
306 })?;
307
308 let last_applied_index = state_machine.last_applied().index;
310 info!("Node startup, Last applied index: {}", last_applied_index);
311
312 let (role_tx, role_rx) = mpsc::unbounded_channel();
315
316 let raft_log = {
317 let (log, receiver) = BufferedRaftLog::new(
318 node_id,
319 node_config.raft.persistence.clone(),
320 storage_engine.clone(),
321 );
322
323 log.start(receiver, Some(role_tx.clone()))
326 };
327
328 let (peer_failure_tx, peer_failure_rx) = mpsc::channel::<u32>(64);
331 let (peer_success_tx, peer_success_rx) = mpsc::channel::<u32>(64);
332 let transport = self.transport.take().unwrap_or_else(|| {
333 GrpcTransport::new_with_channels(node_id, peer_failure_tx, peer_success_tx)
334 });
335
336 let snapshot_policy = self.snapshot_policy.take().unwrap_or(LogSizePolicy::new(
337 node_config.raft.snapshot.max_log_entries_before_snapshot,
338 node_config.raft.snapshot.snapshot_cool_down_since_last_check,
339 ));
340
341 let shutdown_signal = self.shutdown_signal.clone();
342
343 #[cfg(feature = "watch")]
346 let watch_system = {
347 let (broadcast_tx, broadcast_rx) =
348 tokio::sync::broadcast::channel(node_config.raft.watch.event_queue_size);
349
350 let (unregister_tx, unregister_rx) = mpsc::unbounded_channel();
352
353 let registry = Arc::new(WatchRegistry::new_with_limits(
355 node_config.raft.watch.watcher_buffer_size,
356 node_config.raft.watch.max_watcher_count,
357 unregister_tx,
358 ));
359
360 let last_applied_ref = Arc::new(std::sync::atomic::AtomicU64::new(last_applied_index));
362
363 let dispatcher = WatchDispatcher::new(
365 Arc::clone(®istry),
366 broadcast_rx,
367 unregister_rx,
368 Arc::clone(&last_applied_ref),
369 node_config.raft.watch.heartbeat_interval_ms,
370 );
371
372 let dispatcher_handle = tokio::spawn(async move {
374 dispatcher.run().await;
375 });
376
377 Some((broadcast_tx, registry, dispatcher_handle, last_applied_ref))
378 };
379
380 let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
381 #[cfg(feature = "watch")]
382 let watch_event_tx = watch_system.as_ref().map(|(tx, _, _, _)| tx.clone());
383
384 #[cfg(feature = "watch")]
386 let prev_kv_count = watch_system
387 .as_ref()
388 .map(|(_, registry, _, _)| registry.prev_kv_watcher_count_arc())
389 .unwrap_or_else(|| Arc::new(std::sync::atomic::AtomicUsize::new(0)));
390
391 #[cfg(not(feature = "watch"))]
392 let watch_event_tx: Option<
393 tokio::sync::broadcast::Sender<d_engine_proto::client::WatchResponse>,
394 > = None;
395
396 Arc::new(DefaultStateMachineHandler::new(
397 node_id,
398 last_applied_index,
399 state_machine.clone(),
400 node_config.raft.snapshot.clone(),
401 snapshot_policy,
402 #[cfg(feature = "watch")]
403 watch_event_tx,
404 #[cfg(not(feature = "watch"))]
405 watch_event_tx,
406 #[cfg(feature = "watch")]
407 prev_kv_count,
408 #[cfg(not(feature = "watch"))]
409 Arc::new(std::sync::atomic::AtomicUsize::new(0)),
410 ))
411 });
412 let (membership_inner, zombie_rx) = self.membership.take().map_or_else(
413 || {
414 RaftMembership::new(
415 node_id,
416 node_config.cluster.initial_cluster.clone(),
417 node_config.clone(),
418 )
419 },
420 |m| {
422 let (_tx, rx) = tokio::sync::mpsc::channel(1);
423 (m, rx)
424 },
425 );
426 let membership = Arc::new(membership_inner);
427 let membership_rx = membership.subscribe_membership();
429
430 let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());
431
432 let (event_tx, event_rx) = mpsc::channel(10240);
434 let (cmd_tx, cmd_rx) = mpsc::channel(node_config.raft.cmd_channel_capacity);
435 let event_tx_clone = event_tx.clone(); let role_tx_for_sm = role_tx.clone(); let role_tx_for_zombie = role_tx.clone();
444 let membership_for_zombie = Arc::clone(&membership);
445 tokio::spawn(async move {
446 let mut zombie_rx = zombie_rx;
447 while let Some(node_id) = zombie_rx.recv().await {
448 if membership_for_zombie.is_zombie_valid(node_id) {
449 let _ = role_tx_for_zombie.send(RoleEvent::ZombieDetected(node_id));
450 }
451 }
452 });
453
454 let membership_for_peer_failure = Arc::clone(&membership);
457 tokio::spawn(async move {
458 let mut peer_failure_rx = peer_failure_rx;
459 while let Some(failed_peer_id) = peer_failure_rx.recv().await {
460 membership_for_peer_failure.on_peer_stream_failed(failed_peer_id).await;
461 }
462 });
463
464 let membership_for_peer_success = Arc::clone(&membership);
467 tokio::spawn(async move {
468 let mut peer_success_rx = peer_success_rx;
469 while let Some(succeeded_peer_id) = peer_success_rx.recv().await {
470 membership_for_peer_success.on_peer_stream_success(succeeded_peer_id).await;
471 }
472 });
473
474 let node_config_arc = Arc::new(node_config);
475
476 let last_applied_index = Some(state_machine.last_applied().index);
483 let my_role = if node_config_arc.is_learner() {
484 RaftRole::Learner(Box::new(LearnerState::new(
485 node_id,
486 node_config_arc.clone(),
487 )))
488 } else {
489 RaftRole::Follower(Box::new(FollowerState::new(
490 node_id,
491 node_config_arc.clone(),
492 raft_log.load_hard_state().expect("Failed to load hard state"),
493 last_applied_index,
494 )))
495 };
496 let my_role_i32 = my_role.as_i32();
497 let my_current_term = my_role.current_term();
498 info!(
499 "Start node with role: {} and term: {}",
500 my_role_i32, my_current_term
501 );
502
503 let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
505 node_id,
506 my_role,
507 RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
508 raft_log,
509 state_machine: state_machine.clone(),
510 },
511 transport,
512 RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
513 election_handler: ElectionHandler::new(node_id),
514 replication_handler: ReplicationHandler::new(node_id),
515 state_machine_handler: state_machine_handler.clone(),
516 purge_executor: Arc::new(purge_executor),
517 },
518 membership.clone(),
519 SignalParams::new(
520 role_tx,
521 role_rx,
522 event_tx,
523 event_rx,
524 cmd_tx.clone(),
525 cmd_rx,
526 shutdown_signal.clone(),
527 ),
528 node_config_arc.clone(),
529 );
530
531 raft_core.register_new_commit_listener(new_commit_event_tx);
533
534 let leader_notifier = LeaderNotifier::new();
536 raft_core.register_leader_change_listener(leader_notifier.sender());
537
538 let (sm_apply_tx, sm_apply_rx) = mpsc::unbounded_channel();
540
541 let sm_worker = StateMachineWorker::new(
543 node_id,
544 state_machine_handler.clone(),
545 sm_apply_rx,
546 role_tx_for_sm,
547 self.shutdown_signal.clone(),
548 );
549 let sm_worker_handle = Self::spawn_state_machine_worker(sm_worker);
550
551 let deps = CommitHandlerDependencies {
553 state_machine_handler,
554 raft_log: raft_core.ctx.storage.raft_log.clone(),
555 membership: membership.clone(),
556 event_tx: event_tx_clone,
557 sm_apply_tx,
558 shutdown_signal,
559 max_batch_size: raft_core.ctx.node_config.raft.batching.max_batch_size,
560 };
561
562 let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
563 node_id,
564 my_role_i32,
565 my_current_term,
566 deps,
567 new_commit_event_rx,
568 );
569 let commit_handler_handle = Self::spawn_state_machine_commit_listener(commit_handler);
573
574 let event_tx = raft_core.event_sender();
575 let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);
576
577 let node = Node::<RaftTypeConfig<SE, SM>> {
578 node_id,
579 raft_core: Arc::new(Mutex::new(raft_core)),
580 membership,
581 event_tx: event_tx.clone(),
582 cmd_tx,
583 ready: AtomicBool::new(false),
584 rpc_ready_tx,
585 leader_notifier,
586 membership_rx,
587 node_config: node_config_arc,
588 #[cfg(feature = "watch")]
589 watch_registry: watch_system.as_ref().map(|(_, reg, _, _)| Arc::clone(reg)),
590 #[cfg(feature = "watch")]
591 _watch_dispatcher_handle: watch_system.map(|(_, _, handle, _)| handle),
592 sm_worker_handle: std::sync::Mutex::new(Some(sm_worker_handle)),
593 _commit_handler_handle: Some(commit_handler_handle),
594 _lease_cleanup_handle: lease_cleanup_handle,
595 shutdown_signal: self.shutdown_signal.clone(),
596 };
597
598 self.node = Some(Arc::new(node));
599 Ok(self)
600 }
601
602 fn spawn_state_machine_commit_listener(
611 mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>
612 ) -> tokio::task::JoinHandle<()> {
613 tokio::spawn(async move {
614 match commit_handler.run().await {
615 Ok(_) => {
616 info!("commit_handler exit program");
617 }
618 Err(e) => {
619 error!("commit_handler exit program with unexpected error: {:?}", e);
620 println!("commit_handler exit program");
621 }
622 }
623 })
624 }
625
626 fn spawn_state_machine_worker(
635 sm_worker: StateMachineWorker<RaftTypeConfig<SE, SM>>
636 ) -> std::thread::JoinHandle<()> {
637 let node_id = sm_worker.node_id();
638 let handle = tokio::runtime::Handle::current();
639 std::thread::Builder::new()
640 .name(format!("sm-apply-{}", node_id))
641 .spawn(move || {
642 handle.block_on(async move {
643 match sm_worker.run().await {
644 Ok(_) => info!("state_machine_worker exit"),
645 Err(e) => error!("state_machine_worker exit with error: {:?}", e),
646 }
647 });
648 })
649 .expect("failed to spawn sm-apply thread")
650 }
651
652 fn spawn_background_cleanup_worker(
670 state_machine: Arc<SM>,
671 cleanup_interval_ms: u64,
672 mut shutdown_signal: watch::Receiver<()>,
673 ) -> tokio::task::JoinHandle<()> {
674 tokio::spawn(async move {
675 let mut interval =
676 tokio::time::interval(std::time::Duration::from_millis(cleanup_interval_ms));
677
678 loop {
679 tokio::select! {
680 _ = interval.tick() => {
681 match state_machine.lease_background_cleanup().await {
683 Ok(deleted_keys) => {
684 if !deleted_keys.is_empty() {
685 debug!(
686 "Lease background cleanup: deleted {} expired keys",
687 deleted_keys.len()
688 );
689 }
690 }
691 Err(e) => {
692 error!("Lease background cleanup failed: {:?}", e);
693 }
694 }
695 }
696 _ = shutdown_signal.changed() => {
697 info!("Lease background cleanup received shutdown signal");
698 break;
699 }
700 }
701 }
702
703 debug!("Lease background cleanup worker stopped");
704 })
705 }
706
707 pub fn with_custom_state_machine_handler(
720 mut self,
721 handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
722 ) -> Self {
723 self.state_machine_handler = Some(handler);
724 self
725 }
726
727 async fn start_rpc_server(self) -> Self {
732 debug!("1. --- start RPC server --- ");
733 if let Some(ref node) = self.node {
734 let node_clone = node.clone();
735 let shutdown = self.shutdown_signal.clone();
736 let listen_address = self.node_config.cluster.listen_address;
737 let node_config = self.node_config.clone();
738 tokio::spawn(async move {
739 if let Err(e) =
740 grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
741 {
742 eprintln!("RPC server stops. {e:?}");
743 error!("RPC server stops. {:?}", e);
744 }
745 });
746 self
747 } else {
748 panic!("failed to start RPC server");
749 }
750 }
751
752 pub async fn start(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
774 let builder = self.build().await?;
775 let builder = builder.start_rpc_server().await;
776 builder.node.ok_or_else(|| {
777 SystemError::NodeStartFailed("Node build failed unexpectedly".to_string()).into()
778 })
779 }
780
781 #[cfg(test)]
786 pub(crate) fn new_from_db_path(
787 db_path: &str,
788 shutdown_signal: watch::Receiver<()>,
789 ) -> Self {
790 use std::path::PathBuf;
791
792 let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
793 node_config.cluster.db_root_dir = PathBuf::from(db_path);
794 let node_config = node_config.validate().expect("Validate node_config successfully");
795
796 Self::init(node_config, shutdown_signal)
797 }
798}