// d_engine_client/pool.rs

1use d_engine_proto::error::ErrorCode;
2use d_engine_proto::server::cluster::ClusterMembership;
3use d_engine_proto::server::cluster::MetadataRequest;
4use d_engine_proto::server::cluster::NodeMeta;
5use d_engine_proto::server::cluster::cluster_management_service_client::ClusterManagementServiceClient;
6use tonic::codec::CompressionEncoding;
7use tonic::transport::Channel;
8use tonic::transport::Endpoint;
9use tracing::debug;
10use tracing::error;
11use tracing::info;
12
13use super::ClientApiError;
14use crate::ClientConfig;
15use crate::utils::address_str;
16
17/// Manages connections to cluster nodes
18///
19/// Implements connection pooling and leader/follower routing.
20/// Automatically handles connection health checks and failover.
21#[derive(Clone)]
22pub struct ConnectionPool {
23    // Tonic's Channel is thread-safe and reference-counted.
24    pub(super) leader_conn: Channel,
25    pub(super) follower_conns: Vec<Channel>,
26    pub(super) config: ClientConfig,
27    pub(super) members: Vec<NodeMeta>,
28    pub(super) endpoints: Vec<String>,
29    pub(super) current_leader_id: Option<u32>,
30}
31
32impl ConnectionPool {
33    /// Creates new connection pool with bootstrap nodes
34    ///
35    /// # Implementation Details
36    /// 1. Discovers cluster metadata
37    /// 2. Establishes leader connection
38    /// 3. Creates follower connections
39    pub(crate) async fn create(
40        endpoints: Vec<String>,
41        config: ClientConfig,
42    ) -> std::result::Result<Self, ClientApiError> {
43        let (leader_conn, follower_conns, members, current_leader_id) =
44            Self::build_connections(&endpoints, &config).await?;
45
46        Ok(Self {
47            leader_conn,
48            follower_conns,
49            config,
50            members,
51            endpoints,
52            current_leader_id,
53        })
54    }
55
56    /// Refreshes cluster connections by reloading metadata and rebuilding channels
57    ///
58    /// # Behavior
59    /// 1. Discovers fresh cluster metadata from provided endpoints
60    /// 2. Re-establishes leader connection using latest config
61    /// 3. Rebuilds follower connections pool
62    #[allow(dead_code)]
63    pub(crate) async fn refresh(
64        &mut self,
65        new_endpoints: Option<Vec<String>>,
66    ) -> std::result::Result<(), ClientApiError> {
67        if let Some(endpoints) = new_endpoints {
68            self.endpoints = endpoints;
69        }
70        let (leader_conn, follower_conns, members, current_leader_id) =
71            Self::build_connections(&self.endpoints, &self.config).await?;
72
73        // Atomic update of fields
74        self.leader_conn = leader_conn;
75        self.follower_conns = follower_conns;
76        self.members = members;
77        self.current_leader_id = current_leader_id;
78
79        Ok(())
80    }
81
82    /// Create the core logic of the connection pool (extract common code)
83    async fn build_connections(
84        endpoints: &[String],
85        config: &ClientConfig,
86    ) -> std::result::Result<(Channel, Vec<Channel>, Vec<NodeMeta>, Option<u32>), ClientApiError>
87    {
88        // 1. Load cluster metadata
89        let membership = Self::load_cluster_metadata(endpoints, config).await?;
90        info!("Cluster members discovered: {:?}", membership.nodes);
91
92        // 2. Parse leader and follower addresses
93        let (leader_addr, follower_addrs) = Self::parse_cluster_metadata(&membership)?;
94
95        // 3. Establish all connections in parallel
96        let leader_future = Self::create_channel(leader_addr, config);
97        let follower_futures =
98            follower_addrs.into_iter().map(|addr| Self::create_channel(addr, config));
99
100        let (leader_conn, follower_conns) =
101            tokio::join!(leader_future, futures::future::join_all(follower_futures));
102
103        // 4. Filter valid connections
104        let leader_conn = leader_conn?;
105        let follower_conns =
106            follower_conns.into_iter().filter_map(std::result::Result::ok).collect();
107
108        Ok((
109            leader_conn,
110            follower_conns,
111            membership.nodes,
112            membership.current_leader_id,
113        ))
114    }
115
116    pub(super) async fn create_channel(
117        addr: String,
118        config: &ClientConfig,
119    ) -> std::result::Result<Channel, ClientApiError> {
120        debug!("create_channel, addr = {:?}", &addr);
121        Endpoint::try_from(addr)?
122            .connect_timeout(config.connect_timeout)
123            .timeout(config.request_timeout)
124            .tcp_keepalive(Some(config.tcp_keepalive))
125            .http2_keep_alive_interval(config.http2_keepalive_interval)
126            .keep_alive_timeout(config.http2_keepalive_timeout)
127            .connect()
128            .await
129            .map_err(Into::into)
130    }
131    /// Retrieves active leader connection
132    ///
133    /// Used for all write operations and linear reads
134    pub(crate) fn get_leader(&self) -> Channel {
135        self.leader_conn.clone()
136    }
137
138    pub(crate) fn get_all_channels(&self) -> Vec<Channel> {
139        let mut cloned = self.follower_conns.clone();
140        cloned.push(self.leader_conn.clone());
141        cloned
142    }
143
144    pub(crate) fn get_all_members(&self) -> Vec<NodeMeta> {
145        self.members.clone()
146    }
147
148    /// Get the current leader ID
149    pub(crate) fn get_leader_id(&self) -> Option<u32> {
150        self.current_leader_id
151    }
152
153    /// Discover cluster metadata by probing nodes
154    pub(super) async fn load_cluster_metadata(
155        endpoints: &[String],
156        config: &ClientConfig,
157    ) -> std::result::Result<ClusterMembership, ClientApiError> {
158        for addr in endpoints {
159            match Self::create_channel(addr.clone(), config).await {
160                Ok(channel) => {
161                    let mut client = ClusterManagementServiceClient::new(channel);
162                    if config.enable_compression {
163                        client = client
164                            .send_compressed(CompressionEncoding::Gzip)
165                            .accept_compressed(CompressionEncoding::Gzip);
166                    }
167                    match client.get_cluster_metadata(tonic::Request::new(MetadataRequest {})).await
168                    {
169                        Ok(response) => return Ok(response.into_inner()),
170                        Err(e) => {
171                            error!("get_cluster_metadata: {:?}", e);
172                            // Try next node
173                            continue;
174                        }
175                    }
176                }
177                Err(e) => {
178                    error!(
179                        "load_cluster_metadata from addr: {:?}, failed: {:?}",
180                        &addr, e
181                    );
182                    continue;
183                } // Connection failed, try next
184            }
185        }
186        Err(ErrorCode::ClusterUnavailable.into())
187    }
188
189    /// Extract leader address from metadata using current_leader_id
190    pub(super) fn parse_cluster_metadata(
191        membership: &ClusterMembership
192    ) -> std::result::Result<(String, Vec<String>), ClientApiError> {
193        let leader_id = membership.current_leader_id.ok_or(ErrorCode::ClusterUnavailable)?;
194
195        let mut leader_addr = None;
196        let mut followers = Vec::new();
197
198        for node in &membership.nodes {
199            let addr = address_str(&node.address);
200            debug!(
201                "parse_cluster_metadata, node_id: {}, addr: {:?}",
202                node.id, &addr
203            );
204            if node.id == leader_id {
205                leader_addr = Some(addr);
206            } else {
207                followers.push(addr);
208            }
209        }
210
211        leader_addr
212            .ok_or(ErrorCode::ClusterUnavailable.into())
213            .map(|addr| (addr, followers))
214    }
215}