1use d_engine_proto::error::ErrorCode;
2use d_engine_proto::server::cluster::ClusterMembership;
3use d_engine_proto::server::cluster::MetadataRequest;
4use d_engine_proto::server::cluster::NodeMeta;
5use d_engine_proto::server::cluster::cluster_management_service_client::ClusterManagementServiceClient;
6use tonic::codec::CompressionEncoding;
7use tonic::transport::Channel;
8use tonic::transport::Endpoint;
9use tracing::debug;
10use tracing::error;
11use tracing::info;
12
13use super::ClientApiError;
14use crate::ClientConfig;
15use crate::utils::address_str;
16
17#[derive(Clone)]
22pub struct ConnectionPool {
23 pub(super) leader_conn: Channel,
25 pub(super) follower_conns: Vec<Channel>,
26 pub(super) config: ClientConfig,
27 pub(super) members: Vec<NodeMeta>,
28 pub(super) endpoints: Vec<String>,
29 pub(super) current_leader_id: Option<u32>,
30}
31
32impl ConnectionPool {
33 pub(crate) async fn create(
40 endpoints: Vec<String>,
41 config: ClientConfig,
42 ) -> std::result::Result<Self, ClientApiError> {
43 let (leader_conn, follower_conns, members, current_leader_id) =
44 Self::build_connections(&endpoints, &config).await?;
45
46 Ok(Self {
47 leader_conn,
48 follower_conns,
49 config,
50 members,
51 endpoints,
52 current_leader_id,
53 })
54 }
55
56 #[allow(dead_code)]
63 pub(crate) async fn refresh(
64 &mut self,
65 new_endpoints: Option<Vec<String>>,
66 ) -> std::result::Result<(), ClientApiError> {
67 if let Some(endpoints) = new_endpoints {
68 self.endpoints = endpoints;
69 }
70 let (leader_conn, follower_conns, members, current_leader_id) =
71 Self::build_connections(&self.endpoints, &self.config).await?;
72
73 self.leader_conn = leader_conn;
75 self.follower_conns = follower_conns;
76 self.members = members;
77 self.current_leader_id = current_leader_id;
78
79 Ok(())
80 }
81
82 async fn build_connections(
84 endpoints: &[String],
85 config: &ClientConfig,
86 ) -> std::result::Result<(Channel, Vec<Channel>, Vec<NodeMeta>, Option<u32>), ClientApiError>
87 {
88 let membership = Self::load_cluster_metadata(endpoints, config).await?;
90 info!("Cluster members discovered: {:?}", membership.nodes);
91
92 let (leader_addr, follower_addrs) = Self::parse_cluster_metadata(&membership)?;
94
95 let leader_future = Self::create_channel(leader_addr, config);
97 let follower_futures =
98 follower_addrs.into_iter().map(|addr| Self::create_channel(addr, config));
99
100 let (leader_conn, follower_conns) =
101 tokio::join!(leader_future, futures::future::join_all(follower_futures));
102
103 let leader_conn = leader_conn?;
105 let follower_conns =
106 follower_conns.into_iter().filter_map(std::result::Result::ok).collect();
107
108 Ok((
109 leader_conn,
110 follower_conns,
111 membership.nodes,
112 membership.current_leader_id,
113 ))
114 }
115
116 pub(super) async fn create_channel(
117 addr: String,
118 config: &ClientConfig,
119 ) -> std::result::Result<Channel, ClientApiError> {
120 debug!("create_channel, addr = {:?}", &addr);
121 Endpoint::try_from(addr)?
122 .connect_timeout(config.connect_timeout)
123 .timeout(config.request_timeout)
124 .tcp_keepalive(Some(config.tcp_keepalive))
125 .http2_keep_alive_interval(config.http2_keepalive_interval)
126 .keep_alive_timeout(config.http2_keepalive_timeout)
127 .connect()
128 .await
129 .map_err(Into::into)
130 }
131 pub(crate) fn get_leader(&self) -> Channel {
135 self.leader_conn.clone()
136 }
137
138 pub(crate) fn get_all_channels(&self) -> Vec<Channel> {
139 let mut cloned = self.follower_conns.clone();
140 cloned.push(self.leader_conn.clone());
141 cloned
142 }
143
144 pub(crate) fn get_all_members(&self) -> Vec<NodeMeta> {
145 self.members.clone()
146 }
147
148 pub(crate) fn get_leader_id(&self) -> Option<u32> {
150 self.current_leader_id
151 }
152
153 pub(super) async fn load_cluster_metadata(
155 endpoints: &[String],
156 config: &ClientConfig,
157 ) -> std::result::Result<ClusterMembership, ClientApiError> {
158 for addr in endpoints {
159 match Self::create_channel(addr.clone(), config).await {
160 Ok(channel) => {
161 let mut client = ClusterManagementServiceClient::new(channel);
162 if config.enable_compression {
163 client = client
164 .send_compressed(CompressionEncoding::Gzip)
165 .accept_compressed(CompressionEncoding::Gzip);
166 }
167 match client.get_cluster_metadata(tonic::Request::new(MetadataRequest {})).await
168 {
169 Ok(response) => return Ok(response.into_inner()),
170 Err(e) => {
171 error!("get_cluster_metadata: {:?}", e);
172 continue;
174 }
175 }
176 }
177 Err(e) => {
178 error!(
179 "load_cluster_metadata from addr: {:?}, failed: {:?}",
180 &addr, e
181 );
182 continue;
183 } }
185 }
186 Err(ErrorCode::ClusterUnavailable.into())
187 }
188
189 pub(super) fn parse_cluster_metadata(
191 membership: &ClusterMembership
192 ) -> std::result::Result<(String, Vec<String>), ClientApiError> {
193 let leader_id = membership.current_leader_id.ok_or(ErrorCode::ClusterUnavailable)?;
194
195 let mut leader_addr = None;
196 let mut followers = Vec::new();
197
198 for node in &membership.nodes {
199 let addr = address_str(&node.address);
200 debug!(
201 "parse_cluster_metadata, node_id: {}, addr: {:?}",
202 node.id, &addr
203 );
204 if node.id == leader_id {
205 leader_addr = Some(addr);
206 } else {
207 followers.push(addr);
208 }
209 }
210
211 leader_addr
212 .ok_or(ErrorCode::ClusterUnavailable.into())
213 .map(|addr| (addr, followers))
214 }
215}