1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
use std::time::Duration;
use d_engine_core::client::ErrorCode;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::NodeMeta;
use d_engine_proto::server::cluster::cluster_management_service_client::ClusterManagementServiceClient;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::transport::Endpoint;
use tracing::debug;
use tracing::info;
use super::ClientApiError;
use crate::ClientConfig;
use crate::utils::address_str;
/// Pool of gRPC channels to the nodes of a cluster.
///
/// Holds one channel to the node reported as leader plus best-effort channels
/// to the remaining members. It also keeps the membership snapshot and the
/// bootstrap endpoints the channels were built from, so the pool can be
/// rebuilt in place via `refresh`.
#[derive(Clone)]
pub struct ConnectionPool {
    /// Channel to the leader; used for writes and linearizable reads.
    /// Tonic's `Channel` is thread-safe and reference-counted, so cloning it
    /// is cheap and shares the same underlying connection.
    pub(super) leader_conn: Channel,
    /// Channels to non-leader members that were reachable when the pool was
    /// built; members that failed to connect are omitted.
    pub(super) follower_conns: Vec<Channel>,
    /// Client settings (timeouts, keepalives, compression) used to open channels.
    pub(super) config: ClientConfig,
    /// Members listed in the most recent cluster metadata response.
    pub(super) members: Vec<NodeMeta>,
    /// Bootstrap endpoints probed for cluster metadata.
    pub(super) endpoints: Vec<String>,
    /// Leader ID reported in the most recent cluster metadata response.
    pub(super) current_leader_id: Option<u32>,
}
impl ConnectionPool {
    /// Creates a connection pool from a list of bootstrap endpoints.
    ///
    /// # Implementation Details
    /// 1. Discovers cluster metadata by probing `endpoints` until a ready leader
    ///    is found (bounded by `config.cluster_ready_timeout`)
    /// 2. Establishes the leader connection
    /// 3. Creates follower connections (best-effort)
    ///
    /// # Errors
    /// Returns an error (`ErrorCode::ClusterUnavailable`) if `endpoints` is
    /// empty or no ready, connectable leader is found before the deadline.
    pub(crate) async fn create(
        endpoints: Vec<String>,
        config: ClientConfig,
    ) -> std::result::Result<Self, ClientApiError> {
        let (leader_conn, follower_conns, members, current_leader_id) =
            Self::build_connections(&endpoints, &config).await?;
        Ok(Self {
            leader_conn,
            follower_conns,
            config,
            members,
            endpoints,
            current_leader_id,
        })
    }

    /// Refreshes cluster connections by reloading metadata and rebuilding channels.
    ///
    /// # Behavior
    /// 1. Discovers fresh cluster metadata from `new_endpoints`, or from the
    ///    previously stored endpoints when `None`
    /// 2. Re-establishes the leader connection using the stored config
    /// 3. Rebuilds the follower connection list
    ///
    /// The pool is only mutated after the rebuild succeeds. On error, the
    /// existing connections, members, leader ID and endpoints are left untouched.
    ///
    /// # Errors
    /// Propagates any error from discovery/connection (see `create`).
    // NOTE(review): the allow suggests no caller in this crate uses `refresh`
    // yet; confirm before removing it.
    #[allow(dead_code)]
    pub(crate) async fn refresh(
        &mut self,
        new_endpoints: Option<Vec<String>>,
    ) -> std::result::Result<(), ClientApiError> {
        let endpoints_to_use = new_endpoints.unwrap_or_else(|| self.endpoints.clone());
        let (leader_conn, follower_conns, members, current_leader_id) =
            Self::build_connections(&endpoints_to_use, &self.config).await?;
        // Only update endpoints and connections after a successful build
        self.endpoints = endpoints_to_use;
        self.leader_conn = leader_conn;
        self.follower_conns = follower_conns;
        self.members = members;
        self.current_leader_id = current_leader_id;
        Ok(())
    }

    /// Shared construction logic for `create` and `refresh`.
    ///
    /// Returns `(leader channel, follower channels, members, current leader id)`.
    async fn build_connections(
        endpoints: &[String],
        config: &ClientConfig,
    ) -> std::result::Result<(Channel, Vec<Channel>, Vec<NodeMeta>, Option<u32>), ClientApiError>
    {
        // 1. Load cluster metadata and establish a verified leader channel in one step.
        //    Returns only when a ready leader is confirmed AND its channel is connectable.
        let (membership, leader_conn) = Self::load_cluster_metadata(endpoints, config).await?;
        info!("Cluster members discovered: {:?}", membership.nodes);

        // 2. Parse follower addresses (the leader channel was already established above).
        let (_, follower_addrs) = Self::parse_cluster_metadata(&membership)?;

        // 3. Connect to followers concurrently. This is best-effort: followers that
        //    fail to connect are logged and skipped rather than failing the build.
        //    `join_all` preserves input order, so results zip back to their addresses.
        let follower_conns = futures::future::join_all(
            follower_addrs
                .iter()
                .map(|addr| Self::create_channel(addr.clone(), config)),
        )
        .await
        .into_iter()
        .zip(&follower_addrs)
        .filter_map(|(result, addr)| match result {
            Ok(channel) => Some(channel),
            Err(e) => {
                debug!("build_connections: follower {addr} connect failed ({e:?}), skipping");
                None
            }
        })
        .collect();

        Ok((
            leader_conn,
            follower_conns,
            membership.nodes,
            membership.current_leader_id,
        ))
    }

    /// Opens a channel to `addr` using the configured transport settings.
    ///
    /// Applies the connect/request timeouts and the TCP and HTTP/2 keepalive
    /// settings from `config`. It then connects eagerly (`Endpoint::connect`),
    /// so a returned channel has completed its initial connection.
    ///
    /// # Errors
    /// Fails if `addr` is not a valid endpoint URI or the connection cannot be
    /// established.
    pub(super) async fn create_channel(
        addr: String,
        config: &ClientConfig,
    ) -> std::result::Result<Channel, ClientApiError> {
        debug!("create_channel, addr = {:?}", &addr);
        Endpoint::try_from(addr)?
            .connect_timeout(config.connect_timeout)
            .timeout(config.request_timeout)
            .tcp_keepalive(Some(config.tcp_keepalive))
            .http2_keep_alive_interval(config.http2_keepalive_interval)
            .keep_alive_timeout(config.http2_keepalive_timeout)
            .connect()
            .await
            .map_err(Into::into)
    }

    /// Returns the leader channel.
    ///
    /// Used for all write operations and linearizable reads.
    pub(crate) fn get_leader(&self) -> Channel {
        self.leader_conn.clone()
    }

    /// Returns every channel in the pool: follower channels first, the leader last.
    pub(crate) fn get_all_channels(&self) -> Vec<Channel> {
        let mut cloned = self.follower_conns.clone();
        cloned.push(self.leader_conn.clone());
        cloned
    }

    /// Returns a copy of the member list from the last metadata load.
    pub(crate) fn get_all_members(&self) -> Vec<NodeMeta> {
        self.members.clone()
    }

    /// Returns the leader ID from the last metadata load.
    pub(crate) fn get_leader_id(&self) -> Option<u32> {
        self.current_leader_id
    }

    /// Probes a single endpoint and classifies the cluster metadata state.
    ///
    /// Returns:
    /// - `Some(Ok(membership))`: the leader is known and present in the member list (ready)
    /// - `Some(Err(()))`: the node responded but the cluster is not ready yet (no leader
    ///   ID, or the leader ID is not in the member list); the caller should try the next node
    /// - `None`: the endpoint is invalid, unreachable, or the metadata RPC failed; the
    ///   caller should try the next node
    pub(super) async fn probe_endpoint(
        addr: &str,
        config: &ClientConfig,
    ) -> Option<std::result::Result<ClusterMembership, ()>> {
        // Use a short connect timeout for probing. Dead nodes should fail fast
        // so the retry loop can move to the next endpoint without burning the
        // cluster_ready_timeout budget on TCP handshake waits.
        const MAX_PROBE_CONNECT_MS: u64 = 500;
        let probe_timeout = config.connect_timeout.min(Duration::from_millis(MAX_PROBE_CONNECT_MS));
        let channel = Endpoint::try_from(addr.to_string())
            .ok()?
            .connect_timeout(probe_timeout)
            .timeout(config.request_timeout)
            .connect()
            .await
            .ok()?;
        let mut client = ClusterManagementServiceClient::new(channel);
        if config.enable_compression {
            client = client
                .send_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Gzip);
        }
        let membership = client
            .get_cluster_metadata(tonic::Request::new(MetadataRequest {}))
            .await
            .ok()?
            .into_inner();
        // Classify: ready only if leader_id is known AND present in the member list
        let ready = membership
            .current_leader_id
            .is_some_and(|leader_id| membership.nodes.iter().any(|n| n.id == leader_id));
        if ready {
            Some(Ok(membership))
        } else {
            debug!(
                "probe_endpoint {}: cluster not ready (leader_id={:?})",
                addr, membership.current_leader_id
            );
            Some(Err(()))
        }
    }

    /// Waits until the cluster reports a reachable leader, then returns its
    /// metadata together with an established leader channel.
    ///
    /// Probe and connect are treated as a single step inside one retry loop that
    /// shares the `config.cluster_ready_timeout` budget. This prevents the TOCTOU
    /// race where the probe succeeds but the leader crashes before the channel is
    /// established: a failed leader connect just moves on to the next endpoint.
    /// Each probe is also bounded by the shared deadline, so a slow probe cannot
    /// stretch the overall wait past `cluster_ready_timeout`.
    ///
    /// Ready = `current_leader_id` is `Some`, present in the member list, AND the
    /// leader channel can be established (TCP connect succeeds).
    /// NOTE(review): whether the reported leader has also committed its no-op
    /// entry depends on when the server populates `current_leader_id`; confirm.
    ///
    /// # Errors
    /// Returns `ErrorCode::ClusterUnavailable` immediately when `endpoints` is
    /// empty, or once the deadline passes without a ready, connectable leader.
    pub(super) async fn load_cluster_metadata(
        endpoints: &[String],
        config: &ClientConfig,
    ) -> std::result::Result<(ClusterMembership, Channel), ClientApiError> {
        const RETRY_BACKOFF_MS: u64 = 200;

        // With nothing to probe, waiting out the deadline cannot change the outcome.
        if endpoints.is_empty() {
            debug!("load_cluster_metadata: no endpoints to probe");
            return Err(ErrorCode::ClusterUnavailable.into());
        }

        let deadline = tokio::time::Instant::now() + config.cluster_ready_timeout;
        loop {
            // Probe every endpoint in this round
            for addr in endpoints {
                if tokio::time::Instant::now() >= deadline {
                    return Err(ErrorCode::ClusterUnavailable.into());
                }

                // Bound the probe by the shared deadline. A probe finishing after the
                // deadline would be discarded by the check below anyway, so cutting it
                // short keeps the same outcome and simply returns on time.
                let probe =
                    tokio::time::timeout_at(deadline, Self::probe_endpoint(addr, config)).await;
                let membership = match probe {
                    Ok(Some(Ok(m))) => m,
                    Ok(Some(Err(()))) => continue, // election in progress — try next
                    Ok(None) => continue,          // node unreachable — try next
                    Err(_elapsed) => return Err(ErrorCode::ClusterUnavailable.into()),
                };

                // The probe can complete right at the deadline; don't start a connect then.
                if tokio::time::Instant::now() >= deadline {
                    return Err(ErrorCode::ClusterUnavailable.into());
                }

                // Probe succeeded: try to establish the leader channel in the same step.
                // If connect fails (leader crashed between probe and connect), continue
                // to the next endpoint. Do NOT return an error; the shared deadline
                // handles the timeout.
                let Ok((leader_addr, _)) = Self::parse_cluster_metadata(&membership) else {
                    // This should be unreachable: probe_endpoint already verified ready state
                    tracing::warn!(
                        "parse_cluster_metadata failed after successful probe for {addr}, this is a bug"
                    );
                    continue;
                };
                match Self::create_channel(leader_addr, config).await {
                    Ok(leader_conn) => return Ok((membership, leader_conn)),
                    Err(e) => {
                        debug!("load_cluster_metadata: leader connect failed ({e:?}), retrying");
                        continue;
                    }
                }
            }

            // Full round completed with no ready+connectable leader: back off, then retry
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            if remaining.is_zero() {
                return Err(ErrorCode::ClusterUnavailable.into());
            }
            let backoff = Duration::from_millis(RETRY_BACKOFF_MS).min(remaining);
            debug!(
                "load_cluster_metadata: no ready leader found, retrying in {:?}",
                backoff
            );
            tokio::time::sleep(backoff).await;
        }
    }

    /// Splits cluster metadata into the leader address and the follower addresses.
    ///
    /// Every node whose ID equals `current_leader_id` is treated as the leader.
    /// All other nodes are returned as followers, in member-list order.
    ///
    /// Precondition: `membership` should be in Ready state (leader_id is `Some`
    /// and present in the member list), as guaranteed by `load_cluster_metadata`.
    ///
    /// # Errors
    /// Returns `ErrorCode::ClusterUnavailable` if `current_leader_id` is `None`
    /// or no member has that ID.
    pub(super) fn parse_cluster_metadata(
        membership: &ClusterMembership
    ) -> std::result::Result<(String, Vec<String>), ClientApiError> {
        let leader_id = membership.current_leader_id.ok_or(ErrorCode::ClusterUnavailable)?;
        let mut leader_addr = None;
        let mut followers = Vec::new();
        for node in &membership.nodes {
            let addr = address_str(&node.address);
            debug!(
                "parse_cluster_metadata, node_id: {}, addr: {:?}",
                node.id, &addr
            );
            if node.id == leader_id {
                leader_addr = Some(addr);
            } else {
                followers.push(addr);
            }
        }
        leader_addr
            .ok_or(ErrorCode::ClusterUnavailable.into())
            .map(|addr| (addr, followers))
    }
}