d_engine_server/node/mod.rs
1//! Raft node container and lifecycle management.
2//!
3//! The [`Node`] struct acts as a host for a Raft consensus participant,
4//! coordinating between the core protocol implementation (provided by `d-engine-core`)
5//! and external subsystems:
6//!
7//! ## Key Responsibilities
8//! - Manages the Raft finite state machine lifecycle
9//! - Maintains node readiness state for cluster coordination
10//! - Executes the main event processing loop inside Raft
11//!
12//! ## Example Usage
13//! ```ignore
14//! let node = NodeBuilder::new(node_config).start().await?;
15//! tokio::spawn(async move {
16//! node.run().await.expect("Raft node execution failed");
17//! });
18//! ```
19
20mod builder;
21pub use builder::*;
22
23mod leader_notifier;
24pub(crate) use leader_notifier::*;
25
26#[doc(hidden)]
27mod type_config;
28use tracing::info;
29#[doc(hidden)]
30pub use type_config::*;
31
32/// Test Modules
33#[cfg(test)]
34mod builder_test;
35#[cfg(test)]
36mod node_test;
37#[cfg(test)]
38mod test_helpers;
39
40use std::fmt::Debug;
41use std::sync::Arc;
42use std::sync::atomic::AtomicBool;
43use std::sync::atomic::Ordering;
44
45use d_engine_core::Membership;
46use d_engine_core::Raft;
47use d_engine_core::RaftEvent;
48use d_engine_core::RaftNodeConfig;
49use d_engine_core::Result;
50use d_engine_core::TypeConfig;
51use d_engine_core::alias::MOF;
52#[cfg(feature = "watch")]
53use d_engine_core::watch::WatchRegistry;
54use tokio::sync::Mutex;
55use tokio::sync::mpsc;
56use tokio::sync::watch;
57
/// Raft consensus node
///
/// Represents a single node participating in a Raft cluster.
/// Coordinates protocol execution, storage, and networking.
///
/// Created via [`NodeBuilder`].
///
/// # Running the Node
///
/// ```rust,ignore
/// let node = builder.start()?;
/// node.run().await?; // Blocks until shutdown
/// ```
pub struct Node<T>
where
    T: TypeConfig,
{
    // This node's identifier; exposed to callers through `node_id()`.
    pub(crate) node_id: u32,
    // Raft protocol core. `run()` locks it to join the cluster (learner
    // bootstrap) and again for the whole duration of the main loop.
    pub(crate) raft_core: Arc<Mutex<Raft<T>>>,

    // Cluster membership. Used during bootstrap for the cluster readiness
    // check and for pre-warming peer connections.
    pub(crate) membership: Arc<MOF<T>>,

    // Network & storage events (sender copied from Raft).
    // TODO: find a better solution
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,

    // Client commands (drain-driven)
    pub(crate) cmd_tx: mpsc::Sender<d_engine_core::ClientCmd>,

    // RPC readiness flag: written by `set_rpc_ready`, read by `is_rpc_ready`.
    pub(crate) ready: AtomicBool,

    /// Notifies when RPC server is ready to accept requests.
    /// `set_rpc_ready` publishes the same value it stores in `ready`.
    pub(crate) rpc_ready_tx: watch::Sender<bool>,

    /// Notifies when leader is elected (includes leader changes).
    /// Subscribed to through `leader_change_notifier()`.
    pub(crate) leader_notifier: LeaderNotifier,

    /// Current membership snapshot; fires on every committed ConfChange.
    /// Cloned out to callers by `membership_change_notifier()`.
    pub(crate) membership_rx: watch::Receiver<crate::membership::MembershipSnapshot>,

    /// Raft node config. `run()` consults it to choose between the learner
    /// and the voter bootstrap path.
    pub(crate) node_config: Arc<RaftNodeConfig>,

    /// Optional watch registry for watcher registration.
    /// When `None`, watch functionality is disabled.
    #[cfg(feature = "watch")]
    pub(crate) watch_registry: Option<Arc<WatchRegistry>>,

    /// Watch dispatcher task handle (keeps dispatcher alive)
    #[cfg(feature = "watch")]
    pub(crate) _watch_dispatcher_handle: Option<tokio::task::JoinHandle<()>>,

    /// State machine worker thread handle (dedicated OS thread, not a tokio task).
    /// Wrapped in Mutex so run(&self) can take it for joining after Raft loop exits,
    /// ensuring `Arc<DB>` is released before run() returns.
    pub(crate) sm_worker_handle: std::sync::Mutex<Option<std::thread::JoinHandle<()>>>,

    /// Commit handler task handle (background log application)
    pub(crate) _commit_handler_handle: Option<tokio::task::JoinHandle<()>>,

    /// Lease cleanup task handle (background TTL cleanup)
    pub(crate) _lease_cleanup_handle: Option<tokio::task::JoinHandle<()>>,

    /// Shutdown signal for graceful termination.
    /// `run()` clones this receiver so its bootstrap waits can be abandoned
    /// when the signal fires.
    pub(crate) shutdown_signal: watch::Receiver<()>,
}
125
126impl<T> Debug for Node<T>
127where
128 T: TypeConfig,
129{
130 fn fmt(
131 &self,
132 f: &mut std::fmt::Formatter<'_>,
133 ) -> std::fmt::Result {
134 f.debug_struct("Node").field("node_id", &self.node_id).finish()
135 }
136}
137impl<T> Node<T>
138where
139 T: TypeConfig,
140{
141 /// Starts and runs the Raft node's main execution loop.
142 ///
143 /// # Workflow
144 /// Strategy-based bootstrap depending on node type:
145 /// - **Learner**: Skip cluster ready check, join cluster after warmup
146 /// - **Voter**: Wait for cluster ready, then warmup connections
147 ///
148 /// Both paths converge to the Raft event processing loop.
149 ///
150 /// # Errors
151 /// Returns `Err` if any bootstrap step or Raft execution fails.
152 ///
153 /// # Example
154 /// ```ignore
155 /// let node = Node::new(...);
156 /// tokio::spawn(async move {
157 /// node.run().await.expect("Node execution failed");
158 /// });
159 /// ```
160 pub async fn run(&self) -> Result<()> {
161 let mut shutdown_signal = self.shutdown_signal.clone();
162 shutdown_signal.borrow_and_update();
163
164 // Strategy pattern: bootstrap based on node type
165 if self.node_config.is_learner() {
166 self.run_as_learner(&mut shutdown_signal).await?;
167 } else {
168 self.run_as_voter(&mut shutdown_signal).await?;
169 }
170
171 // Start Raft main loop.
172 // Note: IO thread is closed inside Raft::run() on shutdown before returning.
173 self.start_raft_loop().await?;
174
175 // Shutdown in reverse startup order: join sm-worker thread first so its
176 // Arc<DB> clone is dropped before we return, releasing the RocksDB LOCK.
177 let handle = self.sm_worker_handle.lock().unwrap().take();
178 if let Some(handle) = handle {
179 tokio::task::spawn_blocking(move || {
180 let _ = handle.join();
181 })
182 .await
183 .ok();
184 }
185
186 Ok(())
187 }
188
189 /// Learner bootstrap: skip cluster ready check, join after warmup.
190 async fn run_as_learner(
191 &self,
192 shutdown: &mut watch::Receiver<()>,
193 ) -> Result<()> {
194 info!("Learner node bootstrap initiated");
195
196 // Set RPC ready immediately (no cluster wait needed)
197 self.set_rpc_ready(true);
198
199 // Warm up connections
200 self.warmup_with_shutdown(shutdown).await?;
201
202 // Join cluster as learner
203 let raft = self.raft_core.lock().await;
204 info!(%self.node_config.cluster.node_id, "Learner joining cluster");
205 raft.join_cluster().await?;
206 drop(raft); // Release lock before entering main loop
207
208 Ok(())
209 }
210
211 /// Voter bootstrap: wait for cluster ready, then warmup.
212 async fn run_as_voter(
213 &self,
214 shutdown: &mut watch::Receiver<()>,
215 ) -> Result<()> {
216 info!("Voter node bootstrap initiated");
217
218 // Wait for cluster ready
219 tokio::select! {
220 result = self.membership.check_cluster_is_ready() => result?,
221 _ = shutdown.changed() => {
222 info!("Shutdown during cluster ready check");
223 return Ok(());
224 }
225 }
226
227 // Set RPC ready after cluster is healthy
228 self.set_rpc_ready(true);
229
230 // Warm up connections
231 self.warmup_with_shutdown(shutdown).await
232 }
233
234 /// Warm up peer connections with shutdown handling.
235 async fn warmup_with_shutdown(
236 &self,
237 shutdown: &mut watch::Receiver<()>,
238 ) -> Result<()> {
239 tokio::select! {
240 result = self.membership.pre_warm_connections() => result?,
241 _ = shutdown.changed() => {
242 info!("Shutdown during connection warmup");
243 return Ok(());
244 }
245 }
246 Ok(())
247 }
248
249 /// Start Raft main loop.
250 async fn start_raft_loop(&self) -> Result<()> {
251 let mut raft = self.raft_core.lock().await;
252 raft.run().await
253 }
254
255 /// Marks the node's RPC server as ready to accept requests.
256 ///
257 /// # Parameters
258 /// - `is_ready`: When `true`, marks RPC server as ready. When `false`, marks server as
259 /// temporarily unavailable.
260 ///
261 /// # Note
262 /// This indicates the RPC server is listening, NOT that leader election is complete.
263 /// Use `leader_change_notifier()` to wait for leader election.
264 ///
265 /// # Usage
266 /// Called internally after RPC server starts and cluster health check passes.
267 pub(crate) fn set_rpc_ready(
268 &self,
269 is_ready: bool,
270 ) {
271 info!("Set node RPC server ready: {}", is_ready);
272 self.ready.store(is_ready, Ordering::SeqCst);
273 // Notify waiters that RPC server is ready
274 let _ = self.rpc_ready_tx.send(is_ready);
275 }
276
277 /// Checks if the node's RPC server is ready to accept requests.
278 ///
279 /// # Returns
280 /// `true` if the RPC server is operational and listening,
281 /// `false` otherwise.
282 ///
283 /// # Note
284 /// This does NOT indicate leader election status. Use `leader_change_notifier()` for that.
285 pub(crate) fn is_rpc_ready(&self) -> bool {
286 self.ready.load(Ordering::Acquire)
287 }
288
289 /// Returns a receiver for leader change notifications.
290 ///
291 /// Subscribe to be notified when:
292 /// - First leader is elected (initial election)
293 /// - Leader changes (re-election)
294 /// - No leader exists (during election)
295 ///
296 /// # Performance
297 /// Event-driven notification, <1ms latency
298 ///
299 /// # Example
300 /// ```ignore
301 /// let mut leader_rx = node.leader_change_notifier();
302 /// while leader_rx.changed().await.is_ok() {
303 /// if let Some(info) = leader_rx.borrow().as_ref() {
304 /// println!("Leader: {} (term {})", info.leader_id, info.term);
305 /// }
306 /// }
307 /// ```
308 pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
309 self.leader_notifier.subscribe()
310 }
311
312 /// Subscribe to committed membership change notifications.
313 ///
314 /// Returns a `watch::Receiver` that fires whenever a `ConfChange` entry
315 /// commits. The first `borrow()` returns the current membership state
316 /// without waiting for a change.
317 pub fn membership_change_notifier(
318 &self
319 ) -> watch::Receiver<crate::membership::MembershipSnapshot> {
320 self.membership_rx.clone()
321 }
322
323 /// Returns this node's unique identifier.
324 ///
325 /// Useful for logging, metrics, and integrations that need to identify
326 /// which Raft node is handling operations.
327 pub fn node_id(&self) -> u32 {
328 self.node_id
329 }
330}