1mod membership_guard;
2mod raft_membership;
3pub(crate) use membership_guard::*;
4pub use raft_membership::*;
5
6#[cfg(test)]
7mod membership_guard_test;
8#[cfg(test)]
9mod raft_membership_test;
10
11#[cfg(test)]
12use mockall::automock;
13use tonic::async_trait;
14use tonic::transport::Channel;
15
16use crate::proto::cluster::cluster_conf_update_response::ErrorCode;
17use crate::proto::cluster::ClusterConfChangeRequest;
18use crate::proto::cluster::ClusterConfUpdateResponse;
19use crate::proto::cluster::ClusterMembership;
20use crate::proto::common::NodeStatus;
21use crate::Result;
22use crate::TypeConfig;
23
/// Kind of connection requested for a peer node.
///
/// Passed as `conn_type` to `Membership::get_peer_channel` to select which
/// channel is returned for a given node.
///
/// NOTE(review): the traffic carried by each kind is not visible in this
/// module; the variant descriptions below are inferred from the names only.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum ConnectionType {
    /// Presumably control-plane traffic — TODO confirm.
    Control,
    /// Presumably regular data traffic — TODO confirm.
    Data,
    /// Presumably large/bulk transfers — TODO confirm.
    Bulk,
}
31impl ConnectionType {
32 pub(crate) fn all() -> Vec<ConnectionType> {
33 vec![
34 ConnectionType::Control,
35 ConnectionType::Data,
36 ConnectionType::Bulk,
37 ]
38 }
39}
40
41#[cfg_attr(test, automock)]
42#[async_trait]
43pub trait Membership<T>: Sync + Send + 'static
44where
45 T: TypeConfig,
46{
47 #[allow(unused)]
49 async fn members(&self) -> Vec<crate::proto::cluster::NodeMeta>;
50
51 async fn replication_peers(&self) -> Vec<crate::proto::cluster::NodeMeta>;
55
56 async fn voters(&self) -> Vec<crate::proto::cluster::NodeMeta>;
58
59 #[allow(unused)]
61 async fn nodes_with_status(
62 &self,
63 status: NodeStatus,
64 ) -> Vec<crate::proto::cluster::NodeMeta>;
65
66 async fn get_node_status(
67 &self,
68 node_id: u32,
69 ) -> Option<NodeStatus>;
70
71 async fn check_cluster_is_ready(&self) -> Result<()>;
72
73 async fn get_peers_id_with_condition<F>(
74 &self,
75 condition: F,
76 ) -> Vec<u32>
77 where
78 F: Fn(i32) -> bool + Send + Sync + 'static;
79
80 async fn mark_leader_id(
81 &self,
82 leader_id: u32,
83 ) -> Result<()>;
84
85 async fn current_leader_id(&self) -> Option<u32>;
86
87 async fn reset_leader(&self) -> Result<()>;
89
90 async fn update_node_role(
92 &self,
93 node_id: u32,
94 new_role: i32,
95 ) -> Result<()>;
96
97 async fn retrieve_cluster_membership_config(&self) -> ClusterMembership;
99
100 async fn update_cluster_conf_from_leader(
102 &self,
103 my_id: u32,
104 my_current_term: u64,
105 current_conf_version: u64,
106 current_leader_id: Option<u32>,
107 cluster_conf_change_req: &ClusterConfChangeRequest,
108 ) -> Result<ClusterConfUpdateResponse>;
109
110 async fn get_cluster_conf_version(&self) -> u64;
111
112 async fn update_conf_version(
113 &self,
114 version: u64,
115 );
116
117 #[allow(unused)]
118 async fn incr_conf_version(&self);
119
120 async fn add_learner(
122 &self,
123 node_id: u32,
124 address: String,
125 ) -> Result<()>;
126
127 #[allow(unused)]
129 async fn activate_node(
130 &mut self,
131 new_node_id: u32,
132 ) -> Result<()>;
133
134 async fn update_node_status(
136 &self,
137 node_id: u32,
138 status: NodeStatus,
139 ) -> Result<()>;
140
141 async fn contains_node(
143 &self,
144 node_id: u32,
145 ) -> bool;
146
147 async fn retrieve_node_meta(
148 &self,
149 node_id: u32,
150 ) -> Option<crate::proto::cluster::NodeMeta>;
151
152 async fn remove_node(
154 &self,
155 node_id: u32,
156 ) -> Result<()>;
157
158 #[allow(unused)]
160 async fn force_remove_node(
161 &self,
162 node_id: u32,
163 ) -> Result<()>;
164
165 #[allow(unused)]
167 async fn get_all_nodes(&self) -> Vec<crate::proto::cluster::NodeMeta>;
168
169 async fn pre_warm_connections(&self) -> Result<()>;
171
172 async fn get_peer_channel(
173 &self,
174 node_id: u32,
175 conn_type: ConnectionType,
176 ) -> Option<Channel>;
177
178 async fn get_address(
179 &self,
180 node_id: u32,
181 ) -> Option<String>;
182
183 async fn apply_config_change(
185 &self,
186 change: crate::proto::common::MembershipChange,
187 ) -> Result<()>;
188
189 async fn notify_config_applied(
190 &self,
191 index: u64,
192 );
193
194 async fn get_zombie_candidates(&self) -> Vec<u32>;
195
196 async fn can_rejoin(
198 &self,
199 node_id: u32,
200 role: i32,
201 ) -> Result<()>;
202}
203
204impl ClusterConfUpdateResponse {
205 pub(crate) fn success(
207 node_id: u32,
208 term: u64,
209 version: u64,
210 ) -> Self {
211 Self {
212 id: node_id,
213 term,
214 version,
215 success: true,
216 error_code: ErrorCode::None.into(),
217 }
218 }
219
220 pub(crate) fn higher_term(
222 node_id: u32,
223 term: u64,
224 version: u64,
225 ) -> Self {
226 Self {
227 id: node_id,
228 term,
229 version,
230 success: false,
231 error_code: ErrorCode::TermOutdated.into(),
232 }
233 }
234
235 pub(crate) fn not_leader(
237 node_id: u32,
238 term: u64,
239 version: u64,
240 ) -> Self {
241 Self {
242 id: node_id,
243 term,
244 version,
245 success: false,
246 error_code: ErrorCode::NotLeader.into(),
247 }
248 }
249
250 pub(crate) fn version_conflict(
252 node_id: u32,
253 term: u64,
254 version: u64,
255 ) -> Self {
256 Self {
257 id: node_id,
258 term,
259 version,
260 success: false,
261 error_code: ErrorCode::VersionConflict.into(),
262 }
263 }
264
265 #[allow(unused)]
267 pub(crate) fn invalid_change(
268 node_id: u32,
269 term: u64,
270 version: u64,
271 ) -> Self {
272 Self {
273 id: node_id,
274 term,
275 version,
276 success: false,
277 error_code: ErrorCode::InvalidChange.into(),
278 }
279 }
280
281 pub(crate) fn internal_error(
283 node_id: u32,
284 term: u64,
285 version: u64,
286 ) -> Self {
287 Self {
288 id: node_id,
289 term,
290 version,
291 success: false,
292 error_code: ErrorCode::InternalError.into(),
293 }
294 }
295
296 #[allow(unused)]
297 pub(crate) fn is_higher_term(&self) -> bool {
298 self.error_code == <ErrorCode as Into<i32>>::into(ErrorCode::TermOutdated)
299 }
300}
301
302impl NodeStatus {
303 pub fn is_promotable(&self) -> bool {
304 matches!(self, NodeStatus::Syncing)
305 }
306
307 pub fn is_i32_promotable(value: i32) -> bool {
308 matches!(value, v if v == (NodeStatus::Syncing as i32))
309 }
310}