d_engine_core/membership.rs

1use d_engine_proto::common::MembershipChange;
2use d_engine_proto::common::NodeStatus;
3use d_engine_proto::server::cluster::ClusterConfChangeRequest;
4use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
5use d_engine_proto::server::cluster::ClusterMembership;
6use d_engine_proto::server::cluster::NodeMeta;
7#[cfg(any(test, feature = "__test_support"))]
8use mockall::automock;
9use tonic::async_trait;
10use tonic::transport::Channel;
11use tracing::warn;
12
13use crate::MembershipError;
14use crate::Result;
15use crate::TypeConfig;
16
/// Kind of peer connection managed in `RpcPeerChannels`.
///
/// Each variant is used for a different class of traffic (see the variant docs).
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum ConnectionType {
    /// Used for key operations such as elections and heartbeats.
    Control,
    /// Used for log replication.
    Data,
    /// Used for high-traffic operations such as snapshot transmission.
    Bulk,
}

impl ConnectionType {
    /// Returns every connection type, in the order `Control`, `Data`, `Bulk`.
    pub fn all() -> Vec<ConnectionType> {
        let every_kind = [Self::Control, Self::Data, Self::Bulk];
        every_kind.to_vec()
    }
}
33
34#[cfg_attr(any(test, feature = "__test_support"), automock)]
35#[async_trait]
36pub trait Membership<T>: Sync + Send + 'static
37where
38    T: TypeConfig,
39{
40    /// All nodes (including itself)
41    #[allow(unused)]
42    async fn members(&self) -> Vec<NodeMeta>;
43
44    /// All non-self nodes (including Syncing and Active)
45    /// Note:
46    /// Joining node has not start its Raft event processing engine yet.
47    async fn replication_peers(&self) -> Vec<NodeMeta>;
48
49    /// All non-self nodes in Active state
50    async fn voters(&self) -> Vec<NodeMeta>;
51
52    /// Get the initial cluster size from configuration
53    ///
54    /// This value is determined at node startup from the `initial_cluster` configuration
55    /// and remains constant throughout the node's lifetime. It represents the designed
56    /// cluster size, not the current runtime membership state.
57    ///
58    /// Used for:
59    /// - Quorum calculations
60    /// - Cluster topology decisions
61    ///
62    /// # Safety
63    /// This method is safe to use for cluster topology decisions as it's based on
64    /// immutable configuration rather than runtime state that could be affected by
65    /// network partitions or bugs.
66    async fn initial_cluster_size(&self) -> usize;
67
68    /// Check if this is a single-node cluster
69    ///
70    /// Returns `true` if the initial cluster size is 1, indicating this node
71    /// was configured to run in standalone mode without any peers.
72    ///
73    /// This is a convenience method equivalent to `initial_cluster_size() == 1`.
74    ///
75    /// # Use Cases
76    /// - Skip Raft election in single-node mode (no peers to vote)
77    /// - Skip log replication in single-node mode (no peers to replicate to)
78    /// - Optimize performance by avoiding unnecessary network operations
79    ///
80    /// # Safety
81    /// Safe for all cluster topology decisions as it's based on immutable configuration.
82    async fn is_single_node_cluster(&self) -> bool {
83        self.initial_cluster_size().await == 1
84    }
85
86    /// All pending active nodes in Active state
87    #[allow(unused)]
88    async fn nodes_with_status(
89        &self,
90        status: NodeStatus,
91    ) -> Vec<NodeMeta>;
92
93    async fn get_node_status(
94        &self,
95        node_id: u32,
96    ) -> Option<NodeStatus>;
97
98    async fn check_cluster_is_ready(&self) -> Result<()>;
99
100    async fn get_peers_id_with_condition<F>(
101        &self,
102        condition: F,
103    ) -> Vec<u32>
104    where
105        F: Fn(i32) -> bool + Send + Sync + 'static;
106
107    /// retrieve latest cluster membership with current leader ID
108    ///
109    /// # Parameters
110    /// - `current_leader_id`: Optional current leader ID from runtime state
111    async fn retrieve_cluster_membership_config(
112        &self,
113        current_leader_id: Option<u32>,
114    ) -> ClusterMembership;
115
116    /// invoked when receive requests from Leader
117    async fn update_cluster_conf_from_leader(
118        &self,
119        my_id: u32,
120        my_current_term: u64,
121        current_conf_version: u64,
122        current_leader_id: Option<u32>,
123        cluster_conf_change_req: &ClusterConfChangeRequest,
124    ) -> Result<ClusterConfUpdateResponse>;
125
126    async fn get_cluster_conf_version(&self) -> u64;
127
128    async fn update_conf_version(
129        &self,
130        version: u64,
131    );
132
133    #[allow(unused)]
134    async fn incr_conf_version(&self);
135
136    /// Add a new node as a learner
137    async fn add_learner(
138        &self,
139        node_id: u32,
140        address: String,
141        status: NodeStatus,
142    ) -> Result<()>;
143
144    /// Activate node
145    #[allow(unused)]
146    async fn activate_node(
147        &mut self,
148        new_node_id: u32,
149    ) -> Result<()>;
150
151    /// Update status of a node
152    async fn update_node_status(
153        &self,
154        node_id: u32,
155        status: NodeStatus,
156    ) -> Result<()>;
157
158    /// Check if the node already exists
159    async fn contains_node(
160        &self,
161        node_id: u32,
162    ) -> bool;
163
164    async fn retrieve_node_meta(
165        &self,
166        node_id: u32,
167    ) -> Option<NodeMeta>;
168
169    /// Elegantly remove nodes
170    async fn remove_node(
171        &self,
172        node_id: u32,
173    ) -> Result<()>;
174
175    /// Forcefully remove faulty nodes
176    #[allow(unused)]
177    async fn force_remove_node(
178        &self,
179        node_id: u32,
180    ) -> Result<()>;
181
182    /// Get all node status
183    #[allow(unused)]
184    async fn get_all_nodes(&self) -> Vec<NodeMeta>;
185
186    /// Pre-warms connection cache for all replication peers
187    async fn pre_warm_connections(&self) -> Result<()>;
188
189    async fn get_peer_channel(
190        &self,
191        node_id: u32,
192        conn_type: ConnectionType,
193    ) -> Option<Channel>;
194
195    async fn get_address(
196        &self,
197        node_id: u32,
198    ) -> Option<String>;
199
200    /// Apply committed configuration change
201    async fn apply_config_change(
202        &self,
203        change: MembershipChange,
204    ) -> Result<()>;
205
206    async fn notify_config_applied(
207        &self,
208        index: u64,
209    );
210
211    async fn get_zombie_candidates(&self) -> Vec<u32>;
212
213    /// If new node could rejoin the cluster again
214    async fn can_rejoin(
215        &self,
216        node_id: u32,
217        role: i32,
218    ) -> Result<()>;
219}
220
221pub fn ensure_safe_join(
222    node_id: u32,
223    current_voters: usize,
224) -> Result<()> {
225    // Total voters including leader = current_voters + 1
226    let total_voters = current_voters + 1;
227
228    // Always allow if cluster will have even number of voters
229    if (total_voters + 1).is_multiple_of(2) {
230        Ok(())
231    } else {
232        // metrics::counter!("cluster.unsafe_join_attempts", 1);
233        metrics::counter!(
234            "cluster.unsafe_join_attempts",
235            &[("node_id", node_id.to_string())]
236        )
237        .increment(1);
238
239        warn!(
240            "Unsafe join attempt: current_voters={} (total_voters={})",
241            current_voters, total_voters
242        );
243        Err(
244            MembershipError::JoinClusterError("Cluster must maintain odd number of voters".into())
245                .into(),
246        )
247    }
248}