d_engine_server/node/mod.rs
1//! Raft node container and lifecycle management.
2//!
3//! The [`Node`] struct acts as a host for a Raft consensus participant,
4//! coordinating between the core protocol implementation (provided by `d-engine-core`)
5//! and external subsystems:
6//!
7//! ## Key Responsibilities
8//! - Manages the Raft finite state machine lifecycle
9//! - Maintains node readiness state for cluster coordination
10//! - Executes the main event processing loop inside Raft
11//!
12//! ## Example Usage
13//! ```ignore
14//! let node = NodeBuilder::new(node_config).start().await?;
15//! tokio::spawn(async move {
16//! node.run().await.expect("Raft node execution failed");
17//! });
18//! ```
19
20mod builder;
21pub use builder::*;
22
23mod leader_notifier;
24pub(crate) use leader_notifier::*;
25
26#[doc(hidden)]
27mod type_config;
28use tracing::info;
29#[doc(hidden)]
30pub use type_config::*;
31
32/// Test Modules
33#[cfg(test)]
34mod builder_test;
35#[cfg(test)]
36mod node_test;
37#[cfg(test)]
38mod test_helpers;
39
40use std::fmt::Debug;
41use std::sync::Arc;
42use std::sync::atomic::AtomicBool;
43use std::sync::atomic::Ordering;
44
45use d_engine_core::Membership;
46use d_engine_core::Raft;
47use d_engine_core::RaftEvent;
48use d_engine_core::RaftNodeConfig;
49use d_engine_core::Result;
50use d_engine_core::TypeConfig;
51use d_engine_core::alias::MOF;
52#[cfg(feature = "watch")]
53use d_engine_core::watch::WatchRegistry;
54use tokio::sync::Mutex;
55use tokio::sync::mpsc;
56use tokio::sync::watch;
57
/// Raft consensus node
///
/// Represents a single node participating in a Raft cluster.
/// Coordinates protocol execution, storage, and networking.
///
/// Created via [`NodeBuilder`].
///
/// # Running the Node
///
/// ```rust,ignore
/// let node = builder.start()?;
/// node.run().await?; // Blocks until shutdown
/// ```
pub struct Node<T>
where
    T: TypeConfig,
{
    /// Unique identifier of this node within the cluster.
    pub(crate) node_id: u32,
    /// Core Raft protocol state; `run()` locks it for the whole main loop.
    pub(crate) raft_core: Arc<Mutex<Raft<T>>>,

    // Cluster Membership
    pub(crate) membership: Arc<MOF<T>>,

    // Network & Storage events, (copied from Raft)
    // TODO: find a better solution
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,

    // Client commands (drain-driven)
    pub(crate) cmd_tx: mpsc::UnboundedSender<d_engine_core::ClientCmd>,

    // Atomic RPC-readiness flag, mirrored by `rpc_ready_tx` below;
    // read via `is_rpc_ready()`, written via `set_rpc_ready()`.
    pub(crate) ready: AtomicBool,

    /// Notifies when RPC server is ready to accept requests
    pub(crate) rpc_ready_tx: watch::Sender<bool>,

    /// Notifies when leader is elected (includes leader changes)
    pub(crate) leader_notifier: LeaderNotifier,

    /// Raft node config
    pub node_config: Arc<RaftNodeConfig>,

    /// Optional watch registry for watcher registration
    /// When None, watch functionality is disabled
    #[cfg(feature = "watch")]
    pub(crate) watch_registry: Option<Arc<WatchRegistry>>,

    /// Watch dispatcher task handle (keeps dispatcher alive)
    #[cfg(feature = "watch")]
    pub(crate) _watch_dispatcher_handle: Option<tokio::task::JoinHandle<()>>,

    /// State machine worker task handle (background apply operations)
    pub(crate) _sm_worker_handle: Option<tokio::task::JoinHandle<()>>,

    /// Commit handler task handle (background log application)
    pub(crate) _commit_handler_handle: Option<tokio::task::JoinHandle<()>>,

    /// Lease cleanup task handle (background TTL cleanup)
    pub(crate) _lease_cleanup_handle: Option<tokio::task::JoinHandle<()>>,

    /// Shutdown signal for graceful termination
    pub(crate) shutdown_signal: watch::Receiver<()>,
}
120
121impl<T> Debug for Node<T>
122where
123 T: TypeConfig,
124{
125 fn fmt(
126 &self,
127 f: &mut std::fmt::Formatter<'_>,
128 ) -> std::fmt::Result {
129 f.debug_struct("Node").field("node_id", &self.node_id).finish()
130 }
131}
132impl<T> Node<T>
133where
134 T: TypeConfig,
135{
    /// Starts and runs the Raft node's main execution loop.
    ///
    /// # Workflow
    /// Strategy-based bootstrap depending on node type:
    /// - **Learner**: Skip cluster ready check, join cluster after warmup
    /// - **Voter**: Wait for cluster ready, then warmup connections
    ///
    /// Both paths converge to the Raft event processing loop.
    ///
    /// # Errors
    /// Returns `Err` if any bootstrap step or Raft execution fails.
    ///
    /// # Example
    /// ```ignore
    /// let node = Node::new(...);
    /// tokio::spawn(async move {
    ///     node.run().await.expect("Node execution failed");
    /// });
    /// ```
    pub async fn run(&self) -> Result<()> {
        // Clone the receiver so the long-lived field stays untouched.
        let mut shutdown_signal = self.shutdown_signal.clone();
        // Mark the current value as seen so a later `changed()` in the
        // bootstrap helpers fires only on an actual shutdown notification,
        // not on the channel's initial value.
        shutdown_signal.borrow_and_update();

        // Strategy pattern: bootstrap based on node type
        if self.node_config.is_learner() {
            self.run_as_learner(&mut shutdown_signal).await?;
        } else {
            self.run_as_voter(&mut shutdown_signal).await?;
        }

        // NOTE(review): if shutdown fires during bootstrap, both helpers
        // return Ok(()) and the Raft loop below is still entered —
        // presumably Raft::run observes the shutdown itself; confirm.
        // Start Raft main loop
        self.start_raft_loop().await
    }
169
170 /// Learner bootstrap: skip cluster ready check, join after warmup.
171 async fn run_as_learner(
172 &self,
173 shutdown: &mut watch::Receiver<()>,
174 ) -> Result<()> {
175 info!("Learner node bootstrap initiated");
176
177 // Set RPC ready immediately (no cluster wait needed)
178 self.set_rpc_ready(true);
179
180 // Warm up connections
181 self.warmup_with_shutdown(shutdown).await?;
182
183 // Join cluster as learner
184 let raft = self.raft_core.lock().await;
185 info!(%self.node_config.cluster.node_id, "Learner joining cluster");
186 raft.join_cluster().await?;
187 drop(raft); // Release lock before entering main loop
188
189 Ok(())
190 }
191
192 /// Voter bootstrap: wait for cluster ready, then warmup.
193 async fn run_as_voter(
194 &self,
195 shutdown: &mut watch::Receiver<()>,
196 ) -> Result<()> {
197 info!("Voter node bootstrap initiated");
198
199 // Wait for cluster ready
200 tokio::select! {
201 result = self.membership.check_cluster_is_ready() => result?,
202 _ = shutdown.changed() => {
203 info!("Shutdown during cluster ready check");
204 return Ok(());
205 }
206 }
207
208 // Set RPC ready after cluster is healthy
209 self.set_rpc_ready(true);
210
211 // Warm up connections
212 self.warmup_with_shutdown(shutdown).await
213 }
214
215 /// Warm up peer connections with shutdown handling.
216 async fn warmup_with_shutdown(
217 &self,
218 shutdown: &mut watch::Receiver<()>,
219 ) -> Result<()> {
220 tokio::select! {
221 result = self.membership.pre_warm_connections() => result?,
222 _ = shutdown.changed() => {
223 info!("Shutdown during connection warmup");
224 return Ok(());
225 }
226 }
227 Ok(())
228 }
229
230 /// Start Raft main loop.
231 async fn start_raft_loop(&self) -> Result<()> {
232 let mut raft = self.raft_core.lock().await;
233 raft.run().await
234 }
235
236 /// Marks the node's RPC server as ready to accept requests.
237 ///
238 /// # Parameters
239 /// - `is_ready`: When `true`, marks RPC server as ready. When `false`, marks server as
240 /// temporarily unavailable.
241 ///
242 /// # Note
243 /// This indicates the RPC server is listening, NOT that leader election is complete.
244 /// Use `leader_change_notifier()` to wait for leader election.
245 ///
246 /// # Usage
247 /// Called internally after RPC server starts and cluster health check passes.
248 pub fn set_rpc_ready(
249 &self,
250 is_ready: bool,
251 ) {
252 info!("Set node RPC server ready: {}", is_ready);
253 self.ready.store(is_ready, Ordering::SeqCst);
254 // Notify waiters that RPC server is ready
255 let _ = self.rpc_ready_tx.send(is_ready);
256 }
257
    /// Checks if the node's RPC server is ready to accept requests.
    ///
    /// # Returns
    /// `true` if the RPC server is operational and listening,
    /// `false` otherwise.
    ///
    /// # Note
    /// This does NOT indicate leader election status. Use `leader_change_notifier()` for that.
    pub fn is_rpc_ready(&self) -> bool {
        // Acquire load pairs with the store in `set_rpc_ready` (SeqCst,
        // which is at least as strong as Release).
        self.ready.load(Ordering::Acquire)
    }
269
    /// Returns a receiver for node readiness notifications.
    ///
    /// Subscribe to this channel to be notified when the node becomes ready
    /// to participate in cluster operations (NOT the same as leader election).
    /// Each call creates a fresh receiver observing the current value.
    ///
    /// # Example
    /// ```ignore
    /// let ready_rx = node.ready_notifier();
    /// ready_rx.wait_for(|&ready| ready).await?;
    /// // RPC server is now listening
    /// ```
    pub fn ready_notifier(&self) -> watch::Receiver<bool> {
        // New subscribers immediately see the latest readiness value.
        self.rpc_ready_tx.subscribe()
    }
284
    /// Returns a receiver for leader change notifications.
    ///
    /// Subscribe to be notified when:
    /// - First leader is elected (initial election)
    /// - Leader changes (re-election)
    /// - No leader exists (during election)
    ///
    /// # Performance
    /// Event-driven notification, <1ms latency
    ///
    /// # Example
    /// ```ignore
    /// let mut leader_rx = node.leader_change_notifier();
    /// while leader_rx.changed().await.is_ok() {
    ///     if let Some(info) = leader_rx.borrow().as_ref() {
    ///         println!("Leader: {} (term {})", info.leader_id, info.term);
    ///     }
    /// }
    /// ```
    pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
        // `None` in the channel means no leader is currently known.
        self.leader_notifier.subscribe()
    }
307
308 /// Create a Node from a pre-built Raft instance
309 /// This method is designed to support testing and external builders
310 pub fn from_raft(
311 raft: Raft<T>,
312 shutdown_signal: watch::Receiver<()>,
313 ) -> Self {
314 let event_tx = raft.event_sender();
315 let node_config = raft.ctx.node_config();
316 let membership = raft.ctx.membership();
317 let node_id = raft.node_id;
318
319 let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);
320 let leader_notifier = LeaderNotifier::new();
321
322 // Create dummy cmd_tx (this path is mainly for testing)
323 let (cmd_tx, _cmd_rx) = mpsc::unbounded_channel();
324
325 Node {
326 node_id,
327 raft_core: Arc::new(Mutex::new(raft)),
328 membership,
329 event_tx,
330 cmd_tx,
331 ready: AtomicBool::new(false),
332 rpc_ready_tx,
333 leader_notifier,
334 node_config,
335 #[cfg(feature = "watch")]
336 watch_registry: None,
337 #[cfg(feature = "watch")]
338 _watch_dispatcher_handle: None,
339 _sm_worker_handle: None,
340 _commit_handler_handle: None,
341 _lease_cleanup_handle: None,
342 shutdown_signal,
343 }
344 }
345
    /// Returns this node's unique identifier.
    ///
    /// Useful for logging, metrics, and integrations that need to identify
    /// which Raft node is handling operations. Cheap copy of a cached field.
    pub fn node_id(&self) -> u32 {
        self.node_id
    }
353}