Skip to main content

d_engine_server/node/
mod.rs

1//! Raft node container and lifecycle management.
2//!
//! The [`Node`] struct acts as a host for a Raft consensus participant,
//! coordinating between the core protocol implementation (provided by `d-engine-core`)
//! and external subsystems such as storage and networking.
6//!
7//! ## Key Responsibilities
8//! - Manages the Raft finite state machine lifecycle
9//! - Maintains node readiness state for cluster coordination
10//! - Executes the main event processing loop inside Raft
11//!
12//! ## Example Usage
13//! ```ignore
14//! let node = NodeBuilder::new(node_config).start().await?;
15//! tokio::spawn(async move {
16//!     node.run().await.expect("Raft node execution failed");
17//! });
18//! ```
19
20mod builder;
21pub use builder::*;
22
23mod leader_notifier;
24pub(crate) use leader_notifier::*;
25
26#[doc(hidden)]
27mod type_config;
28use tracing::info;
29#[doc(hidden)]
30pub use type_config::*;
31
32/// Test Modules
33#[cfg(test)]
34mod builder_test;
35#[cfg(test)]
36mod node_test;
37#[cfg(test)]
38mod test_helpers;
39
40use std::fmt::Debug;
41use std::sync::Arc;
42use std::sync::atomic::AtomicBool;
43use std::sync::atomic::Ordering;
44
45use d_engine_core::Membership;
46use d_engine_core::Raft;
47use d_engine_core::RaftEvent;
48use d_engine_core::RaftNodeConfig;
49use d_engine_core::Result;
50use d_engine_core::TypeConfig;
51use d_engine_core::alias::MOF;
52#[cfg(feature = "watch")]
53use d_engine_core::watch::WatchRegistry;
54use tokio::sync::Mutex;
55use tokio::sync::mpsc;
56use tokio::sync::watch;
57
58/// Raft consensus node
59///
60/// Represents a single node participating in a Raft cluster.
61/// Coordinates protocol execution, storage, and networking.
62///
63/// Created via [`NodeBuilder`].
64///
65/// # Running the Node
66///
67/// ```rust,ignore
68/// let node = builder.start()?;
69/// node.run().await?;  // Blocks until shutdown
70/// ```
71pub struct Node<T>
72where
73    T: TypeConfig,
74{
75    pub(crate) node_id: u32,
76    pub(crate) raft_core: Arc<Mutex<Raft<T>>>,
77
78    // Cluster Membership
79    pub(crate) membership: Arc<MOF<T>>,
80
81    // Network & Storage events, (copied from Raft)
82    // TODO: find a better solution
83    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
84
85    // Client commands (drain-driven)
86    pub(crate) cmd_tx: mpsc::Sender<d_engine_core::ClientCmd>,
87
88    pub(crate) ready: AtomicBool,
89
90    /// Notifies when RPC server is ready to accept requests
91    pub(crate) rpc_ready_tx: watch::Sender<bool>,
92
93    /// Notifies when leader is elected (includes leader changes)
94    pub(crate) leader_notifier: LeaderNotifier,
95
96    /// Current membership snapshot; fires on every committed ConfChange.
97    pub(crate) membership_rx: watch::Receiver<crate::membership::MembershipSnapshot>,
98
99    /// Raft node config
100    pub(crate) node_config: Arc<RaftNodeConfig>,
101
102    /// Optional watch registry for watcher registration
103    /// When None, watch functionality is disabled
104    #[cfg(feature = "watch")]
105    pub(crate) watch_registry: Option<Arc<WatchRegistry>>,
106
107    /// Watch dispatcher task handle (keeps dispatcher alive)
108    #[cfg(feature = "watch")]
109    pub(crate) _watch_dispatcher_handle: Option<tokio::task::JoinHandle<()>>,
110
111    /// State machine worker thread handle (dedicated OS thread, not a tokio task).
112    /// Wrapped in Mutex so run(&self) can take it for joining after Raft loop exits,
113    /// ensuring `Arc<DB>` is released before run() returns.
114    pub(crate) sm_worker_handle: std::sync::Mutex<Option<std::thread::JoinHandle<()>>>,
115
116    /// Commit handler task handle (background log application)
117    pub(crate) _commit_handler_handle: Option<tokio::task::JoinHandle<()>>,
118
119    /// Lease cleanup task handle (background TTL cleanup)
120    pub(crate) _lease_cleanup_handle: Option<tokio::task::JoinHandle<()>>,
121
122    /// Shutdown signal for graceful termination
123    pub(crate) shutdown_signal: watch::Receiver<()>,
124}
125
126impl<T> Debug for Node<T>
127where
128    T: TypeConfig,
129{
130    fn fmt(
131        &self,
132        f: &mut std::fmt::Formatter<'_>,
133    ) -> std::fmt::Result {
134        f.debug_struct("Node").field("node_id", &self.node_id).finish()
135    }
136}
137impl<T> Node<T>
138where
139    T: TypeConfig,
140{
141    /// Starts and runs the Raft node's main execution loop.
142    ///
143    /// # Workflow
144    /// Strategy-based bootstrap depending on node type:
145    /// - **Learner**: Skip cluster ready check, join cluster after warmup
146    /// - **Voter**: Wait for cluster ready, then warmup connections
147    ///
148    /// Both paths converge to the Raft event processing loop.
149    ///
150    /// # Errors
151    /// Returns `Err` if any bootstrap step or Raft execution fails.
152    ///
153    /// # Example
154    /// ```ignore
155    /// let node = Node::new(...);
156    /// tokio::spawn(async move {
157    ///     node.run().await.expect("Node execution failed");
158    /// });
159    /// ```
160    pub async fn run(&self) -> Result<()> {
161        let mut shutdown_signal = self.shutdown_signal.clone();
162        shutdown_signal.borrow_and_update();
163
164        // Strategy pattern: bootstrap based on node type
165        if self.node_config.is_learner() {
166            self.run_as_learner(&mut shutdown_signal).await?;
167        } else {
168            self.run_as_voter(&mut shutdown_signal).await?;
169        }
170
171        // Start Raft main loop.
172        // Note: IO thread is closed inside Raft::run() on shutdown before returning.
173        self.start_raft_loop().await?;
174
175        // Shutdown in reverse startup order: join sm-worker thread first so its
176        // Arc<DB> clone is dropped before we return, releasing the RocksDB LOCK.
177        let handle = self.sm_worker_handle.lock().unwrap().take();
178        if let Some(handle) = handle {
179            tokio::task::spawn_blocking(move || {
180                let _ = handle.join();
181            })
182            .await
183            .ok();
184        }
185
186        Ok(())
187    }
188
189    /// Learner bootstrap: skip cluster ready check, join after warmup.
190    async fn run_as_learner(
191        &self,
192        shutdown: &mut watch::Receiver<()>,
193    ) -> Result<()> {
194        info!("Learner node bootstrap initiated");
195
196        // Set RPC ready immediately (no cluster wait needed)
197        self.set_rpc_ready(true);
198
199        // Warm up connections
200        self.warmup_with_shutdown(shutdown).await?;
201
202        // Join cluster as learner
203        let raft = self.raft_core.lock().await;
204        info!(%self.node_config.cluster.node_id, "Learner joining cluster");
205        raft.join_cluster().await?;
206        drop(raft); // Release lock before entering main loop
207
208        Ok(())
209    }
210
211    /// Voter bootstrap: wait for cluster ready, then warmup.
212    async fn run_as_voter(
213        &self,
214        shutdown: &mut watch::Receiver<()>,
215    ) -> Result<()> {
216        info!("Voter node bootstrap initiated");
217
218        // Wait for cluster ready
219        tokio::select! {
220            result = self.membership.check_cluster_is_ready() => result?,
221            _ = shutdown.changed() => {
222                info!("Shutdown during cluster ready check");
223                return Ok(());
224            }
225        }
226
227        // Set RPC ready after cluster is healthy
228        self.set_rpc_ready(true);
229
230        // Warm up connections
231        self.warmup_with_shutdown(shutdown).await
232    }
233
234    /// Warm up peer connections with shutdown handling.
235    async fn warmup_with_shutdown(
236        &self,
237        shutdown: &mut watch::Receiver<()>,
238    ) -> Result<()> {
239        tokio::select! {
240            result = self.membership.pre_warm_connections() => result?,
241            _ = shutdown.changed() => {
242                info!("Shutdown during connection warmup");
243                return Ok(());
244            }
245        }
246        Ok(())
247    }
248
249    /// Start Raft main loop.
250    async fn start_raft_loop(&self) -> Result<()> {
251        let mut raft = self.raft_core.lock().await;
252        raft.run().await
253    }
254
255    /// Marks the node's RPC server as ready to accept requests.
256    ///
257    /// # Parameters
258    /// - `is_ready`: When `true`, marks RPC server as ready. When `false`, marks server as
259    ///   temporarily unavailable.
260    ///
261    /// # Note
262    /// This indicates the RPC server is listening, NOT that leader election is complete.
263    /// Use `leader_change_notifier()` to wait for leader election.
264    ///
265    /// # Usage
266    /// Called internally after RPC server starts and cluster health check passes.
267    pub(crate) fn set_rpc_ready(
268        &self,
269        is_ready: bool,
270    ) {
271        info!("Set node RPC server ready: {}", is_ready);
272        self.ready.store(is_ready, Ordering::SeqCst);
273        // Notify waiters that RPC server is ready
274        let _ = self.rpc_ready_tx.send(is_ready);
275    }
276
277    /// Checks if the node's RPC server is ready to accept requests.
278    ///
279    /// # Returns
280    /// `true` if the RPC server is operational and listening,
281    /// `false` otherwise.
282    ///
283    /// # Note
284    /// This does NOT indicate leader election status. Use `leader_change_notifier()` for that.
285    pub(crate) fn is_rpc_ready(&self) -> bool {
286        self.ready.load(Ordering::Acquire)
287    }
288
289    /// Returns a receiver for leader change notifications.
290    ///
291    /// Subscribe to be notified when:
292    /// - First leader is elected (initial election)
293    /// - Leader changes (re-election)
294    /// - No leader exists (during election)
295    ///
296    /// # Performance
297    /// Event-driven notification, <1ms latency
298    ///
299    /// # Example
300    /// ```ignore
301    /// let mut leader_rx = node.leader_change_notifier();
302    /// while leader_rx.changed().await.is_ok() {
303    ///     if let Some(info) = leader_rx.borrow().as_ref() {
304    ///         println!("Leader: {} (term {})", info.leader_id, info.term);
305    ///     }
306    /// }
307    /// ```
308    pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
309        self.leader_notifier.subscribe()
310    }
311
312    /// Subscribe to committed membership change notifications.
313    ///
314    /// Returns a `watch::Receiver` that fires whenever a `ConfChange` entry
315    /// commits.  The first `borrow()` returns the current membership state
316    /// without waiting for a change.
317    pub fn membership_change_notifier(
318        &self
319    ) -> watch::Receiver<crate::membership::MembershipSnapshot> {
320        self.membership_rx.clone()
321    }
322
323    /// Returns this node's unique identifier.
324    ///
325    /// Useful for logging, metrics, and integrations that need to identify
326    /// which Raft node is handling operations.
327    pub fn node_id(&self) -> u32 {
328        self.node_id
329    }
330}