d_engine/node/
node.rs

1//! Raft node container and lifecycle management.
2//!
3//! The [`Node`] struct acts as a host for a Raft consensus participant,
4//! coordinating between the core protocol implementation ([`crate::core::raft::Raft`])
5//! and external subsystems:
6//!
7//! ## Key Responsibilities
8//! - Manages the Raft finite state machine lifecycle
9//! - Coordinates peer networking through [`PeerChannels`]
10//! - Maintains node readiness state for cluster coordination
11//! - Executes the main event processing loop inside Raft
12//!
13//! ## Example Usage
14//! ```ignore
15//! let node = NodeBuilder::new(settings).build().ready().unwrap();
16//! tokio::spawn(async move {
17//!     node.run().await.expect("Raft node execution failed");
18//! });
19//! ```
20
21use std::fmt::Debug;
22use std::sync::atomic::AtomicBool;
23use std::sync::atomic::Ordering;
24use std::sync::Arc;
25
26use tokio::sync::mpsc;
27use tokio::sync::Mutex;
28
29use crate::alias::POF;
30use crate::membership::PeerChannelsFactory;
31use crate::PeerChannels;
32use crate::Raft;
33use crate::RaftEvent;
34use crate::RaftNodeConfig;
35use crate::Result;
36use crate::TypeConfig;
37
38/// Raft node container
39pub struct Node<T>
40where T: TypeConfig
41{
42    pub(crate) node_id: u32,
43    pub(crate) raft_core: Arc<Mutex<Raft<T>>>,
44
45    // Network & Storage events, (copied from Raft)
46    // TODO: find a better solution
47    pub(crate) event_tx: mpsc::Sender<RaftEvent>,
48    pub(crate) ready: AtomicBool,
49
50    /// Raft node config
51    pub settings: Arc<RaftNodeConfig>,
52}
53
54impl<T> Debug for Node<T>
55where T: TypeConfig
56{
57    fn fmt(
58        &self,
59        f: &mut std::fmt::Formatter<'_>,
60    ) -> std::fmt::Result {
61        f.debug_struct("Node").field("node_id", &self.node_id).finish()
62    }
63}
64impl<T> Node<T>
65where T: TypeConfig
66{
67    async fn connect_with_peers(
68        node_id: u32,
69        settings: Arc<RaftNodeConfig>,
70    ) -> Result<POF<T>> {
71        let mut peer_channels = T::P::create(node_id, settings.clone());
72        peer_channels.connect_with_peers(node_id).await?;
73
74        Ok(peer_channels)
75    }
76
77    /// Starts and runs the Raft node's main execution loop.
78    ///
79    /// # Workflow
80    /// 1. Establishes network connections with cluster peers
81    /// 2. Performs cluster health check
82    /// 3. Marks node as ready for operation
83    /// 4. Joins the Raft cluster
84    /// 5. Executes the core Raft event processing loop
85    ///
86    /// # Errors
87    /// Returns `Err` if any of these operations fail:
88    /// - Peer connection establishment
89    /// - Cluster health check
90    /// - Raft core initialization
91    /// - Event processing failures
92    ///
93    /// # Example
94    /// ```ignore
95    /// let node = Node::new(...);
96    /// tokio::spawn(async move {
97    ///     node.run().await.expect("Node execution failed");
98    /// });
99    /// ```
100    pub async fn run(&self) -> Result<()> {
101        // 1. Connect with other peers
102        let peer_channels = Self::connect_with_peers(self.node_id, self.settings.clone()).await?;
103
104        // 2. Healthcheck if all server is start serving
105        peer_channels.check_cluster_is_ready().await?;
106
107        // 3. Set node is ready to run Raft protocol
108        self.set_ready(true);
109
110        let mut raft = self.raft_core.lock().await;
111
112        // 4. Join the node with cluster
113        raft.join_cluster(Arc::new(peer_channels))?;
114
115        // 5. Run the main event processing loop
116        raft.run().await?;
117
118        Ok(())
119    }
120
121    /// Controls the node's operational readiness state.
122    ///
123    /// # Parameters
124    /// - `is_ready`: When `true`, marks node as ready to participate in cluster. When `false`,
125    ///   marks node as temporarily unavailable.
126    ///
127    /// # Usage
128    /// Typically used during cluster bootstrap or maintenance operations.
129    /// The readiness state is atomically updated using SeqCst ordering.
130    pub fn set_ready(
131        &self,
132        is_ready: bool,
133    ) {
134        self.ready.store(is_ready, Ordering::SeqCst);
135    }
136
137    /// Checks if the node is in a ready state to participate in cluster operations.
138    ///
139    /// # Returns
140    /// `true` if the node is operational and ready to handle Raft protocol operations,
141    /// `false` otherwise.
142    pub fn server_is_ready(&self) -> bool {
143        self.ready.load(Ordering::Acquire)
144    }
145}