d_engine_server/node/
mod.rs

//! Raft node container and lifecycle management.
//!
//! The [`Node`] struct acts as a host for a Raft consensus participant,
//! coordinating between the core protocol implementation (provided by `d-engine-core`)
//! and external subsystems:
//!
//! ## Key Responsibilities
//! - Manages the Raft finite state machine lifecycle
//! - Maintains node readiness state for cluster coordination
//! - Runs the Raft main event processing loop
//!
//! ## Example Usage
//! ```ignore
//! let node = NodeBuilder::new(node_config).start().await?;
//! tokio::spawn(async move {
//!     node.run().await.expect("Raft node execution failed");
//! });
//! ```

mod builder;
pub use builder::*;

mod client;
pub use client::*;

mod leader_notifier;
pub(crate) use leader_notifier::*;

#[doc(hidden)]
mod type_config;
#[doc(hidden)]
pub use type_config::*;

/// Test Modules
#[cfg(test)]
mod builder_test;
#[cfg(test)]
mod node_test;
#[cfg(test)]
mod test_helpers;

use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;

use d_engine_core::Membership;
use d_engine_core::Raft;
use d_engine_core::RaftEvent;
use d_engine_core::RaftNodeConfig;
use d_engine_core::Result;
use d_engine_core::TypeConfig;
use d_engine_core::alias::MOF;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchRegistry;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::info;

/// Raft consensus node
///
/// Represents a single node participating in a Raft cluster.
/// Coordinates protocol execution, storage, and networking.
///
/// Created via [`NodeBuilder`].
///
/// # Running the Node
///
/// ```rust,ignore
/// let node = builder.start().await?;
/// node.run().await?;  // Blocks until shutdown
/// ```
pub struct Node<T>
where
    T: TypeConfig,
{
    pub(crate) node_id: u32,
    pub(crate) raft_core: Arc<Mutex<Raft<T>>>,

    // Cluster Membership
    pub(crate) membership: Arc<MOF<T>>,

    // Network & Storage events (copied from Raft)
    // TODO: find a better solution
    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
    pub(crate) ready: AtomicBool,

    /// Notifies when RPC server is ready to accept requests
    pub(crate) rpc_ready_tx: watch::Sender<bool>,

    /// Notifies when leader is elected (includes leader changes)
    pub(crate) leader_notifier: LeaderNotifier,

    /// Raft node config
    pub node_config: Arc<RaftNodeConfig>,

    /// Optional watch registry for watcher registration.
    /// When `None`, watch functionality is disabled.
    #[cfg(feature = "watch")]
    pub(crate) watch_registry: Option<Arc<WatchRegistry>>,

    /// Watch dispatcher task handle (keeps dispatcher alive)
    #[cfg(feature = "watch")]
    pub(crate) _watch_dispatcher_handle: Option<tokio::task::JoinHandle<()>>,

    /// Commit handler task handle (background log application)
    pub(crate) _commit_handler_handle: Option<tokio::task::JoinHandle<()>>,

    /// Lease cleanup task handle (background TTL cleanup)
    pub(crate) _lease_cleanup_handle: Option<tokio::task::JoinHandle<()>>,

    /// Shutdown signal for graceful termination
    pub(crate) shutdown_signal: watch::Receiver<()>,
}

impl<T> Debug for Node<T>
where
    T: TypeConfig,
{
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.debug_struct("Node").field("node_id", &self.node_id).finish()
    }
}

impl<T> Node<T>
where
    T: TypeConfig,
{
    /// Starts and runs the Raft node's main execution loop.
    ///
    /// # Workflow
    /// Strategy-based bootstrap depending on node type:
    /// - **Learner**: Skip cluster ready check, join cluster after warmup
    /// - **Voter**: Wait for cluster ready, then warmup connections
    ///
    /// Both paths converge to the Raft event processing loop.
    ///
    /// # Errors
    /// Returns `Err` if any bootstrap step or Raft execution fails.
    ///
    /// # Example
    /// ```ignore
    /// let node = NodeBuilder::new(node_config).start().await?;
    /// tokio::spawn(async move {
    ///     node.run().await.expect("Node execution failed");
    /// });
    /// ```
    pub async fn run(&self) -> Result<()> {
        let mut shutdown_signal = self.shutdown_signal.clone();
        // Mark the current value as seen so the `changed()` calls below only fire on new shutdown signals
        shutdown_signal.borrow_and_update();

        // Strategy pattern: bootstrap based on node type
        if self.node_config.is_learner() {
            self.run_as_learner(&mut shutdown_signal).await?;
        } else {
            self.run_as_voter(&mut shutdown_signal).await?;
        }

        // Start Raft main loop
        self.start_raft_loop().await
    }

    /// Learner bootstrap: skip cluster ready check, join after warmup.
    async fn run_as_learner(
        &self,
        shutdown: &mut watch::Receiver<()>,
    ) -> Result<()> {
        info!("Learner node bootstrap initiated");

        // Set RPC ready immediately (no cluster wait needed)
        self.set_rpc_ready(true);

        // Warm up connections
        self.warmup_with_shutdown(shutdown).await?;

        // Join cluster as learner
        let raft = self.raft_core.lock().await;
        info!(%self.node_config.cluster.node_id, "Learner joining cluster");
        raft.join_cluster().await?;
        drop(raft); // Release lock before entering main loop

        Ok(())
    }

    /// Voter bootstrap: wait for cluster ready, then warmup.
    async fn run_as_voter(
        &self,
        shutdown: &mut watch::Receiver<()>,
    ) -> Result<()> {
        info!("Voter node bootstrap initiated");

        // Wait for cluster ready
        tokio::select! {
            result = self.membership.check_cluster_is_ready() => result?,
            _ = shutdown.changed() => {
                info!("Shutdown during cluster ready check");
                return Ok(());
            }
        }

        // Set RPC ready after cluster is healthy
        self.set_rpc_ready(true);

        // Warm up connections
        self.warmup_with_shutdown(shutdown).await
    }

    /// Warm up peer connections with shutdown handling.
    async fn warmup_with_shutdown(
        &self,
        shutdown: &mut watch::Receiver<()>,
    ) -> Result<()> {
        tokio::select! {
            result = self.membership.pre_warm_connections() => result?,
            _ = shutdown.changed() => {
                info!("Shutdown during connection warmup");
                return Ok(());
            }
        }
        Ok(())
    }

    /// Start Raft main loop.
    async fn start_raft_loop(&self) -> Result<()> {
        let mut raft = self.raft_core.lock().await;
        raft.run().await
    }

    /// Marks the node's RPC server as ready to accept requests.
    ///
    /// # Parameters
    /// - `is_ready`: When `true`, marks RPC server as ready. When `false`, marks server as
    ///   temporarily unavailable.
    ///
    /// # Note
    /// This indicates the RPC server is listening, NOT that leader election is complete.
    /// Use `leader_change_notifier()` to wait for leader election.
    ///
    /// # Usage
    /// Called internally after RPC server starts and cluster health check passes.
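    ///
    /// # Example
    /// A minimal sketch, assuming `node` is a running [`Node`] and the caller wants to take
    /// the RPC server out of rotation temporarily (variable names are illustrative):
    /// ```ignore
    /// node.set_rpc_ready(false); // mark temporarily unavailable
    /// // ... perform maintenance ...
    /// node.set_rpc_ready(true);  // accept requests again
    /// ```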
    pub fn set_rpc_ready(
        &self,
        is_ready: bool,
    ) {
        info!("Set node RPC server ready: {}", is_ready);
        self.ready.store(is_ready, Ordering::SeqCst);
        // Notify waiters that RPC server is ready
        let _ = self.rpc_ready_tx.send(is_ready);
    }

    /// Checks if the node's RPC server is ready to accept requests.
    ///
    /// # Returns
    /// `true` if the RPC server is operational and listening,
    /// `false` otherwise.
    ///
    /// # Note
    /// This does NOT indicate leader election status. Use `leader_change_notifier()` for that.
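    ///
    /// # Example
    /// A minimal sketch, e.g. gating a hypothetical readiness probe (`node` is assumed to be a [`Node`]):
    /// ```ignore
    /// if node.is_rpc_ready() {
    ///     // safe to route client requests to this node
    /// } else {
    ///     // report "not ready" from the probe
    /// }
    /// ```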
    pub fn is_rpc_ready(&self) -> bool {
        self.ready.load(Ordering::Acquire)
    }

    /// Returns a receiver for node readiness notifications.
    ///
    /// Subscribe to this channel to be notified when the node becomes ready
    /// to participate in cluster operations (NOT the same as leader election).
    ///
    /// # Example
    /// ```ignore
    /// let mut ready_rx = node.ready_notifier();
    /// ready_rx.wait_for(|&ready| ready).await?;
    /// // RPC server is now listening
    /// ```
    pub fn ready_notifier(&self) -> watch::Receiver<bool> {
        self.rpc_ready_tx.subscribe()
    }

    /// Returns a receiver for leader change notifications.
    ///
    /// Subscribe to be notified when:
    /// - First leader is elected (initial election)
    /// - Leader changes (re-election)
    /// - No leader exists (during election)
    ///
    /// # Performance
    /// Event-driven notification, <1ms latency
    ///
    /// # Example
    /// ```ignore
    /// let mut leader_rx = node.leader_change_notifier();
    /// while leader_rx.changed().await.is_ok() {
    ///     if let Some(info) = leader_rx.borrow().as_ref() {
    ///         println!("Leader: {} (term {})", info.leader_id, info.term);
    ///     }
    /// }
    /// ```
    pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
        self.leader_notifier.subscribe()
    }

    /// Creates a `Node` from a pre-built [`Raft`] instance.
    ///
    /// This constructor is designed to support testing and external builders.
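    ///
    /// # Example
    /// A minimal sketch, assuming a `Raft` instance `raft` was built elsewhere (e.g. by a test helper):
    /// ```ignore
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
    /// let node = Node::from_raft(raft, shutdown_rx);
    /// assert!(!node.is_rpc_ready()); // readiness starts as `false`
    /// // `shutdown_tx.send(())` can later be used to request graceful termination
    /// ```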
    pub fn from_raft(
        raft: Raft<T>,
        shutdown_signal: watch::Receiver<()>,
    ) -> Self {
        let event_tx = raft.event_sender();
        let node_config = raft.ctx.node_config();
        let membership = raft.ctx.membership();
        let node_id = raft.node_id;

        let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);
        let leader_notifier = LeaderNotifier::new();

        Node {
            node_id,
            raft_core: Arc::new(Mutex::new(raft)),
            membership,
            event_tx,
            ready: AtomicBool::new(false),
            rpc_ready_tx,
            leader_notifier,
            node_config,
            #[cfg(feature = "watch")]
            watch_registry: None,
            #[cfg(feature = "watch")]
            _watch_dispatcher_handle: None,
            _commit_handler_handle: None,
            _lease_cleanup_handle: None,
            shutdown_signal,
        }
    }

    /// Returns this node's unique identifier.
    ///
    /// Useful for logging, metrics, and integrations that need to identify
    /// which Raft node is handling operations.
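    ///
    /// # Example
    /// A minimal sketch of tagging a log line with the node id (field name is illustrative):
    /// ```ignore
    /// tracing::info!(node_id = node.node_id(), "handling client request");
    /// ```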
    pub fn node_id(&self) -> u32 {
        self.node_id
    }

    /// Creates a zero-overhead local KV client for embedded access.
    ///
    /// Returns a client that directly communicates with Raft core
    /// without gRPC serialization or network traversal.
    ///
    /// # Performance
    /// - 10-20x faster than gRPC client
    /// - <0.1ms latency per operation
    ///
    /// # Example
    /// ```ignore
    /// let node = NodeBuilder::new(config).start().await?;
    /// let client = node.local_client();
    /// client.put(b"key", b"value").await?;
    /// ```
    pub fn local_client(&self) -> LocalKvClient {
        LocalKvClient::new_internal(
            self.event_tx.clone(),
            self.node_id,
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms),
        )
    }
}