d_engine_core/
membership.rs1use d_engine_proto::common::MembershipChange;
2use d_engine_proto::common::NodeStatus;
3use d_engine_proto::server::cluster::ClusterConfChangeRequest;
4use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
5use d_engine_proto::server::cluster::ClusterMembership;
6use d_engine_proto::server::cluster::NodeMeta;
7#[cfg(any(test, feature = "__test_support"))]
8use mockall::automock;
9use tonic::async_trait;
10use tonic::transport::Channel;
11use tracing::warn;
12
13use crate::MembershipError;
14use crate::Result;
15use crate::TypeConfig;
16
/// Category of connection that can be requested towards a peer node.
///
/// A value of this type is passed to `Membership::get_peer_channel` to select
/// which channel to hand out for a given node. What traffic each category
/// carries is decided by the implementation.
// NOTE(review): presumably control-plane, regular data and bulk-transfer
// traffic respectively — confirm against the connection pool implementation.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum ConnectionType {
    /// Connection category named "control".
    Control,
    /// Connection category named "data".
    Data,
    /// Connection category named "bulk".
    Bulk,
}
24impl ConnectionType {
25 pub fn all() -> Vec<ConnectionType> {
26 vec![
27 ConnectionType::Control,
28 ConnectionType::Data,
29 ConnectionType::Bulk,
30 ]
31 }
32}
33
34#[cfg_attr(any(test, feature = "__test_support"), automock)]
35#[async_trait]
36pub trait Membership<T>: Sync + Send + 'static
37where
38 T: TypeConfig,
39{
40 #[allow(unused)]
42 async fn members(&self) -> Vec<NodeMeta>;
43
44 async fn replication_peers(&self) -> Vec<NodeMeta>;
48
49 async fn voters(&self) -> Vec<NodeMeta>;
51
52 async fn initial_cluster_size(&self) -> usize;
67
68 async fn is_single_node_cluster(&self) -> bool {
83 self.initial_cluster_size().await == 1
84 }
85
86 #[allow(unused)]
88 async fn nodes_with_status(
89 &self,
90 status: NodeStatus,
91 ) -> Vec<NodeMeta>;
92
93 async fn get_node_status(
94 &self,
95 node_id: u32,
96 ) -> Option<NodeStatus>;
97
98 async fn check_cluster_is_ready(&self) -> Result<()>;
99
100 async fn get_peers_id_with_condition<F>(
101 &self,
102 condition: F,
103 ) -> Vec<u32>
104 where
105 F: Fn(i32) -> bool + Send + Sync + 'static;
106
107 async fn retrieve_cluster_membership_config(
112 &self,
113 current_leader_id: Option<u32>,
114 ) -> ClusterMembership;
115
116 async fn update_cluster_conf_from_leader(
118 &self,
119 my_id: u32,
120 my_current_term: u64,
121 current_conf_version: u64,
122 current_leader_id: Option<u32>,
123 cluster_conf_change_req: &ClusterConfChangeRequest,
124 ) -> Result<ClusterConfUpdateResponse>;
125
126 async fn get_cluster_conf_version(&self) -> u64;
127
128 async fn update_conf_version(
129 &self,
130 version: u64,
131 );
132
133 #[allow(unused)]
134 async fn incr_conf_version(&self);
135
136 async fn add_learner(
138 &self,
139 node_id: u32,
140 address: String,
141 status: NodeStatus,
142 ) -> Result<()>;
143
144 #[allow(unused)]
146 async fn activate_node(
147 &mut self,
148 new_node_id: u32,
149 ) -> Result<()>;
150
151 async fn update_node_status(
153 &self,
154 node_id: u32,
155 status: NodeStatus,
156 ) -> Result<()>;
157
158 async fn contains_node(
160 &self,
161 node_id: u32,
162 ) -> bool;
163
164 async fn retrieve_node_meta(
165 &self,
166 node_id: u32,
167 ) -> Option<NodeMeta>;
168
169 async fn remove_node(
171 &self,
172 node_id: u32,
173 ) -> Result<()>;
174
175 #[allow(unused)]
177 async fn force_remove_node(
178 &self,
179 node_id: u32,
180 ) -> Result<()>;
181
182 #[allow(unused)]
184 async fn get_all_nodes(&self) -> Vec<NodeMeta>;
185
186 async fn pre_warm_connections(&self) -> Result<()>;
188
189 async fn get_peer_channel(
190 &self,
191 node_id: u32,
192 conn_type: ConnectionType,
193 ) -> Option<Channel>;
194
195 async fn get_address(
196 &self,
197 node_id: u32,
198 ) -> Option<String>;
199
200 async fn apply_config_change(
202 &self,
203 change: MembershipChange,
204 ) -> Result<()>;
205
206 async fn notify_config_applied(
207 &self,
208 index: u64,
209 );
210
211 async fn get_zombie_candidates(&self) -> Vec<u32>;
212
213 async fn can_rejoin(
215 &self,
216 node_id: u32,
217 role: i32,
218 ) -> Result<()>;
219}
220
221pub fn ensure_safe_join(
222 node_id: u32,
223 current_voters: usize,
224) -> Result<()> {
225 let total_voters = current_voters + 1;
227
228 if (total_voters + 1).is_multiple_of(2) {
230 Ok(())
231 } else {
232 metrics::counter!(
234 "cluster.unsafe_join_attempts",
235 &[("node_id", node_id.to_string())]
236 )
237 .increment(1);
238
239 warn!(
240 "Unsafe join attempt: current_voters={} (total_voters={})",
241 current_voters, total_voters
242 );
243 Err(
244 MembershipError::JoinClusterError("Cluster must maintain odd number of voters".into())
245 .into(),
246 )
247 }
248}