1use std::time::Duration;
2
3use d_engine_proto::error::ErrorCode;
4use d_engine_proto::server::cluster::ClusterMembership;
5use d_engine_proto::server::cluster::MetadataRequest;
6use d_engine_proto::server::cluster::NodeMeta;
7use d_engine_proto::server::cluster::cluster_management_service_client::ClusterManagementServiceClient;
8use tonic::codec::CompressionEncoding;
9use tonic::transport::Channel;
10use tonic::transport::Endpoint;
11use tracing::debug;
12use tracing::info;
13
14use super::ClientApiError;
15use crate::ClientConfig;
16use crate::utils::address_str;
17
/// Set of gRPC channels a client uses to talk to the cluster.
///
/// Holds one channel to the node reported as leader, channels to the other
/// members that could be reached, and the inputs (endpoints and config)
/// needed to rebuild them.
#[derive(Clone)]
pub struct ConnectionPool {
    /// Channel to the node reported as the current leader.
    pub(super) leader_conn: Channel,
    /// Channels to non-leader members. Connections are best-effort, so
    /// members that could not be reached are absent from this list.
    pub(super) follower_conns: Vec<Channel>,
    /// Client settings used when (re)creating channels.
    pub(super) config: ClientConfig,
    /// Cluster members as reported by the last successful metadata probe.
    pub(super) members: Vec<NodeMeta>,
    /// Bootstrap addresses probed during discovery.
    pub(super) endpoints: Vec<String>,
    /// Leader id reported by the last successful metadata probe, if any.
    pub(super) current_leader_id: Option<u32>,
}
32
33impl ConnectionPool {
34 pub(crate) async fn create(
41 endpoints: Vec<String>,
42 config: ClientConfig,
43 ) -> std::result::Result<Self, ClientApiError> {
44 let (leader_conn, follower_conns, members, current_leader_id) =
45 Self::build_connections(&endpoints, &config).await?;
46
47 Ok(Self {
48 leader_conn,
49 follower_conns,
50 config,
51 members,
52 endpoints,
53 current_leader_id,
54 })
55 }
56
57 #[allow(dead_code)]
64 pub(crate) async fn refresh(
65 &mut self,
66 new_endpoints: Option<Vec<String>>,
67 ) -> std::result::Result<(), ClientApiError> {
68 let endpoints_to_use = new_endpoints.unwrap_or_else(|| self.endpoints.clone());
69 let (leader_conn, follower_conns, members, current_leader_id) =
70 Self::build_connections(&endpoints_to_use, &self.config).await?;
71
72 self.endpoints = endpoints_to_use;
74 self.leader_conn = leader_conn;
75 self.follower_conns = follower_conns;
76 self.members = members;
77 self.current_leader_id = current_leader_id;
78
79 Ok(())
80 }
81
82 async fn build_connections(
84 endpoints: &[String],
85 config: &ClientConfig,
86 ) -> std::result::Result<(Channel, Vec<Channel>, Vec<NodeMeta>, Option<u32>), ClientApiError>
87 {
88 let (membership, leader_conn) = Self::load_cluster_metadata(endpoints, config).await?;
91 info!("Cluster members discovered: {:?}", membership.nodes);
92
93 let (_, follower_addrs) = Self::parse_cluster_metadata(&membership)?;
95
96 let follower_conns = futures::future::join_all(
98 follower_addrs.into_iter().map(|addr| Self::create_channel(addr, config)),
99 )
100 .await
101 .into_iter()
102 .filter_map(std::result::Result::ok)
103 .collect();
104
105 Ok((
106 leader_conn,
107 follower_conns,
108 membership.nodes,
109 membership.current_leader_id,
110 ))
111 }
112
113 pub(super) async fn create_channel(
114 addr: String,
115 config: &ClientConfig,
116 ) -> std::result::Result<Channel, ClientApiError> {
117 debug!("create_channel, addr = {:?}", &addr);
118 Endpoint::try_from(addr)?
119 .connect_timeout(config.connect_timeout)
120 .timeout(config.request_timeout)
121 .tcp_keepalive(Some(config.tcp_keepalive))
122 .http2_keep_alive_interval(config.http2_keepalive_interval)
123 .keep_alive_timeout(config.http2_keepalive_timeout)
124 .connect()
125 .await
126 .map_err(Into::into)
127 }
128 pub(crate) fn get_leader(&self) -> Channel {
132 self.leader_conn.clone()
133 }
134
135 pub(crate) fn get_all_channels(&self) -> Vec<Channel> {
136 let mut cloned = self.follower_conns.clone();
137 cloned.push(self.leader_conn.clone());
138 cloned
139 }
140
141 pub(crate) fn get_all_members(&self) -> Vec<NodeMeta> {
142 self.members.clone()
143 }
144
145 pub(crate) fn get_leader_id(&self) -> Option<u32> {
147 self.current_leader_id
148 }
149
150 pub(super) async fn probe_endpoint(
158 addr: &str,
159 config: &ClientConfig,
160 ) -> Option<std::result::Result<ClusterMembership, ()>> {
161 const MAX_PROBE_CONNECT_MS: u64 = 500;
165 let probe_timeout = config.connect_timeout.min(Duration::from_millis(MAX_PROBE_CONNECT_MS));
166 let channel = Endpoint::try_from(addr.to_string())
167 .ok()?
168 .connect_timeout(probe_timeout)
169 .timeout(config.request_timeout)
170 .connect()
171 .await
172 .ok()?;
173 let mut client = ClusterManagementServiceClient::new(channel);
174 if config.enable_compression {
175 client = client
176 .send_compressed(CompressionEncoding::Gzip)
177 .accept_compressed(CompressionEncoding::Gzip);
178 }
179 let membership = client
180 .get_cluster_metadata(tonic::Request::new(MetadataRequest {}))
181 .await
182 .ok()?
183 .into_inner();
184
185 let ready = membership
187 .current_leader_id
188 .is_some_and(|leader_id| membership.nodes.iter().any(|n| n.id == leader_id));
189
190 if ready {
191 Some(Ok(membership))
192 } else {
193 debug!(
194 "probe_endpoint {}: cluster not ready (leader_id={:?})",
195 addr, membership.current_leader_id
196 );
197 Some(Err(()))
198 }
199 }
200
201 pub(super) async fn load_cluster_metadata(
211 endpoints: &[String],
212 config: &ClientConfig,
213 ) -> std::result::Result<(ClusterMembership, Channel), ClientApiError> {
214 const RETRY_BACKOFF_MS: u64 = 200;
215
216 let deadline = tokio::time::Instant::now() + config.cluster_ready_timeout;
217
218 loop {
219 for addr in endpoints {
221 if tokio::time::Instant::now() >= deadline {
222 return Err(ErrorCode::ClusterUnavailable.into());
223 }
224
225 let membership = match Self::probe_endpoint(addr, config).await {
226 Some(Ok(m)) => m,
227 Some(Err(())) => continue, None => continue, };
230
231 if tokio::time::Instant::now() >= deadline {
233 return Err(ErrorCode::ClusterUnavailable.into());
234 }
235
236 let Ok((leader_addr, _)) = Self::parse_cluster_metadata(&membership) else {
240 tracing::warn!(
242 "parse_cluster_metadata failed after successful probe for {addr}, this is a bug"
243 );
244 continue;
245 };
246 match Self::create_channel(leader_addr, config).await {
247 Ok(leader_conn) => return Ok((membership, leader_conn)),
248 Err(e) => {
249 debug!("load_cluster_metadata: leader connect failed ({e:?}), retrying");
250 continue;
251 }
252 }
253 }
254
255 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
257 if remaining.is_zero() {
258 return Err(ErrorCode::ClusterUnavailable.into());
259 }
260 let backoff = Duration::from_millis(RETRY_BACKOFF_MS).min(remaining);
261 debug!(
262 "load_cluster_metadata: no ready leader found, retrying in {:?}",
263 backoff
264 );
265 tokio::time::sleep(backoff).await;
266 }
267 }
268
269 pub(super) fn parse_cluster_metadata(
274 membership: &ClusterMembership
275 ) -> std::result::Result<(String, Vec<String>), ClientApiError> {
276 let leader_id = membership.current_leader_id.ok_or(ErrorCode::ClusterUnavailable)?;
277
278 let mut leader_addr = None;
279 let mut followers = Vec::new();
280
281 for node in &membership.nodes {
282 let addr = address_str(&node.address);
283 debug!(
284 "parse_cluster_metadata, node_id: {}, addr: {:?}",
285 node.id, &addr
286 );
287 if node.id == leader_id {
288 leader_addr = Some(addr);
289 } else {
290 followers.push(addr);
291 }
292 }
293
294 leader_addr
295 .ok_or(ErrorCode::ClusterUnavailable.into())
296 .map(|addr| (addr, followers))
297 }
298}