d_engine/node/
mod.rs

1//! Raft node container and lifecycle management.
2//!
3//! The [`Node`] struct acts as a host for a Raft consensus participant,
4//! coordinating between the core protocol implementation ([`crate::raft::Raft`])
5//! and external subsystems:
6//!
7//! ## Key Responsibilities
8//! - Manages the Raft finite state machine lifecycle
9//! - Maintains node readiness state for cluster coordination
10//! - Executes the main event processing loop inside Raft
11//!
12//! ## Example Usage
13//! ```ignore
14//! let node = NodeBuilder::new(node_config).build().ready().unwrap();
15//! tokio::spawn(async move {
16//!     node.run().await.expect("Raft node execution failed");
17//! });
18//! ```
19
20mod builder;
21pub use builder::*;
22
23#[doc(hidden)]
24mod type_config;
25use tracing::debug;
26use tracing::info;
27#[doc(hidden)]
28pub use type_config::*;
29
30/// Test Modules
31#[cfg(test)]
32mod builder_test;
33#[cfg(test)]
34mod node_test;
35
36use std::fmt::Debug;
37use std::sync::atomic::AtomicBool;
38use std::sync::atomic::Ordering;
39use std::sync::Arc;
40
41use tokio::sync::mpsc;
42use tokio::sync::Mutex;
43
44use crate::alias::MOF;
45use crate::Membership;
46use crate::Raft;
47use crate::RaftEvent;
48use crate::RaftNodeConfig;
49use crate::Result;
50use crate::TypeConfig;
51
52/// Raft node container
53pub struct Node<T>
54where
55    T: TypeConfig,
56{
57    pub(crate) node_id: u32,
58    pub(crate) raft_core: Arc<Mutex<Raft<T>>>,
59
60    // Cluster Membership
61    pub(crate) membership: Arc<MOF<T>>,
62
63    // Network & Storage events, (copied from Raft)
64    // TODO: find a better solution
65    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
66    pub(crate) ready: AtomicBool,
67
68    /// Raft node config
69    pub node_config: Arc<RaftNodeConfig>,
70}
71
72impl<T> Debug for Node<T>
73where
74    T: TypeConfig,
75{
76    fn fmt(
77        &self,
78        f: &mut std::fmt::Formatter<'_>,
79    ) -> std::fmt::Result {
80        f.debug_struct("Node").field("node_id", &self.node_id).finish()
81    }
82}
83impl<T> Node<T>
84where
85    T: TypeConfig,
86{
87    /// Starts and runs the Raft node's main execution loop.
88    ///
89    /// # Workflow
90    /// 1. Establishes network connections with cluster peers
91    /// 2. Performs cluster health check
92    /// 3. Marks node as ready for operation
93    /// 4. Joins the Raft cluster
94    /// 5. Executes the core Raft event processing loop
95    ///
96    /// # Errors
97    /// Returns `Err` if any of these operations fail:
98    /// - Peer connection establishment
99    /// - Cluster health check
100    /// - Raft core initialization
101    /// - Event processing failures
102    ///
103    /// # Example
104    /// ```ignore
105    /// let node = Node::new(...);
106    /// tokio::spawn(async move {
107    ///     node.run().await.expect("Node execution failed");
108    /// });
109    /// ```
110    pub async fn run(&self) -> Result<()> {
111        // 1. Connect with other peers
112        // let peer_channels = Self::connect_with_peers(self.node_id,
113        // self.node_config.clone()).await?;
114
115        // 2. Healthcheck if all server is start serving
116        self.membership.check_cluster_is_ready().await?;
117
118        // 3. Set node is ready to run Raft protocol
119        self.set_ready(true);
120
121        // 4. Warm up connections with peers
122        self.membership.pre_warm_connections().await?;
123
124        let mut raft = self.raft_core.lock().await;
125        // 5. if join as a new node
126        debug!(%self.node_config.cluster.node_id);
127        if self.node_config.is_joining() {
128            info!(%self.node_config.cluster.node_id, "Node is joining...");
129            raft.join_cluster().await?;
130        }
131
132        // 6. Run the main event processing loop
133        raft.run().await?;
134
135        Ok(())
136    }
137
138    /// Controls the node's operational readiness state.
139    ///
140    /// # Parameters
141    /// - `is_ready`: When `true`, marks node as ready to participate in cluster. When `false`,
142    ///   marks node as temporarily unavailable.
143    ///
144    /// # Usage
145    /// Typically used during cluster bootstrap or maintenance operations.
146    /// The readiness state is atomically updated using SeqCst ordering.
147    pub fn set_ready(
148        &self,
149        is_ready: bool,
150    ) {
151        info!("Set node is ready to run Raft protocol");
152        self.ready.store(is_ready, Ordering::SeqCst);
153    }
154
155    /// Checks if the node is in a ready state to participate in cluster operations.
156    ///
157    /// # Returns
158    /// `true` if the node is operational and ready to handle Raft protocol operations,
159    /// `false` otherwise.
160    pub fn server_is_ready(&self) -> bool {
161        self.ready.load(Ordering::Acquire)
162    }
163}