// d_engine_server/node/mod.rs

//! Raft node container and lifecycle management.
//!
//! The [`Node`] struct acts as a host for a Raft consensus participant,
//! coordinating between the core protocol implementation (provided by `d-engine-core`)
//! and external subsystems:
//!
//! ## Key Responsibilities
//! - Manages the Raft finite state machine lifecycle
//! - Maintains node readiness state for cluster coordination
//! - Executes the main event processing loop inside Raft
//!
//! ## Example Usage
//! ```ignore
//! let node = NodeBuilder::new(node_config).start().await?;
//! tokio::spawn(async move {
//!     node.run().await.expect("Raft node execution failed");
//! });
//! ```

20mod builder;
21pub use builder::*;
22
23mod leader_notifier;
24pub(crate) use leader_notifier::*;
25
26#[doc(hidden)]
27mod type_config;
28use tracing::info;
29#[doc(hidden)]
30pub use type_config::*;
31
32/// Test Modules
33#[cfg(test)]
34mod builder_test;
35#[cfg(test)]
36mod node_test;
37#[cfg(test)]
38mod test_helpers;
39
40use std::fmt::Debug;
41use std::sync::Arc;
42use std::sync::atomic::AtomicBool;
43use std::sync::atomic::Ordering;
44
45use d_engine_core::Membership;
46use d_engine_core::Raft;
47use d_engine_core::RaftEvent;
48use d_engine_core::RaftNodeConfig;
49use d_engine_core::Result;
50use d_engine_core::TypeConfig;
51use d_engine_core::alias::MOF;
52#[cfg(feature = "watch")]
53use d_engine_core::watch::WatchRegistry;
54use tokio::sync::Mutex;
55use tokio::sync::mpsc;
56use tokio::sync::watch;
57
58/// Raft consensus node
59///
60/// Represents a single node participating in a Raft cluster.
61/// Coordinates protocol execution, storage, and networking.
62///
63/// Created via [`NodeBuilder`].
64///
65/// # Running the Node
66///
67/// ```rust,ignore
68/// let node = builder.start()?;
69/// node.run().await?;  // Blocks until shutdown
70/// ```
71pub struct Node<T>
72where
73    T: TypeConfig,
74{
75    pub(crate) node_id: u32,
76    pub(crate) raft_core: Arc<Mutex<Raft<T>>>,
77
78    // Cluster Membership
79    pub(crate) membership: Arc<MOF<T>>,
80
81    // Network & Storage events, (copied from Raft)
82    // TODO: find a better solution
83    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
84
85    // Client commands (drain-driven)
86    pub(crate) cmd_tx: mpsc::UnboundedSender<d_engine_core::ClientCmd>,
87
88    pub(crate) ready: AtomicBool,
89
90    /// Notifies when RPC server is ready to accept requests
91    pub(crate) rpc_ready_tx: watch::Sender<bool>,
92
93    /// Notifies when leader is elected (includes leader changes)
94    pub(crate) leader_notifier: LeaderNotifier,
95
96    /// Raft node config
97    pub node_config: Arc<RaftNodeConfig>,
98
99    /// Optional watch registry for watcher registration
100    /// When None, watch functionality is disabled
101    #[cfg(feature = "watch")]
102    pub(crate) watch_registry: Option<Arc<WatchRegistry>>,
103
104    /// Watch dispatcher task handle (keeps dispatcher alive)
105    #[cfg(feature = "watch")]
106    pub(crate) _watch_dispatcher_handle: Option<tokio::task::JoinHandle<()>>,
107
108    /// State machine worker task handle (background apply operations)
109    pub(crate) _sm_worker_handle: Option<tokio::task::JoinHandle<()>>,
110
111    /// Commit handler task handle (background log application)
112    pub(crate) _commit_handler_handle: Option<tokio::task::JoinHandle<()>>,
113
114    /// Lease cleanup task handle (background TTL cleanup)
115    pub(crate) _lease_cleanup_handle: Option<tokio::task::JoinHandle<()>>,
116
117    /// Shutdown signal for graceful termination
118    pub(crate) shutdown_signal: watch::Receiver<()>,
119}
120
121impl<T> Debug for Node<T>
122where
123    T: TypeConfig,
124{
125    fn fmt(
126        &self,
127        f: &mut std::fmt::Formatter<'_>,
128    ) -> std::fmt::Result {
129        f.debug_struct("Node").field("node_id", &self.node_id).finish()
130    }
131}
132impl<T> Node<T>
133where
134    T: TypeConfig,
135{
136    /// Starts and runs the Raft node's main execution loop.
137    ///
138    /// # Workflow
139    /// Strategy-based bootstrap depending on node type:
140    /// - **Learner**: Skip cluster ready check, join cluster after warmup
141    /// - **Voter**: Wait for cluster ready, then warmup connections
142    ///
143    /// Both paths converge to the Raft event processing loop.
144    ///
145    /// # Errors
146    /// Returns `Err` if any bootstrap step or Raft execution fails.
147    ///
148    /// # Example
149    /// ```ignore
150    /// let node = Node::new(...);
151    /// tokio::spawn(async move {
152    ///     node.run().await.expect("Node execution failed");
153    /// });
154    /// ```
155    pub async fn run(&self) -> Result<()> {
156        let mut shutdown_signal = self.shutdown_signal.clone();
157        shutdown_signal.borrow_and_update();
158
159        // Strategy pattern: bootstrap based on node type
160        if self.node_config.is_learner() {
161            self.run_as_learner(&mut shutdown_signal).await?;
162        } else {
163            self.run_as_voter(&mut shutdown_signal).await?;
164        }
165
166        // Start Raft main loop
167        self.start_raft_loop().await
168    }
169
170    /// Learner bootstrap: skip cluster ready check, join after warmup.
171    async fn run_as_learner(
172        &self,
173        shutdown: &mut watch::Receiver<()>,
174    ) -> Result<()> {
175        info!("Learner node bootstrap initiated");
176
177        // Set RPC ready immediately (no cluster wait needed)
178        self.set_rpc_ready(true);
179
180        // Warm up connections
181        self.warmup_with_shutdown(shutdown).await?;
182
183        // Join cluster as learner
184        let raft = self.raft_core.lock().await;
185        info!(%self.node_config.cluster.node_id, "Learner joining cluster");
186        raft.join_cluster().await?;
187        drop(raft); // Release lock before entering main loop
188
189        Ok(())
190    }
191
192    /// Voter bootstrap: wait for cluster ready, then warmup.
193    async fn run_as_voter(
194        &self,
195        shutdown: &mut watch::Receiver<()>,
196    ) -> Result<()> {
197        info!("Voter node bootstrap initiated");
198
199        // Wait for cluster ready
200        tokio::select! {
201            result = self.membership.check_cluster_is_ready() => result?,
202            _ = shutdown.changed() => {
203                info!("Shutdown during cluster ready check");
204                return Ok(());
205            }
206        }
207
208        // Set RPC ready after cluster is healthy
209        self.set_rpc_ready(true);
210
211        // Warm up connections
212        self.warmup_with_shutdown(shutdown).await
213    }
214
215    /// Warm up peer connections with shutdown handling.
216    async fn warmup_with_shutdown(
217        &self,
218        shutdown: &mut watch::Receiver<()>,
219    ) -> Result<()> {
220        tokio::select! {
221            result = self.membership.pre_warm_connections() => result?,
222            _ = shutdown.changed() => {
223                info!("Shutdown during connection warmup");
224                return Ok(());
225            }
226        }
227        Ok(())
228    }
229
230    /// Start Raft main loop.
231    async fn start_raft_loop(&self) -> Result<()> {
232        let mut raft = self.raft_core.lock().await;
233        raft.run().await
234    }
235
236    /// Marks the node's RPC server as ready to accept requests.
237    ///
238    /// # Parameters
239    /// - `is_ready`: When `true`, marks RPC server as ready. When `false`, marks server as
240    ///   temporarily unavailable.
241    ///
242    /// # Note
243    /// This indicates the RPC server is listening, NOT that leader election is complete.
244    /// Use `leader_change_notifier()` to wait for leader election.
245    ///
246    /// # Usage
247    /// Called internally after RPC server starts and cluster health check passes.
248    pub fn set_rpc_ready(
249        &self,
250        is_ready: bool,
251    ) {
252        info!("Set node RPC server ready: {}", is_ready);
253        self.ready.store(is_ready, Ordering::SeqCst);
254        // Notify waiters that RPC server is ready
255        let _ = self.rpc_ready_tx.send(is_ready);
256    }
257
258    /// Checks if the node's RPC server is ready to accept requests.
259    ///
260    /// # Returns
261    /// `true` if the RPC server is operational and listening,
262    /// `false` otherwise.
263    ///
264    /// # Note
265    /// This does NOT indicate leader election status. Use `leader_change_notifier()` for that.
266    pub fn is_rpc_ready(&self) -> bool {
267        self.ready.load(Ordering::Acquire)
268    }
269
270    /// Returns a receiver for node readiness notifications.
271    ///
272    /// Subscribe to this channel to be notified when the node becomes ready
273    /// to participate in cluster operations (NOT the same as leader election).
274    ///
275    /// # Example
276    /// ```ignore
277    /// let ready_rx = node.ready_notifier();
278    /// ready_rx.wait_for(|&ready| ready).await?;
279    /// // RPC server is now listening
280    /// ```
281    pub fn ready_notifier(&self) -> watch::Receiver<bool> {
282        self.rpc_ready_tx.subscribe()
283    }
284
285    /// Returns a receiver for leader change notifications.
286    ///
287    /// Subscribe to be notified when:
288    /// - First leader is elected (initial election)
289    /// - Leader changes (re-election)
290    /// - No leader exists (during election)
291    ///
292    /// # Performance
293    /// Event-driven notification, <1ms latency
294    ///
295    /// # Example
296    /// ```ignore
297    /// let mut leader_rx = node.leader_change_notifier();
298    /// while leader_rx.changed().await.is_ok() {
299    ///     if let Some(info) = leader_rx.borrow().as_ref() {
300    ///         println!("Leader: {} (term {})", info.leader_id, info.term);
301    ///     }
302    /// }
303    /// ```
304    pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
305        self.leader_notifier.subscribe()
306    }
307
308    /// Create a Node from a pre-built Raft instance
309    /// This method is designed to support testing and external builders
310    pub fn from_raft(
311        raft: Raft<T>,
312        shutdown_signal: watch::Receiver<()>,
313    ) -> Self {
314        let event_tx = raft.event_sender();
315        let node_config = raft.ctx.node_config();
316        let membership = raft.ctx.membership();
317        let node_id = raft.node_id;
318
319        let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);
320        let leader_notifier = LeaderNotifier::new();
321
322        // Create dummy cmd_tx (this path is mainly for testing)
323        let (cmd_tx, _cmd_rx) = mpsc::unbounded_channel();
324
325        Node {
326            node_id,
327            raft_core: Arc::new(Mutex::new(raft)),
328            membership,
329            event_tx,
330            cmd_tx,
331            ready: AtomicBool::new(false),
332            rpc_ready_tx,
333            leader_notifier,
334            node_config,
335            #[cfg(feature = "watch")]
336            watch_registry: None,
337            #[cfg(feature = "watch")]
338            _watch_dispatcher_handle: None,
339            _sm_worker_handle: None,
340            _commit_handler_handle: None,
341            _lease_cleanup_handle: None,
342            shutdown_signal,
343        }
344    }
345
346    /// Returns this node's unique identifier.
347    ///
348    /// Useful for logging, metrics, and integrations that need to identify
349    /// which Raft node is handling operations.
350    pub fn node_id(&self) -> u32 {
351        self.node_id
352    }
353}