d_engine/client/
pool.rs

1use log::debug;
2use log::error;
3use log::info;
4use tonic::codec::CompressionEncoding;
5use tonic::transport::Channel;
6use tonic::transport::Endpoint;
7
8use crate::proto::rpc_service_client::RpcServiceClient;
9use crate::proto::MetadataRequest;
10use crate::proto::NodeMeta;
11use crate::ClientConfig;
12use crate::Error;
13use crate::Result;
14
/// Manages connections to cluster nodes
///
/// Implements connection pooling and leader/follower routing: the pool keeps one
/// channel to the leader, one channel per follower that could be connected when the
/// pool was built, the client configuration, and the membership list the cluster
/// reported.
///
/// NOTE(review): the original documentation states that the pool "automatically
/// handles connection health checks and failover"; that logic is not visible in the
/// constructor or accessors next to this definition — confirm where it lives.
#[derive(Clone)]
pub struct ConnectionPool {
    // Tonic's Channel is thread-safe and reference-counted.
    /// Channel to the node whose metadata role was `LEADER` when the pool was created.
    pub(super) leader_conn: Channel,
    /// Channels to the non-leader nodes that were successfully connected at creation.
    pub(super) follower_conns: Vec<Channel>,
    /// Client configuration the pool was created with (timeouts, keep-alive, compression).
    pub(super) config: ClientConfig,
    /// Cluster membership as returned by the metadata request at creation time.
    pub(super) members: Vec<NodeMeta>,
}
27
28impl ConnectionPool {
29    /// Creates new connection pool with bootstrap nodes
30    ///
31    /// # Implementation Details
32    /// 1. Discovers cluster metadata
33    /// 2. Establishes leader connection
34    /// 3. Creates follower connections
35    pub(crate) async fn new(
36        endpoints: Vec<String>,
37        config: ClientConfig,
38    ) -> Result<Self> {
39        let members = Self::load_cluster_metadata(&endpoints, &config).await?;
40        info!("Retrieved members: {:?}", &members);
41        let (leader_addr, followers) = Self::parse_cluster_metadata(&members)?;
42
43        let leader_conn = Self::create_channel(leader_addr, &config).await?;
44        let mut follower_conns = Vec::new();
45
46        // Build follower connections asynchronously
47        let follower_futures = followers.into_iter().map(|addr| Self::create_channel(addr, &config));
48        let connections = futures::future::join_all(follower_futures).await;
49
50        for conn in connections {
51            if let Ok(channel) = conn {
52                follower_conns.push(channel);
53            }
54        }
55        Ok(Self {
56            leader_conn,
57            follower_conns,
58            config,
59            members,
60        })
61    }
62
63    pub(super) async fn create_channel(
64        addr: String,
65        config: &ClientConfig,
66    ) -> Result<Channel> {
67        debug!("create_channel, addr = {:?}", &addr);
68        Endpoint::try_from(addr)?
69            .connect_timeout(config.connect_timeout)
70            .timeout(config.request_timeout)
71            .tcp_keepalive(Some(config.tcp_keepalive))
72            .http2_keep_alive_interval(config.http2_keepalive_interval)
73            .keep_alive_timeout(config.http2_keepalive_timeout)
74            .connect()
75            .await
76            .map_err(Into::into)
77    }
78    /// Retrieves active leader connection
79    ///
80    /// Used for all write operations and linear reads
81    pub(crate) fn get_leader(&self) -> Channel {
82        self.leader_conn.clone()
83    }
84
85    pub(crate) fn get_all_channels(&self) -> Vec<Channel> {
86        let mut cloned = self.follower_conns.clone();
87        cloned.push(self.leader_conn.clone());
88        cloned
89    }
90
91    pub(crate) fn get_all_members(&self) -> Vec<NodeMeta> {
92        self.members.clone()
93    }
94
95    /// Discover cluster metadata by probing nodes
96    pub(super) async fn load_cluster_metadata(
97        endpoints: &[String],
98        config: &ClientConfig,
99    ) -> Result<Vec<NodeMeta>> {
100        for addr in endpoints {
101            match Self::create_channel(addr.clone(), config).await {
102                Ok(channel) => {
103                    let mut client = RpcServiceClient::new(channel);
104                    if config.enable_compression {
105                        client = client
106                            .send_compressed(CompressionEncoding::Gzip)
107                            .accept_compressed(CompressionEncoding::Gzip);
108                    }
109                    match client
110                        .get_cluster_metadata(tonic::Request::new(MetadataRequest {}))
111                        .await
112                    {
113                        Ok(response) => return Ok(response.into_inner().nodes),
114                        Err(e) => {
115                            error!("get_cluster_metadata: {:?}", e);
116                            // Try next node
117                            continue;
118                        }
119                    }
120                }
121                Err(e) => {
122                    error!("load_cluster_metadata from addr: {:?}, failed: {:?}", &addr, e);
123                    continue;
124                } // Connection failed, try next
125            }
126        }
127        Err(Error::ClusterMembershipNotFound)
128    }
129
130    /// Extract leader address from metadata
131    pub(super) fn parse_cluster_metadata(nodes: &Vec<NodeMeta>) -> Result<(String, Vec<String>)> {
132        let mut leader_addr = None;
133        let mut followers = Vec::new();
134
135        for node in nodes {
136            let addr = format!("http://{}:{}", node.ip, node.port);
137            debug!("parse_cluster_metadata, addr: {:?}", &addr);
138            if node.role == crate::LEADER {
139                leader_addr = Some(addr);
140            } else {
141                followers.push(addr);
142            }
143        }
144
145        leader_addr.map(|addr| (addr, followers)).ok_or(Error::NoLeaderFound)
146    }
147}