// d_engine/membership/mod.rs

1mod membership_guard;
2mod raft_membership;
3pub(crate) use membership_guard::*;
4pub use raft_membership::*;
5
6#[cfg(test)]
7mod membership_guard_test;
8#[cfg(test)]
9mod raft_membership_test;
10
11#[cfg(test)]
12use mockall::automock;
13use tonic::async_trait;
14use tonic::transport::Channel;
15
16use crate::proto::cluster::cluster_conf_update_response::ErrorCode;
17use crate::proto::cluster::ClusterConfChangeRequest;
18use crate::proto::cluster::ClusterConfUpdateResponse;
19use crate::proto::cluster::ClusterMembership;
20use crate::proto::common::NodeStatus;
21use crate::Result;
22use crate::TypeConfig;
23
/// Traffic class of a connection to a peer, used by the peer-channel
/// management (`RpcPeerChannels`) to pick a channel per kind of traffic.
///
/// This type is `pub` (not `pub(crate)`) because it appears in the signature of
/// the public `Membership::get_peer_channel` trait method. A crate-private type
/// in that signature is a private-in-public interface, and it prevents the trait
/// from being implemented or mocked outside this crate.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum ConnectionType {
    /// Used for key operations such as elections and heartbeats.
    Control,
    /// Used for log replication.
    Data,
    /// Used for high-traffic operations such as snapshot transmission.
    Bulk,
}

impl ConnectionType {
    /// Returns every connection type, in declaration order
    /// (`Control`, `Data`, `Bulk`).
    pub(crate) fn all() -> Vec<ConnectionType> {
        vec![
            ConnectionType::Control,
            ConnectionType::Data,
            ConnectionType::Bulk,
        ]
    }
}
40
41#[cfg_attr(test, automock)]
42#[async_trait]
43pub trait Membership<T>: Sync + Send + 'static
44where
45    T: TypeConfig,
46{
47    /// All nodes (including itself)
48    #[allow(unused)]
49    async fn members(&self) -> Vec<crate::proto::cluster::NodeMeta>;
50
51    /// All non-self nodes (including Syncing and Active)
52    /// Note:
53    /// Joining node has not start its Raft event processing engine yet.
54    async fn replication_peers(&self) -> Vec<crate::proto::cluster::NodeMeta>;
55
56    /// All non-self nodes in Active state
57    async fn voters(&self) -> Vec<crate::proto::cluster::NodeMeta>;
58
59    /// All pending active nodes in Active state
60    #[allow(unused)]
61    async fn nodes_with_status(
62        &self,
63        status: NodeStatus,
64    ) -> Vec<crate::proto::cluster::NodeMeta>;
65
66    async fn get_node_status(
67        &self,
68        node_id: u32,
69    ) -> Option<NodeStatus>;
70
71    async fn check_cluster_is_ready(&self) -> Result<()>;
72
73    async fn get_peers_id_with_condition<F>(
74        &self,
75        condition: F,
76    ) -> Vec<u32>
77    where
78        F: Fn(i32) -> bool + Send + Sync + 'static;
79
80    async fn mark_leader_id(
81        &self,
82        leader_id: u32,
83    ) -> Result<()>;
84
85    async fn current_leader_id(&self) -> Option<u32>;
86
87    /// Reset old leader to follower
88    async fn reset_leader(&self) -> Result<()>;
89
90    /// If node role not found return Error
91    async fn update_node_role(
92        &self,
93        node_id: u32,
94        new_role: i32,
95    ) -> Result<()>;
96
97    /// retrieve latest cluster membership
98    async fn retrieve_cluster_membership_config(&self) -> ClusterMembership;
99
100    /// invoked when receive requests from Leader
101    async fn update_cluster_conf_from_leader(
102        &self,
103        my_id: u32,
104        my_current_term: u64,
105        current_conf_version: u64,
106        current_leader_id: Option<u32>,
107        cluster_conf_change_req: &ClusterConfChangeRequest,
108    ) -> Result<ClusterConfUpdateResponse>;
109
110    async fn get_cluster_conf_version(&self) -> u64;
111
112    async fn update_conf_version(
113        &self,
114        version: u64,
115    );
116
117    #[allow(unused)]
118    async fn incr_conf_version(&self);
119
120    /// Add a new node as a learner
121    async fn add_learner(
122        &self,
123        node_id: u32,
124        address: String,
125    ) -> Result<()>;
126
127    /// Activate node
128    #[allow(unused)]
129    async fn activate_node(
130        &mut self,
131        new_node_id: u32,
132    ) -> Result<()>;
133
134    /// Update status of a node
135    async fn update_node_status(
136        &self,
137        node_id: u32,
138        status: NodeStatus,
139    ) -> Result<()>;
140
141    /// Check if the node already exists
142    async fn contains_node(
143        &self,
144        node_id: u32,
145    ) -> bool;
146
147    async fn retrieve_node_meta(
148        &self,
149        node_id: u32,
150    ) -> Option<crate::proto::cluster::NodeMeta>;
151
152    /// Elegantly remove nodes
153    async fn remove_node(
154        &self,
155        node_id: u32,
156    ) -> Result<()>;
157
158    /// Forcefully remove faulty nodes
159    #[allow(unused)]
160    async fn force_remove_node(
161        &self,
162        node_id: u32,
163    ) -> Result<()>;
164
165    /// Get all node status
166    #[allow(unused)]
167    async fn get_all_nodes(&self) -> Vec<crate::proto::cluster::NodeMeta>;
168
169    /// Pre-warms connection cache for all replication peers
170    async fn pre_warm_connections(&self) -> Result<()>;
171
172    async fn get_peer_channel(
173        &self,
174        node_id: u32,
175        conn_type: ConnectionType,
176    ) -> Option<Channel>;
177
178    async fn get_address(
179        &self,
180        node_id: u32,
181    ) -> Option<String>;
182
183    /// Apply committed configuration change
184    async fn apply_config_change(
185        &self,
186        change: crate::proto::common::MembershipChange,
187    ) -> Result<()>;
188
189    async fn notify_config_applied(
190        &self,
191        index: u64,
192    );
193
194    async fn get_zombie_candidates(&self) -> Vec<u32>;
195
196    /// If new node could rejoin the cluster again
197    async fn can_rejoin(
198        &self,
199        node_id: u32,
200        role: i32,
201    ) -> Result<()>;
202}
203
204impl ClusterConfUpdateResponse {
205    /// Generate a successful response (full success)
206    pub(crate) fn success(
207        node_id: u32,
208        term: u64,
209        version: u64,
210    ) -> Self {
211        Self {
212            id: node_id,
213            term,
214            version,
215            success: true,
216            error_code: ErrorCode::None.into(),
217        }
218    }
219
220    /// Generate a failed response (Stale leader term)
221    pub(crate) fn higher_term(
222        node_id: u32,
223        term: u64,
224        version: u64,
225    ) -> Self {
226        Self {
227            id: node_id,
228            term,
229            version,
230            success: false,
231            error_code: ErrorCode::TermOutdated.into(),
232        }
233    }
234
235    /// Generate a failed response (Request sent to non-leader or an out-dated leader)
236    pub(crate) fn not_leader(
237        node_id: u32,
238        term: u64,
239        version: u64,
240    ) -> Self {
241        Self {
242            id: node_id,
243            term,
244            version,
245            success: false,
246            error_code: ErrorCode::NotLeader.into(),
247        }
248    }
249
250    /// Generate a failed response (Stale configuration version)
251    pub(crate) fn version_conflict(
252        node_id: u32,
253        term: u64,
254        version: u64,
255    ) -> Self {
256        Self {
257            id: node_id,
258            term,
259            version,
260            success: false,
261            error_code: ErrorCode::VersionConflict.into(),
262        }
263    }
264
265    /// Generate a failed response (Malformed change request)
266    #[allow(unused)]
267    pub(crate) fn invalid_change(
268        node_id: u32,
269        term: u64,
270        version: u64,
271    ) -> Self {
272        Self {
273            id: node_id,
274            term,
275            version,
276            success: false,
277            error_code: ErrorCode::InvalidChange.into(),
278        }
279    }
280
281    /// Generate a failed response (Server-side processing error)
282    pub(crate) fn internal_error(
283        node_id: u32,
284        term: u64,
285        version: u64,
286    ) -> Self {
287        Self {
288            id: node_id,
289            term,
290            version,
291            success: false,
292            error_code: ErrorCode::InternalError.into(),
293        }
294    }
295
296    #[allow(unused)]
297    pub(crate) fn is_higher_term(&self) -> bool {
298        self.error_code == <ErrorCode as Into<i32>>::into(ErrorCode::TermOutdated)
299    }
300}
301
302impl NodeStatus {
303    pub fn is_promotable(&self) -> bool {
304        matches!(self, NodeStatus::Syncing)
305    }
306
307    pub fn is_i32_promotable(value: i32) -> bool {
308        matches!(value, v if v == (NodeStatus::Syncing as i32))
309    }
310}