//! d_engine_client/pool.rs — cluster connection pool for the d_engine client.
1use std::time::Duration;
2
3use d_engine_proto::error::ErrorCode;
4use d_engine_proto::server::cluster::ClusterMembership;
5use d_engine_proto::server::cluster::MetadataRequest;
6use d_engine_proto::server::cluster::NodeMeta;
7use d_engine_proto::server::cluster::cluster_management_service_client::ClusterManagementServiceClient;
8use tonic::codec::CompressionEncoding;
9use tonic::transport::Channel;
10use tonic::transport::Endpoint;
11use tracing::debug;
12use tracing::info;
13
14use super::ClientApiError;
15use crate::ClientConfig;
16use crate::utils::address_str;
17
/// Manages connections to cluster nodes
///
/// Implements connection pooling and leader/follower routing.
/// Automatically handles connection health checks and failover.
///
/// Cloning the pool is cheap: tonic's `Channel` is reference-counted, so
/// clones share the same underlying connections.
#[derive(Clone)]
pub struct ConnectionPool {
    // Tonic's Channel is thread-safe and reference-counted.
    pub(super) leader_conn: Channel,
    // Best-effort follower channels; unreachable followers are simply absent.
    pub(super) follower_conns: Vec<Channel>,
    // Client settings (timeouts, keep-alive, compression).
    pub(super) config: ClientConfig,
    // Most recently discovered cluster member list.
    pub(super) members: Vec<NodeMeta>,
    // Bootstrap endpoints used for discovery and refresh.
    pub(super) endpoints: Vec<String>,
    // Leader node ID from the latest metadata, if known.
    pub(super) current_leader_id: Option<u32>,
}
32
33impl ConnectionPool {
34    /// Creates new connection pool with bootstrap nodes
35    ///
36    /// # Implementation Details
37    /// 1. Discovers cluster metadata
38    /// 2. Establishes leader connection
39    /// 3. Creates follower connections
40    pub(crate) async fn create(
41        endpoints: Vec<String>,
42        config: ClientConfig,
43    ) -> std::result::Result<Self, ClientApiError> {
44        let (leader_conn, follower_conns, members, current_leader_id) =
45            Self::build_connections(&endpoints, &config).await?;
46
47        Ok(Self {
48            leader_conn,
49            follower_conns,
50            config,
51            members,
52            endpoints,
53            current_leader_id,
54        })
55    }
56
57    /// Refreshes cluster connections by reloading metadata and rebuilding channels
58    ///
59    /// # Behavior
60    /// 1. Discovers fresh cluster metadata from provided endpoints
61    /// 2. Re-establishes leader connection using latest config
62    /// 3. Rebuilds follower connections pool
63    #[allow(dead_code)]
64    pub(crate) async fn refresh(
65        &mut self,
66        new_endpoints: Option<Vec<String>>,
67    ) -> std::result::Result<(), ClientApiError> {
68        let endpoints_to_use = new_endpoints.unwrap_or_else(|| self.endpoints.clone());
69        let (leader_conn, follower_conns, members, current_leader_id) =
70            Self::build_connections(&endpoints_to_use, &self.config).await?;
71
72        // Only update endpoints and connections after successful build
73        self.endpoints = endpoints_to_use;
74        self.leader_conn = leader_conn;
75        self.follower_conns = follower_conns;
76        self.members = members;
77        self.current_leader_id = current_leader_id;
78
79        Ok(())
80    }
81
82    /// Create the core logic of the connection pool (extract common code)
83    async fn build_connections(
84        endpoints: &[String],
85        config: &ClientConfig,
86    ) -> std::result::Result<(Channel, Vec<Channel>, Vec<NodeMeta>, Option<u32>), ClientApiError>
87    {
88        // 1. Load cluster metadata + establish verified leader channel atomically.
89        //    Returns only when a ready leader is confirmed AND its channel is connectable.
90        let (membership, leader_conn) = Self::load_cluster_metadata(endpoints, config).await?;
91        info!("Cluster members discovered: {:?}", membership.nodes);
92
93        // 2. Parse follower addresses (leader channel already established above)
94        let (_, follower_addrs) = Self::parse_cluster_metadata(&membership)?;
95
96        // 3. Establish follower connections in parallel (best-effort, failures are filtered)
97        let follower_conns = futures::future::join_all(
98            follower_addrs.into_iter().map(|addr| Self::create_channel(addr, config)),
99        )
100        .await
101        .into_iter()
102        .filter_map(std::result::Result::ok)
103        .collect();
104
105        Ok((
106            leader_conn,
107            follower_conns,
108            membership.nodes,
109            membership.current_leader_id,
110        ))
111    }
112
113    pub(super) async fn create_channel(
114        addr: String,
115        config: &ClientConfig,
116    ) -> std::result::Result<Channel, ClientApiError> {
117        debug!("create_channel, addr = {:?}", &addr);
118        Endpoint::try_from(addr)?
119            .connect_timeout(config.connect_timeout)
120            .timeout(config.request_timeout)
121            .tcp_keepalive(Some(config.tcp_keepalive))
122            .http2_keep_alive_interval(config.http2_keepalive_interval)
123            .keep_alive_timeout(config.http2_keepalive_timeout)
124            .connect()
125            .await
126            .map_err(Into::into)
127    }
128    /// Retrieves active leader connection
129    ///
130    /// Used for all write operations and linear reads
131    pub(crate) fn get_leader(&self) -> Channel {
132        self.leader_conn.clone()
133    }
134
135    pub(crate) fn get_all_channels(&self) -> Vec<Channel> {
136        let mut cloned = self.follower_conns.clone();
137        cloned.push(self.leader_conn.clone());
138        cloned
139    }
140
141    pub(crate) fn get_all_members(&self) -> Vec<NodeMeta> {
142        self.members.clone()
143    }
144
    /// Get the current leader ID
    ///
    /// Returns the leader node ID captured at the last metadata load, or
    /// `None` if no leader was known at that time.
    pub(crate) fn get_leader_id(&self) -> Option<u32> {
        self.current_leader_id
    }
149
150    /// Probe a single endpoint and classify the cluster metadata state.
151    ///
152    /// Returns:
153    /// - `Some(Ok(membership))` — leader is known and present in member list (ready)
154    /// - `Some(Err(()))`        — node responded but cluster not yet ready (election in progress
155    ///   or stale leader ID); caller should try next node
156    /// - `None`                 — node unreachable; caller should try next node
157    pub(super) async fn probe_endpoint(
158        addr: &str,
159        config: &ClientConfig,
160    ) -> Option<std::result::Result<ClusterMembership, ()>> {
161        // Use a short connect timeout for probing — dead nodes should fail fast
162        // so the retry loop can move to the next endpoint without burning the
163        // cluster_ready_timeout budget on TCP handshake waits.
164        const MAX_PROBE_CONNECT_MS: u64 = 500;
165        let probe_timeout = config.connect_timeout.min(Duration::from_millis(MAX_PROBE_CONNECT_MS));
166        let channel = Endpoint::try_from(addr.to_string())
167            .ok()?
168            .connect_timeout(probe_timeout)
169            .timeout(config.request_timeout)
170            .connect()
171            .await
172            .ok()?;
173        let mut client = ClusterManagementServiceClient::new(channel);
174        if config.enable_compression {
175            client = client
176                .send_compressed(CompressionEncoding::Gzip)
177                .accept_compressed(CompressionEncoding::Gzip);
178        }
179        let membership = client
180            .get_cluster_metadata(tonic::Request::new(MetadataRequest {}))
181            .await
182            .ok()?
183            .into_inner();
184
185        // Classify: ready only if leader_id is known AND present in member list
186        let ready = membership
187            .current_leader_id
188            .is_some_and(|leader_id| membership.nodes.iter().any(|n| n.id == leader_id));
189
190        if ready {
191            Some(Ok(membership))
192        } else {
193            debug!(
194                "probe_endpoint {}: cluster not ready (leader_id={:?})",
195                addr, membership.current_leader_id
196            );
197            Some(Err(()))
198        }
199    }
200
    /// Wait until the cluster has a reachable, noop-committed leader, then return
    /// its metadata and an established leader channel.
    ///
    /// Probe and connect are treated as a single atomic step inside one retry loop
    /// sharing `config.cluster_ready_timeout`. This prevents the TOCTOU race where
    /// probe succeeds but the leader crashes before the channel is established.
    ///
    /// Ready = `current_leader_id` is `Some`, present in member list, AND the leader
    /// channel can be established (TCP connect succeeds).
    ///
    /// # Errors
    /// Returns `ErrorCode::ClusterUnavailable` once the shared deadline expires
    /// without a ready, connectable leader.
    pub(super) async fn load_cluster_metadata(
        endpoints: &[String],
        config: &ClientConfig,
    ) -> std::result::Result<(ClusterMembership, Channel), ClientApiError> {
        // Fixed backoff between full rounds over all endpoints.
        const RETRY_BACKOFF_MS: u64 = 200;

        let deadline = tokio::time::Instant::now() + config.cluster_ready_timeout;

        loop {
            // Probe every endpoint in this round
            for addr in endpoints {
                // Bail out before each probe once the budget is spent.
                if tokio::time::Instant::now() >= deadline {
                    return Err(ErrorCode::ClusterUnavailable.into());
                }

                let membership = match Self::probe_endpoint(addr, config).await {
                    Some(Ok(m)) => m,
                    Some(Err(())) => continue, // election in progress — try next
                    None => continue,          // node unreachable — try next
                };

                // Check deadline after probe to prevent overshoot
                if tokio::time::Instant::now() >= deadline {
                    return Err(ErrorCode::ClusterUnavailable.into());
                }

                // Probe succeeded: try to establish leader channel in the same step.
                // If connect fails (leader crashed between probe and connect), continue
                // to next endpoint — do NOT return error (shared deadline handles timeout).
                let Ok((leader_addr, _)) = Self::parse_cluster_metadata(&membership) else {
                    // This should be unreachable: probe_endpoint already verified ready state
                    tracing::warn!(
                        "parse_cluster_metadata failed after successful probe for {addr}, this is a bug"
                    );
                    continue;
                };
                match Self::create_channel(leader_addr, config).await {
                    Ok(leader_conn) => return Ok((membership, leader_conn)),
                    Err(e) => {
                        debug!("load_cluster_metadata: leader connect failed ({e:?}), retrying");
                        continue;
                    }
                }
            }

            // Full round completed with no ready+connectable leader — backoff then retry
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            if remaining.is_zero() {
                return Err(ErrorCode::ClusterUnavailable.into());
            }
            // Never sleep past the deadline: cap the backoff by the remaining budget.
            let backoff = Duration::from_millis(RETRY_BACKOFF_MS).min(remaining);
            debug!(
                "load_cluster_metadata: no ready leader found, retrying in {:?}",
                backoff
            );
            tokio::time::sleep(backoff).await;
        }
    }
268
269    /// Extract leader address from metadata using current_leader_id.
270    ///
271    /// Precondition: `membership` must be in Ready state (leader_id is Some and
272    /// present in member list). Guaranteed by `load_cluster_metadata`.
273    pub(super) fn parse_cluster_metadata(
274        membership: &ClusterMembership
275    ) -> std::result::Result<(String, Vec<String>), ClientApiError> {
276        let leader_id = membership.current_leader_id.ok_or(ErrorCode::ClusterUnavailable)?;
277
278        let mut leader_addr = None;
279        let mut followers = Vec::new();
280
281        for node in &membership.nodes {
282            let addr = address_str(&node.address);
283            debug!(
284                "parse_cluster_metadata, node_id: {}, addr: {:?}",
285                node.id, &addr
286            );
287            if node.id == leader_id {
288                leader_addr = Some(addr);
289            } else {
290                followers.push(addr);
291            }
292        }
293
294        leader_addr
295            .ok_or(ErrorCode::ClusterUnavailable.into())
296            .map(|addr| (addr, followers))
297    }
298}