d_engine/node/mod.rs
1//! Raft node container and lifecycle management.
2//!
3//! The [`Node`] struct acts as a host for a Raft consensus participant,
4//! coordinating between the core protocol implementation ([`crate::raft::Raft`])
5//! and external subsystems:
6//!
7//! ## Key Responsibilities
8//! - Manages the Raft finite state machine lifecycle
9//! - Maintains node readiness state for cluster coordination
10//! - Executes the main event processing loop inside Raft
11//!
12//! ## Example Usage
13//! ```ignore
14//! let node = NodeBuilder::new(node_config).build().ready().unwrap();
15//! tokio::spawn(async move {
16//! node.run().await.expect("Raft node execution failed");
17//! });
18//! ```
19
20mod builder;
21pub use builder::*;
22
23#[doc(hidden)]
24mod type_config;
25use tracing::debug;
26use tracing::info;
27#[doc(hidden)]
28pub use type_config::*;
29
30/// Test Modules
31#[cfg(test)]
32mod builder_test;
33#[cfg(test)]
34mod node_test;
35
36use std::fmt::Debug;
37use std::sync::atomic::AtomicBool;
38use std::sync::atomic::Ordering;
39use std::sync::Arc;
40
41use tokio::sync::mpsc;
42use tokio::sync::Mutex;
43
44use crate::alias::MOF;
45use crate::Membership;
46use crate::Raft;
47use crate::RaftEvent;
48use crate::RaftNodeConfig;
49use crate::Result;
50use crate::TypeConfig;
51
52/// Raft node container
53pub struct Node<T>
54where
55 T: TypeConfig,
56{
57 pub(crate) node_id: u32,
58 pub(crate) raft_core: Arc<Mutex<Raft<T>>>,
59
60 // Cluster Membership
61 pub(crate) membership: Arc<MOF<T>>,
62
63 // Network & Storage events, (copied from Raft)
64 // TODO: find a better solution
65 pub(crate) event_tx: mpsc::Sender<RaftEvent>,
66 pub(crate) ready: AtomicBool,
67
68 /// Raft node config
69 pub node_config: Arc<RaftNodeConfig>,
70}
71
72impl<T> Debug for Node<T>
73where
74 T: TypeConfig,
75{
76 fn fmt(
77 &self,
78 f: &mut std::fmt::Formatter<'_>,
79 ) -> std::fmt::Result {
80 f.debug_struct("Node").field("node_id", &self.node_id).finish()
81 }
82}
83impl<T> Node<T>
84where
85 T: TypeConfig,
86{
87 /// Starts and runs the Raft node's main execution loop.
88 ///
89 /// # Workflow
90 /// 1. Establishes network connections with cluster peers
91 /// 2. Performs cluster health check
92 /// 3. Marks node as ready for operation
93 /// 4. Joins the Raft cluster
94 /// 5. Executes the core Raft event processing loop
95 ///
96 /// # Errors
97 /// Returns `Err` if any of these operations fail:
98 /// - Peer connection establishment
99 /// - Cluster health check
100 /// - Raft core initialization
101 /// - Event processing failures
102 ///
103 /// # Example
104 /// ```ignore
105 /// let node = Node::new(...);
106 /// tokio::spawn(async move {
107 /// node.run().await.expect("Node execution failed");
108 /// });
109 /// ```
110 pub async fn run(&self) -> Result<()> {
111 // 1. Connect with other peers
112 // let peer_channels = Self::connect_with_peers(self.node_id,
113 // self.node_config.clone()).await?;
114
115 // 2. Healthcheck if all server is start serving
116 self.membership.check_cluster_is_ready().await?;
117
118 // 3. Set node is ready to run Raft protocol
119 self.set_ready(true);
120
121 // 4. Warm up connections with peers
122 self.membership.pre_warm_connections().await?;
123
124 let mut raft = self.raft_core.lock().await;
125 // 5. if join as a new node
126 debug!(%self.node_config.cluster.node_id);
127 if self.node_config.is_joining() {
128 info!(%self.node_config.cluster.node_id, "Node is joining...");
129 raft.join_cluster().await?;
130 }
131
132 // 6. Run the main event processing loop
133 raft.run().await?;
134
135 Ok(())
136 }
137
138 /// Controls the node's operational readiness state.
139 ///
140 /// # Parameters
141 /// - `is_ready`: When `true`, marks node as ready to participate in cluster. When `false`,
142 /// marks node as temporarily unavailable.
143 ///
144 /// # Usage
145 /// Typically used during cluster bootstrap or maintenance operations.
146 /// The readiness state is atomically updated using SeqCst ordering.
147 pub fn set_ready(
148 &self,
149 is_ready: bool,
150 ) {
151 info!("Set node is ready to run Raft protocol");
152 self.ready.store(is_ready, Ordering::SeqCst);
153 }
154
155 /// Checks if the node is in a ready state to participate in cluster operations.
156 ///
157 /// # Returns
158 /// `true` if the node is operational and ready to handle Raft protocol operations,
159 /// `false` otherwise.
160 pub fn server_is_ready(&self) -> bool {
161 self.ready.load(Ordering::Acquire)
162 }
163}