use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use d_engine_core::ClusterConfig;
use d_engine_core::CommitHandler;
use d_engine_core::CommitHandlerDependencies;
use d_engine_core::DefaultCommitHandler;
use d_engine_core::DefaultPurgeExecutor;
use d_engine_core::DefaultStateMachineHandler;
use d_engine_core::ElectionHandler;
use d_engine_core::LogSizePolicy;
use d_engine_core::NewCommitData;
use d_engine_core::Raft;
use d_engine_core::RaftConfig;
use d_engine_core::RaftCoreHandlers;
use d_engine_core::RaftLog;
use d_engine_core::RaftNodeConfig;
use d_engine_core::RaftRole;
use d_engine_core::RaftStorageHandles;
use d_engine_core::ReplicationHandler;
use d_engine_core::Result;
use d_engine_core::SignalParams;
use d_engine_core::StateMachine;
use d_engine_core::StorageEngine;
use d_engine_core::SystemError;
use d_engine_core::alias::MOF;
use d_engine_core::alias::SMHOF;
use d_engine_core::alias::SNP;
use d_engine_core::alias::TROF;
use d_engine_core::follower_state::FollowerState;
use d_engine_core::learner_state::LearnerState;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchDispatcher;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchRegistry;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::debug;
use tracing::error;
use tracing::info;

use super::LeaderNotifier;
use super::RaftTypeConfig;
use crate::Node;
use crate::membership::RaftMembership;
use crate::network::grpc;
use crate::network::grpc::grpc_transport::GrpcTransport;
use crate::storage::BufferedRaftLog;

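/// Builder for assembling and starting a Raft [`Node`].
///
/// A storage engine and a state machine must be supplied before `build()`;
/// the transport, snapshot policy, membership, and state machine handler
/// fall back to built-in defaults when not set.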
pub struct NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
    node_id: u32,

    pub(super) node_config: RaftNodeConfig,
    pub(super) storage_engine: Option<Arc<SE>>,
    pub(super) membership: Option<MOF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine: Option<Arc<SM>>,
    pub(super) transport: Option<TROF<RaftTypeConfig<SE, SM>>>,
    pub(super) state_machine_handler: Option<Arc<SMHOF<RaftTypeConfig<SE, SM>>>>,
    pub(super) snapshot_policy: Option<SNP<RaftTypeConfig<SE, SM>>>,
    pub(super) shutdown_signal: watch::Receiver<()>,

    pub(super) node: Option<Arc<Node<RaftTypeConfig<SE, SM>>>>,
}

impl<SE, SM> NodeBuilder<SE, SM>
where
    SE: StorageEngine + Debug,
    SM: StateMachine + Debug,
{
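    /// Creates a builder from the default node configuration, optionally
    /// overridden by the config file at `cluster_path`.
    ///
    /// Panics if the configuration cannot be loaded or fails validation.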
    pub fn new(
        cluster_path: Option<&str>,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let node_config = if let Some(p) = cluster_path {
            info!("with_override_config from: {}", &p);
            RaftNodeConfig::new()
                .expect("Load node_config successfully")
                .with_override_config(p)
                .expect("Overwrite node_config successfully")
                .validate()
                .expect("Validate node_config successfully")
        } else {
            RaftNodeConfig::new()
                .expect("Load node_config successfully")
                .validate()
                .expect("Validate node_config successfully")
        };

        Self::init(node_config, shutdown_signal)
    }

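    /// Creates a builder from an explicit [`ClusterConfig`], keeping the
    /// remaining defaults of [`RaftNodeConfig`].
    ///
    /// Panics if the configuration cannot be loaded or fails validation.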
    pub fn from_cluster_config(
        cluster_config: ClusterConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
        node_config.cluster = cluster_config;
        let node_config = node_config.validate().expect("Validate node_config successfully");
        Self::init(node_config, shutdown_signal)
    }

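    /// Creates a builder from a fully prepared [`RaftNodeConfig`].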
    pub fn init(
        node_config: RaftNodeConfig,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        Self {
            node_id: node_config.cluster.node_id,
            storage_engine: None,
            state_machine: None,
            transport: None,
            membership: None,
            node_config,
            shutdown_signal,
            state_machine_handler: None,
            snapshot_policy: None,
            node: None,
        }
    }

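    /// Sets the storage engine that backs the buffered Raft log.
    /// Required before calling `build()`.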
    pub fn storage_engine(
        mut self,
        storage_engine: Arc<SE>,
    ) -> Self {
        self.storage_engine = Some(storage_engine);
        self
    }

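    /// Sets the user-provided state machine. Required before calling `build()`.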
    pub fn state_machine(
        mut self,
        state_machine: Arc<SM>,
    ) -> Self {
        self.state_machine = Some(state_machine);
        self
    }

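    /// Replaces the entire node configuration.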
    pub fn node_config(
        mut self,
        node_config: RaftNodeConfig,
    ) -> Self {
        self.node_config = node_config;
        self
    }

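    /// Replaces only the Raft section of the node configuration.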
    pub fn raft_config(
        mut self,
        config: RaftConfig,
    ) -> Self {
        self.node_config.raft = config;
        self
    }

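    /// Assembles the node: starts the state machine, recovers the Raft log,
    /// wires up the commit handler and background workers, and stores the
    /// resulting [`Node`] in the builder.
    ///
    /// Returns an error if the state machine or storage engine has not been set.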
    pub async fn build(mut self) -> Result<Self> {
        let node_id = self.node_id;
        let node_config = self.node_config.clone();

        let (new_commit_event_tx, new_commit_event_rx) = mpsc::unbounded_channel::<NewCommitData>();

        let state_machine = self.state_machine.take().ok_or_else(|| {
            SystemError::NodeStartFailed(
                "State machine must be set before calling build()".to_string(),
            )
        })?;

        state_machine.start().await?;

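        // Spawn the lease-expiry cleanup worker only when the lease feature is
        // enabled in the configuration.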
        let lease_cleanup_handle = if node_config.raft.state_machine.lease.enabled {
            info!(
                "Starting lease background cleanup worker (interval: {}ms)",
                node_config.raft.state_machine.lease.interval_ms
            );
            Some(Self::spawn_background_cleanup_worker(
                Arc::clone(&state_machine),
                node_config.raft.state_machine.lease.interval_ms,
                self.shutdown_signal.clone(),
            ))
        } else {
            debug!("Lease feature disabled: no background cleanup worker");
            None
        };

        let storage_engine = self.storage_engine.take().ok_or_else(|| {
            SystemError::NodeStartFailed(
                "Storage engine must be set before calling build()".to_string(),
            )
        })?;

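        // Recover the last applied index from the state machine and bring up the
        // buffered Raft log on top of the storage engine.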
        let last_applied_index = state_machine.last_applied().index;
        info!("Node startup, Last applied index: {}", last_applied_index);
        let raft_log = {
            let (log, receiver) = BufferedRaftLog::new(
                node_id,
                node_config.raft.persistence.clone(),
                storage_engine.clone(),
            );

            log.start(receiver)
        };

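        // Fall back to the gRPC transport and the log-size snapshot policy unless
        // custom implementations were injected into the builder.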
        let transport = self.transport.take().unwrap_or_else(|| GrpcTransport::new(node_id));

        let snapshot_policy = self.snapshot_policy.take().unwrap_or_else(|| {
            LogSizePolicy::new(
                node_config.raft.snapshot.max_log_entries_before_snapshot,
                node_config.raft.snapshot.snapshot_cool_down_since_last_check,
            )
        });

        let shutdown_signal = self.shutdown_signal.clone();

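        // With the `watch` feature enabled, set up the watch registry and spawn the
        // dispatcher task that fans events out to registered watchers.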
        #[cfg(feature = "watch")]
        let watch_system = {
            let (broadcast_tx, broadcast_rx) =
                tokio::sync::broadcast::channel(node_config.raft.watch.event_queue_size);

            let (unregister_tx, unregister_rx) = mpsc::unbounded_channel();

            let registry = Arc::new(WatchRegistry::new(
                node_config.raft.watch.watcher_buffer_size,
                unregister_tx,
            ));

            let dispatcher =
                WatchDispatcher::new(Arc::clone(&registry), broadcast_rx, unregister_rx);

            let dispatcher_handle = tokio::spawn(async move {
                dispatcher.run().await;
            });

            Some((broadcast_tx, registry, dispatcher_handle))
        };

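        // Use the default state machine handler and membership unless custom
        // implementations were provided.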
        let state_machine_handler = self.state_machine_handler.take().unwrap_or_else(|| {
            #[cfg(feature = "watch")]
            let watch_event_tx = watch_system.as_ref().map(|(tx, _, _)| tx.clone());
            #[cfg(not(feature = "watch"))]
            let watch_event_tx = None;

            Arc::new(DefaultStateMachineHandler::new(
                node_id,
                last_applied_index,
                node_config.raft.commit_handler.max_entries_per_chunk,
                state_machine.clone(),
                node_config.raft.snapshot.clone(),
                snapshot_policy,
                watch_event_tx,
            ))
        });
        let membership = Arc::new(self.membership.take().unwrap_or_else(|| {
            RaftMembership::new(
                node_id,
                node_config.cluster.initial_cluster.clone(),
                node_config.clone(),
            )
        }));

        let purge_executor = DefaultPurgeExecutor::new(raft_log.clone());

        let (role_tx, role_rx) = mpsc::unbounded_channel();
        let (event_tx, event_rx) = mpsc::channel(10240);
        let event_tx_clone = event_tx.clone();
        let node_config_arc = Arc::new(node_config);

        let last_applied_index = Some(state_machine.last_applied().index);
        let my_role = if node_config_arc.is_learner() {
            RaftRole::Learner(Box::new(LearnerState::new(
                node_id,
                node_config_arc.clone(),
            )))
        } else {
            RaftRole::Follower(Box::new(FollowerState::new(
                node_id,
                node_config_arc.clone(),
                raft_log.load_hard_state().expect("Failed to load hard state"),
                last_applied_index,
            )))
        };
        let my_role_i32 = my_role.as_i32();
        let my_current_term = my_role.current_term();
        info!(
            "Start node with role: {} and term: {}",
            my_role_i32, my_current_term
        );

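        // Assemble the Raft core from the storage handles, transport, handlers,
        // membership, and signal channels prepared above.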
        let mut raft_core = Raft::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role,
            RaftStorageHandles::<RaftTypeConfig<SE, SM>> {
                raft_log,
                state_machine: state_machine.clone(),
            },
            transport,
            RaftCoreHandlers::<RaftTypeConfig<SE, SM>> {
                election_handler: ElectionHandler::new(node_id),
                replication_handler: ReplicationHandler::new(node_id),
                state_machine_handler: state_machine_handler.clone(),
                purge_executor: Arc::new(purge_executor),
            },
            membership.clone(),
            SignalParams::new(
                role_tx,
                role_rx,
                event_tx,
                event_rx,
                shutdown_signal.clone(),
            ),
            node_config_arc.clone(),
        );

        raft_core.register_new_commit_listener(new_commit_event_tx);

        let leader_notifier = LeaderNotifier::new();
        raft_core.register_leader_change_listener(leader_notifier.sender());

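        // Wire up the commit handler: it consumes new-commit events from the
        // channel registered above and drives the state machine handler on its
        // own task.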
        let deps = CommitHandlerDependencies {
            state_machine_handler,
            raft_log: raft_core.ctx.storage.raft_log.clone(),
            membership: membership.clone(),
            event_tx: event_tx_clone,
            shutdown_signal,
        };

        let commit_handler = DefaultCommitHandler::<RaftTypeConfig<SE, SM>>::new(
            node_id,
            my_role_i32,
            my_current_term,
            deps,
            node_config_arc.clone(),
            new_commit_event_rx,
        );
        let commit_handler_handle = Self::spawn_state_machine_commit_listener(commit_handler);

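        // Assemble the public node handle; the stored task handles keep the
        // background workers alive for the node's lifetime.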
        let event_tx = raft_core.event_sender();
        let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);

        let node = Node::<RaftTypeConfig<SE, SM>> {
            node_id,
            raft_core: Arc::new(Mutex::new(raft_core)),
            membership,
            event_tx: event_tx.clone(),
            ready: AtomicBool::new(false),
            rpc_ready_tx,
            leader_notifier,
            node_config: node_config_arc,
            #[cfg(feature = "watch")]
            watch_registry: watch_system.as_ref().map(|(_, reg, _)| Arc::clone(reg)),
            #[cfg(feature = "watch")]
            _watch_dispatcher_handle: watch_system.map(|(_, _, handle)| handle),
            _commit_handler_handle: Some(commit_handler_handle),
            _lease_cleanup_handle: lease_cleanup_handle,
            shutdown_signal: self.shutdown_signal.clone(),
        };

        self.node = Some(Arc::new(node));
        Ok(self)
    }

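    /// Runs the commit handler on a dedicated Tokio task and returns its handle.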
    fn spawn_state_machine_commit_listener(
        mut commit_handler: DefaultCommitHandler<RaftTypeConfig<SE, SM>>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            match commit_handler.run().await {
                Ok(_) => {
                    info!("commit_handler exited");
                }
                Err(e) => {
                    error!("commit_handler exited with unexpected error: {:?}", e);
                    eprintln!("commit_handler exited with unexpected error: {e:?}");
                }
            }
        })
    }

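    /// Spawns the periodic lease cleanup task: it calls the state machine's
    /// lease cleanup on every tick and exits when the shutdown signal fires.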
    fn spawn_background_cleanup_worker(
        state_machine: Arc<SM>,
        interval_ms: u64,
        mut shutdown_signal: watch::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_millis(interval_ms));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        match state_machine.lease_background_cleanup().await {
                            Ok(deleted_keys) => {
                                if !deleted_keys.is_empty() {
                                    debug!(
                                        "Lease background cleanup: deleted {} expired keys",
                                        deleted_keys.len()
                                    );
                                }
                            }
                            Err(e) => {
                                error!("Lease background cleanup failed: {:?}", e);
                            }
                        }
                    }
                    _ = shutdown_signal.changed() => {
                        info!("Lease background cleanup received shutdown signal");
                        break;
                    }
                }
            }

            debug!("Lease background cleanup worker stopped");
        })
    }

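    /// Overrides the default state machine handler with a custom implementation.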
    pub fn with_custom_state_machine_handler(
        mut self,
        handler: Arc<SMHOF<RaftTypeConfig<SE, SM>>>,
    ) -> Self {
        self.state_machine_handler = Some(handler);
        self
    }

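    /// Spawns the gRPC server for the built node on the configured listen address.
    ///
    /// Panics if called before [`Self::build`] has produced a node.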
    async fn start_rpc_server(self) -> Self {
        debug!("1. --- start RPC server --- ");
        if let Some(ref node) = self.node {
            let node_clone = node.clone();
            let shutdown = self.shutdown_signal.clone();
            let listen_address = self.node_config.cluster.listen_address;
            let node_config = self.node_config.clone();
            tokio::spawn(async move {
                if let Err(e) =
                    grpc::start_rpc_server(node_clone, listen_address, node_config, shutdown).await
                {
                    eprintln!("RPC server stopped: {e:?}");
                    error!("RPC server stopped: {:?}", e);
                }
            });
            self
        } else {
            panic!("failed to start RPC server: node has not been built");
        }
    }

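    /// Convenience entry point: builds the node, starts the RPC server, and
    /// returns the running [`Node`] handle.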
    pub async fn start(self) -> Result<Arc<Node<RaftTypeConfig<SE, SM>>>> {
        let builder = self.build().await?;
        let builder = builder.start_rpc_server().await;
        builder.node.ok_or_else(|| {
            SystemError::NodeStartFailed("Node build failed unexpectedly".to_string()).into()
        })
    }

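    /// Test-only helper: creates a builder whose database root points at `db_path`.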
    #[cfg(test)]
    pub(crate) fn new_from_db_path(
        db_path: &str,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        use std::path::PathBuf;

        let mut node_config = RaftNodeConfig::new().expect("Load node_config successfully");
        node_config.cluster.db_root_dir = PathBuf::from(db_path);
        let node_config = node_config.validate().expect("Validate node_config successfully");

        Self::init(node_config, shutdown_signal)
    }
}