d_engine/node/node.rs
1//! Raft node container and lifecycle management.
2//!
3//! The [`Node`] struct acts as a host for a Raft consensus participant,
4//! coordinating between the core protocol implementation ([`crate::core::raft::Raft`])
5//! and external subsystems:
6//!
7//! ## Key Responsibilities
8//! - Manages the Raft finite state machine lifecycle
9//! - Coordinates peer networking through [`PeerChannels`]
10//! - Maintains node readiness state for cluster coordination
11//! - Executes the main event processing loop inside Raft
12//!
13//! ## Example Usage
14//! ```ignore
15//! let node = NodeBuilder::new(settings).build().ready().unwrap();
16//! tokio::spawn(async move {
17//! node.run().await.expect("Raft node execution failed");
18//! });
19//! ```
20
21use std::fmt::Debug;
22use std::sync::atomic::AtomicBool;
23use std::sync::atomic::Ordering;
24use std::sync::Arc;
25
26use tokio::sync::mpsc;
27use tokio::sync::Mutex;
28
29use crate::alias::POF;
30use crate::membership::PeerChannelsFactory;
31use crate::PeerChannels;
32use crate::Raft;
33use crate::RaftEvent;
34use crate::RaftNodeConfig;
35use crate::Result;
36use crate::TypeConfig;
37
38/// Raft node container
39pub struct Node<T>
40where T: TypeConfig
41{
42 pub(crate) node_id: u32,
43 pub(crate) raft_core: Arc<Mutex<Raft<T>>>,
44
45 // Network & Storage events, (copied from Raft)
46 // TODO: find a better solution
47 pub(crate) event_tx: mpsc::Sender<RaftEvent>,
48 pub(crate) ready: AtomicBool,
49
50 /// Raft node config
51 pub settings: Arc<RaftNodeConfig>,
52}
53
54impl<T> Debug for Node<T>
55where T: TypeConfig
56{
57 fn fmt(
58 &self,
59 f: &mut std::fmt::Formatter<'_>,
60 ) -> std::fmt::Result {
61 f.debug_struct("Node").field("node_id", &self.node_id).finish()
62 }
63}
64impl<T> Node<T>
65where T: TypeConfig
66{
67 async fn connect_with_peers(
68 node_id: u32,
69 settings: Arc<RaftNodeConfig>,
70 ) -> Result<POF<T>> {
71 let mut peer_channels = T::P::create(node_id, settings.clone());
72 peer_channels.connect_with_peers(node_id).await?;
73
74 Ok(peer_channels)
75 }
76
77 /// Starts and runs the Raft node's main execution loop.
78 ///
79 /// # Workflow
80 /// 1. Establishes network connections with cluster peers
81 /// 2. Performs cluster health check
82 /// 3. Marks node as ready for operation
83 /// 4. Joins the Raft cluster
84 /// 5. Executes the core Raft event processing loop
85 ///
86 /// # Errors
87 /// Returns `Err` if any of these operations fail:
88 /// - Peer connection establishment
89 /// - Cluster health check
90 /// - Raft core initialization
91 /// - Event processing failures
92 ///
93 /// # Example
94 /// ```ignore
95 /// let node = Node::new(...);
96 /// tokio::spawn(async move {
97 /// node.run().await.expect("Node execution failed");
98 /// });
99 /// ```
100 pub async fn run(&self) -> Result<()> {
101 // 1. Connect with other peers
102 let peer_channels = Self::connect_with_peers(self.node_id, self.settings.clone()).await?;
103
104 // 2. Healthcheck if all server is start serving
105 peer_channels.check_cluster_is_ready().await?;
106
107 // 3. Set node is ready to run Raft protocol
108 self.set_ready(true);
109
110 let mut raft = self.raft_core.lock().await;
111
112 // 4. Join the node with cluster
113 raft.join_cluster(Arc::new(peer_channels))?;
114
115 // 5. Run the main event processing loop
116 raft.run().await?;
117
118 Ok(())
119 }
120
121 /// Controls the node's operational readiness state.
122 ///
123 /// # Parameters
124 /// - `is_ready`: When `true`, marks node as ready to participate in cluster. When `false`,
125 /// marks node as temporarily unavailable.
126 ///
127 /// # Usage
128 /// Typically used during cluster bootstrap or maintenance operations.
129 /// The readiness state is atomically updated using SeqCst ordering.
130 pub fn set_ready(
131 &self,
132 is_ready: bool,
133 ) {
134 self.ready.store(is_ready, Ordering::SeqCst);
135 }
136
137 /// Checks if the node is in a ready state to participate in cluster operations.
138 ///
139 /// # Returns
140 /// `true` if the node is operational and ready to handle Raft protocol operations,
141 /// `false` otherwise.
142 pub fn server_is_ready(&self) -> bool {
143 self.ready.load(Ordering::Acquire)
144 }
145}