1use log::debug;
2use log::error;
3use log::info;
4use tonic::codec::CompressionEncoding;
5use tonic::transport::Channel;
6use tonic::transport::Endpoint;
7
8use crate::proto::rpc_service_client::RpcServiceClient;
9use crate::proto::MetadataRequest;
10use crate::proto::NodeMeta;
11use crate::ClientConfig;
12use crate::Error;
13use crate::Result;
14
15#[derive(Clone)]
20pub struct ConnectionPool {
21 pub(super) leader_conn: Channel,
23 pub(super) follower_conns: Vec<Channel>,
24 pub(super) config: ClientConfig,
25 pub(super) members: Vec<NodeMeta>,
26}
27
28impl ConnectionPool {
29 pub(crate) async fn new(
36 endpoints: Vec<String>,
37 config: ClientConfig,
38 ) -> Result<Self> {
39 let members = Self::load_cluster_metadata(&endpoints, &config).await?;
40 info!("Retrieved members: {:?}", &members);
41 let (leader_addr, followers) = Self::parse_cluster_metadata(&members)?;
42
43 let leader_conn = Self::create_channel(leader_addr, &config).await?;
44 let mut follower_conns = Vec::new();
45
46 let follower_futures = followers.into_iter().map(|addr| Self::create_channel(addr, &config));
48 let connections = futures::future::join_all(follower_futures).await;
49
50 for conn in connections {
51 if let Ok(channel) = conn {
52 follower_conns.push(channel);
53 }
54 }
55 Ok(Self {
56 leader_conn,
57 follower_conns,
58 config,
59 members,
60 })
61 }
62
63 pub(super) async fn create_channel(
64 addr: String,
65 config: &ClientConfig,
66 ) -> Result<Channel> {
67 debug!("create_channel, addr = {:?}", &addr);
68 Endpoint::try_from(addr)?
69 .connect_timeout(config.connect_timeout)
70 .timeout(config.request_timeout)
71 .tcp_keepalive(Some(config.tcp_keepalive))
72 .http2_keep_alive_interval(config.http2_keepalive_interval)
73 .keep_alive_timeout(config.http2_keepalive_timeout)
74 .connect()
75 .await
76 .map_err(Into::into)
77 }
78 pub(crate) fn get_leader(&self) -> Channel {
82 self.leader_conn.clone()
83 }
84
85 pub(crate) fn get_all_channels(&self) -> Vec<Channel> {
86 let mut cloned = self.follower_conns.clone();
87 cloned.push(self.leader_conn.clone());
88 cloned
89 }
90
91 pub(crate) fn get_all_members(&self) -> Vec<NodeMeta> {
92 self.members.clone()
93 }
94
95 pub(super) async fn load_cluster_metadata(
97 endpoints: &[String],
98 config: &ClientConfig,
99 ) -> Result<Vec<NodeMeta>> {
100 for addr in endpoints {
101 match Self::create_channel(addr.clone(), config).await {
102 Ok(channel) => {
103 let mut client = RpcServiceClient::new(channel);
104 if config.enable_compression {
105 client = client
106 .send_compressed(CompressionEncoding::Gzip)
107 .accept_compressed(CompressionEncoding::Gzip);
108 }
109 match client
110 .get_cluster_metadata(tonic::Request::new(MetadataRequest {}))
111 .await
112 {
113 Ok(response) => return Ok(response.into_inner().nodes),
114 Err(e) => {
115 error!("get_cluster_metadata: {:?}", e);
116 continue;
118 }
119 }
120 }
121 Err(e) => {
122 error!("load_cluster_metadata from addr: {:?}, failed: {:?}", &addr, e);
123 continue;
124 } }
126 }
127 Err(Error::ClusterMembershipNotFound)
128 }
129
130 pub(super) fn parse_cluster_metadata(nodes: &Vec<NodeMeta>) -> Result<(String, Vec<String>)> {
132 let mut leader_addr = None;
133 let mut followers = Vec::new();
134
135 for node in nodes {
136 let addr = format!("http://{}:{}", node.ip, node.port);
137 debug!("parse_cluster_metadata, addr: {:?}", &addr);
138 if node.role == crate::LEADER {
139 leader_addr = Some(addr);
140 } else {
141 followers.push(addr);
142 }
143 }
144
145 leader_addr.map(|addr| (addr, followers)).ok_or(Error::NoLeaderFound)
146 }
147}