1use crate::cluster::{Cluster, ServerNode, ServerType};
19use crate::error::{Error, Result};
20use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
21use crate::proto::MetadataResponse;
22use crate::rpc::message::UpdateMetadataRequest;
23use crate::rpc::{RpcClient, ServerConnection};
24use log::info;
25use parking_lot::RwLock;
26use std::collections::HashSet;
27use std::net::{SocketAddr, ToSocketAddrs};
28use std::sync::Arc;
29
30#[derive(Default)]
31pub struct Metadata {
32 cluster: RwLock<Arc<Cluster>>,
33 connections: Arc<RpcClient>,
34 bootstrap: Arc<str>,
35}
36
37impl Metadata {
38 pub async fn new(bootstrap: &str, connections: Arc<RpcClient>) -> Result<Self> {
39 let cluster = Self::init_cluster(bootstrap, connections.clone()).await?;
40 Ok(Metadata {
41 cluster: RwLock::new(Arc::new(cluster)),
42 connections,
43 bootstrap: bootstrap.into(),
44 })
45 }
46
47 fn parse_bootstrap(boot_strap: &str) -> Result<SocketAddr> {
48 let addrs = boot_strap
50 .to_socket_addrs()
51 .map_err(|e| Error::IllegalArgument {
52 message: format!("Invalid bootstrap address '{boot_strap}': {e}"),
53 })?;
54
55 let mut ipv6_candidate: Option<SocketAddr> = None;
57 for addr in addrs {
58 if addr.is_ipv4() {
59 return Ok(addr);
60 }
61 if ipv6_candidate.is_none() {
62 ipv6_candidate = Some(addr);
63 }
64 }
65
66 let addr = ipv6_candidate.ok_or_else(|| Error::IllegalArgument {
67 message: format!("Unable to resolve bootstrap address '{boot_strap}'"),
68 })?;
69 Ok(addr)
70 }
71
72 async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
73 let socket_address = Self::parse_bootstrap(boot_strap)?;
74 let server_node = ServerNode::new(
75 -1,
76 socket_address.ip().to_string(),
77 socket_address.port() as u32,
78 ServerType::CoordinatorServer,
79 );
80 let con = connections.get_connection(&server_node).await?;
81
82 let response = con
83 .request(UpdateMetadataRequest::new(
84 &HashSet::default(),
85 &HashSet::new(),
86 vec![],
87 ))
88 .await?;
89 Cluster::from_metadata_response(response, None)
90 }
91
92 pub(crate) async fn reinit_cluster(&self) -> Result<()> {
93 let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
94 *self.cluster.write() = cluster.into();
95 Ok(())
96 }
97
98 pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) {
99 let mut cluster_guard = self.cluster.write();
102 let updated_cluster = cluster_guard.invalidate_server(server_id, table_ids);
103 *cluster_guard = Arc::new(updated_cluster);
104 }
105
106 pub fn invalidate_physical_table_meta(
107 &self,
108 physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
109 ) {
110 let mut cluster_guard = self.cluster.write();
111 let updated_cluster =
112 cluster_guard.invalidate_physical_table_meta(physical_tables_to_invalid);
113 *cluster_guard = Arc::new(updated_cluster);
114 }
115
116 pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
117 let origin_cluster = self.cluster.read().clone();
118 let new_cluster =
119 Cluster::from_metadata_response(metadata_response, Some(&origin_cluster))?;
120 let mut cluster = self.cluster.write();
121 *cluster = Arc::new(new_cluster);
122 Ok(())
123 }
124
125 pub async fn update_tables_metadata(
126 &self,
127 table_paths: &HashSet<&TablePath>,
128 physical_table_paths: &HashSet<&Arc<PhysicalTablePath>>,
129 partition_ids: Vec<i64>,
130 ) -> Result<()> {
131 let maybe_server = {
132 let guard = self.cluster.read();
133 guard.get_one_available_server().cloned()
134 };
135
136 let server = match maybe_server {
137 Some(s) => s,
138 None => {
139 info!(
140 "No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server."
141 );
142 self.reinit_cluster().await?;
143 return Ok(());
144 }
145 };
146
147 let conn = self.connections.get_connection(&server).await?;
148
149 let response = conn
150 .request(UpdateMetadataRequest::new(
151 table_paths,
152 physical_table_paths,
153 partition_ids,
154 ))
155 .await?;
156 self.update(response).await?;
157 Ok(())
158 }
159
160 pub async fn update_table_metadata(&self, table_path: &TablePath) -> Result<()> {
161 self.update_tables_metadata(&HashSet::from([table_path]), &HashSet::new(), vec![])
162 .await
163 }
164
165 pub async fn update_physical_table_metadata(
166 &self,
167 physical_table_paths: &[Arc<PhysicalTablePath>],
168 ) -> Result<()> {
169 let mut update_table_paths = HashSet::new();
170 let mut update_partition_paths = HashSet::new();
171 for physical_table_path in physical_table_paths {
172 match physical_table_path.get_partition_name() {
173 Some(_) => {
174 update_partition_paths.insert(physical_table_path);
175 }
176 None => {
177 update_table_paths.insert(physical_table_path.get_table_path());
178 }
179 }
180 }
181
182 self.update_tables_metadata(&update_table_paths, &update_partition_paths, vec![])
183 .await
184 }
185
186 pub async fn check_and_update_table_metadata(&self, table_paths: &[TablePath]) -> Result<()> {
187 let cluster_binding = self.cluster.read().clone();
188 let need_update_table_paths: HashSet<&TablePath> = table_paths
189 .iter()
190 .filter(|table_path| cluster_binding.opt_get_table(table_path).is_none())
191 .collect();
192
193 if !need_update_table_paths.is_empty() {
194 self.update_tables_metadata(&need_update_table_paths, &HashSet::new(), vec![])
195 .await?;
196 }
197 Ok(())
198 }
199
200 pub async fn get_connection(&self, server_node: &ServerNode) -> Result<ServerConnection> {
201 let result = self.connections.get_connection(server_node).await?;
202 Ok(result)
203 }
204
205 pub fn get_cluster(&self) -> Arc<Cluster> {
206 let guard = self.cluster.read();
207 guard.clone()
208 }
209
210 const MAX_RETRY_TIMES: u8 = 3;
211
212 pub async fn leader_for(
213 &self,
214 table_path: &TablePath,
215 table_bucket: &TableBucket,
216 ) -> Result<Option<ServerNode>> {
217 let leader = self.get_leader_for(table_bucket);
218
219 if leader.is_some() {
220 Ok(leader)
221 } else {
222 for _ in 0..Self::MAX_RETRY_TIMES {
223 if let Some(partition_id) = table_bucket.partition_id() {
224 self.update_tables_metadata(
225 &HashSet::from([table_path]),
226 &HashSet::new(),
227 vec![partition_id],
228 )
229 .await?;
230 } else {
231 self.update_tables_metadata(
232 &HashSet::from([table_path]),
233 &HashSet::new(),
234 vec![],
235 )
236 .await?;
237 }
238
239 let cluster = self.cluster.read();
240 let leader = cluster.leader_for(table_bucket);
241
242 if leader.is_some() {
243 return Ok(leader.cloned());
244 }
245 }
246
247 Ok(None)
248 }
249 }
250
251 fn get_leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode> {
252 let cluster = self.cluster.read();
253 cluster.leader_for(table_bucket).cloned()
254 }
255}
256
257#[cfg(test)]
258impl Metadata {
259 pub(crate) fn new_for_test(cluster: Arc<Cluster>) -> Self {
260 Metadata {
261 cluster: RwLock::new(cluster),
262 connections: Arc::new(RpcClient::new()),
263 bootstrap: Arc::from(""),
264 }
265 }
266}
267
268#[cfg(test)]
269mod tests {
270 use super::*;
271 use crate::metadata::{TableBucket, TablePath};
272 use crate::test_utils::build_cluster_arc;
273
274 #[tokio::test]
275 async fn leader_for_returns_server() {
276 let table_path = TablePath::new("db".to_string(), "tbl".to_string());
277 let cluster = build_cluster_arc(&table_path, 1, 1);
278 let metadata = Metadata::new_for_test(cluster);
279 let leader = metadata
280 .leader_for(&table_path, &TableBucket::new(1, 0))
281 .await
282 .unwrap()
283 .expect("leader");
284 assert_eq!(leader.id(), 1);
285 }
286
287 #[test]
288 fn invalidate_server_removes_leader() {
289 let table_path = TablePath::new("db".to_string(), "tbl".to_string());
290 let cluster = build_cluster_arc(&table_path, 1, 1);
291 let metadata = Metadata::new_for_test(cluster);
292 metadata.invalidate_server(&1, vec![1]);
293 let cluster = metadata.get_cluster();
294 assert!(cluster.get_tablet_server(1).is_none());
295 }
296
297 #[test]
298 fn parse_bootstrap_variants() {
299 let addr = Metadata::parse_bootstrap("127.0.0.1:8080").unwrap();
301 assert_eq!(addr.port(), 8080);
302
303 let addr = Metadata::parse_bootstrap("localhost:9090").unwrap();
305 assert_eq!(addr.port(), 9090);
306
307 let addr = Metadata::parse_bootstrap("[::1]:8080").unwrap();
309 assert_eq!(addr.port(), 8080);
310
311 assert!(Metadata::parse_bootstrap("localhost").is_err());
313
314 assert!(Metadata::parse_bootstrap("localhost:99999").is_err());
316
317 assert!(Metadata::parse_bootstrap("").is_err());
319
320 assert!(Metadata::parse_bootstrap("invalid_address").is_err());
322 }
323}