1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
use d_engine_proto::common::MembershipChange;
use d_engine_proto::common::NodeStatus;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::NodeMeta;
#[cfg(any(test, feature = "__test_support"))]
use mockall::automock;
use tonic::async_trait;
use tonic::transport::Channel;
use tracing::warn;
use crate::MembershipError;
use crate::Result;
use crate::TypeConfig;
/// Connection classes used by `RpcPeerChannels` to segregate peer traffic.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum ConnectionType {
    /// Latency-critical control traffic such as elections and heartbeats.
    Control,
    /// Log-replication traffic.
    Data,
    /// High-volume transfers such as snapshot streaming.
    Bulk,
}

impl ConnectionType {
    /// Enumerates every connection class, in declaration order.
    pub fn all() -> Vec<ConnectionType> {
        [Self::Control, Self::Data, Self::Bulk].to_vec()
    }
}
/// Cluster membership management for a Raft node.
///
/// Tracks the set of known peers, their lifecycle status ([`NodeStatus`]),
/// the cluster configuration version, and the per-peer gRPC [`Channel`]s
/// used to reach them.
#[cfg_attr(any(test, feature = "__test_support"), automock)]
#[async_trait]
pub trait Membership<T>: Sync + Send + 'static
where
    T: TypeConfig,
{
    /// All nodes (including itself)
    #[allow(unused)]
    async fn members(&self) -> Vec<NodeMeta>;

    /// All non-self nodes (including Syncing and Active)
    ///
    /// Note:
    /// A joining node has not started its Raft event processing engine yet.
    async fn replication_peers(&self) -> Vec<NodeMeta>;

    /// All non-self nodes in Active state
    async fn voters(&self) -> Vec<NodeMeta>;

    /// Get the initial cluster size from configuration
    ///
    /// This value is determined at node startup from the `initial_cluster` configuration
    /// and remains constant throughout the node's lifetime. It represents the designed
    /// cluster size, not the current runtime membership state.
    ///
    /// Used for:
    /// - Quorum calculations
    /// - Cluster topology decisions
    ///
    /// # Safety
    /// This method is safe to use for cluster topology decisions as it's based on
    /// immutable configuration rather than runtime state that could be affected by
    /// network partitions or bugs.
    async fn initial_cluster_size(&self) -> usize;

    /// Check if this is a single-node cluster
    ///
    /// Returns `true` if the initial cluster size is 1, indicating this node
    /// was configured to run in standalone mode without any peers.
    ///
    /// This is a convenience method equivalent to `initial_cluster_size() == 1`.
    ///
    /// # Use Cases
    /// - Skip Raft election in single-node mode (no peers to vote)
    /// - Skip log replication in single-node mode (no peers to replicate to)
    /// - Optimize performance by avoiding unnecessary network operations
    ///
    /// # Safety
    /// Safe for all cluster topology decisions as it's based on immutable configuration.
    async fn is_single_node_cluster(&self) -> bool {
        self.initial_cluster_size().await == 1
    }

    /// All nodes currently in the given `status`.
    #[allow(unused)]
    async fn nodes_with_status(
        &self,
        status: NodeStatus,
    ) -> Vec<NodeMeta>;

    /// Current status of `node_id`, or `None` if the node is unknown.
    async fn get_node_status(
        &self,
        node_id: u32,
    ) -> Option<NodeStatus>;

    /// Verify the cluster is ready to serve requests; returns `Err` otherwise.
    async fn check_cluster_is_ready(&self) -> Result<()>;

    /// IDs of the peers for which `condition` returns `true`.
    ///
    /// NOTE(review): the `i32` passed to `condition` appears to be a
    /// status/role code — confirm against the implementation.
    async fn get_peers_id_with_condition<F>(
        &self,
        condition: F,
    ) -> Vec<u32>
    where
        F: Fn(i32) -> bool + Send + Sync + 'static;

    /// retrieve latest cluster membership with current leader ID
    ///
    /// # Parameters
    /// - `current_leader_id`: Optional current leader ID from runtime state
    async fn retrieve_cluster_membership_config(
        &self,
        current_leader_id: Option<u32>,
    ) -> ClusterMembership;

    /// Invoked when receiving a configuration-change request from the Leader.
    async fn update_cluster_conf_from_leader(
        &self,
        my_id: u32,
        my_current_term: u64,
        current_conf_version: u64,
        current_leader_id: Option<u32>,
        cluster_conf_change_req: &ClusterConfChangeRequest,
    ) -> Result<ClusterConfUpdateResponse>;

    /// Current cluster configuration version.
    async fn get_cluster_conf_version(&self) -> u64;

    /// Overwrite the cluster configuration version with `version`.
    async fn update_conf_version(
        &self,
        version: u64,
    );

    /// Increment the cluster configuration version.
    #[allow(unused)]
    async fn incr_conf_version(&self);

    /// Add a new node as a learner
    async fn add_learner(
        &self,
        node_id: u32,
        address: String,
        status: NodeStatus,
    ) -> Result<()>;

    /// Activate node (mark `new_node_id` as an active member).
    #[allow(unused)]
    async fn activate_node(
        &mut self,
        new_node_id: u32,
    ) -> Result<()>;

    /// Update status of a node
    async fn update_node_status(
        &self,
        node_id: u32,
        status: NodeStatus,
    ) -> Result<()>;

    /// Check if the node already exists
    async fn contains_node(
        &self,
        node_id: u32,
    ) -> bool;

    /// Metadata for `node_id`, or `None` if the node is unknown.
    async fn retrieve_node_meta(
        &self,
        node_id: u32,
    ) -> Option<NodeMeta>;

    /// Gracefully remove a node from the cluster.
    async fn remove_node(
        &self,
        node_id: u32,
    ) -> Result<()>;

    /// Forcefully remove faulty nodes
    #[allow(unused)]
    async fn force_remove_node(
        &self,
        node_id: u32,
    ) -> Result<()>;

    /// Get all node status
    #[allow(unused)]
    async fn get_all_nodes(&self) -> Vec<NodeMeta>;

    /// Pre-warms connection cache for all replication peers
    async fn pre_warm_connections(&self) -> Result<()>;

    /// Cached gRPC channel to `node_id` for the given connection class,
    /// or `None` if no channel is available.
    async fn get_peer_channel(
        &self,
        node_id: u32,
        conn_type: ConnectionType,
    ) -> Option<Channel>;

    /// Network address of `node_id`, if known.
    async fn get_address(
        &self,
        node_id: u32,
    ) -> Option<String>;

    /// Apply committed configuration change
    async fn apply_config_change(
        &self,
        change: MembershipChange,
    ) -> Result<()>;

    /// Notify that the configuration change at `index` has been applied.
    /// NOTE(review): `index` is presumably a Raft log index — confirm.
    async fn notify_config_applied(
        &self,
        index: u64,
    );

    /// IDs of nodes suspected to be unresponsive ("zombies").
    /// Exact detection criteria are defined by the implementation.
    async fn get_zombie_candidates(&self) -> Vec<u32>;

    /// If new node could rejoin the cluster again; returns `Err` with the
    /// refusal reason otherwise.
    async fn can_rejoin(
        &self,
        node_id: u32,
        role: i32,
    ) -> Result<()>;
}
/// Guard for voter-join requests: admit the node only when the resulting
/// cluster keeps an odd number of voters.
///
/// Odd voter counts are the standard consensus deployment shape: an even
/// count raises the quorum size without improving fault tolerance and makes
/// split votes possible.
///
/// # Parameters
/// - `node_id`: id of the node attempting to join (used for metrics only)
/// - `current_voters`: number of current voters, excluding the leader
///
/// # Errors
/// Returns `MembershipError::JoinClusterError` when admitting the node would
/// leave the cluster with an even number of voters.
pub fn ensure_safe_join(
    node_id: u32,
    current_voters: usize,
) -> Result<()> {
    // Voters today, including the leader itself.
    let total_voters = current_voters + 1;
    // Voters once the candidate has been admitted.
    let voters_after_join = total_voters + 1;
    // BUG FIX: the previous check admitted joins that produced an EVEN voter
    // count, contradicting both the error message below and the odd-quorum
    // invariant (e.g. growing 2 -> 3 voters was rejected while 1 -> 2 was
    // allowed). Admit the join only when the post-join count is odd.
    if !voters_after_join.is_multiple_of(2) {
        Ok(())
    } else {
        metrics::counter!(
            "cluster.unsafe_join_attempts",
            &[("node_id", node_id.to_string())]
        )
        .increment(1);
        warn!(
            "Unsafe join attempt: current_voters={} (total_voters={})",
            current_voters, total_voters
        );
        Err(
            MembershipError::JoinClusterError("Cluster must maintain odd number of voters".into())
                .into(),
        )
    }
}