Skip to main content

fluss/client/
metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cluster::{Cluster, ServerNode, ServerType};
19use crate::error::{Error, Result};
20use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
21use crate::proto::MetadataResponse;
22use crate::rpc::message::UpdateMetadataRequest;
23use crate::rpc::{RpcClient, ServerConnection};
24use log::info;
25use parking_lot::RwLock;
26use std::collections::HashSet;
27use std::net::{SocketAddr, ToSocketAddrs};
28use std::sync::Arc;
29
30#[derive(Default)]
31pub struct Metadata {
32    cluster: RwLock<Arc<Cluster>>,
33    connections: Arc<RpcClient>,
34    bootstrap: Arc<str>,
35}
36
37impl Metadata {
38    pub async fn new(bootstrap: &str, connections: Arc<RpcClient>) -> Result<Self> {
39        let cluster = Self::init_cluster(bootstrap, connections.clone()).await?;
40        Ok(Metadata {
41            cluster: RwLock::new(Arc::new(cluster)),
42            connections,
43            bootstrap: bootstrap.into(),
44        })
45    }
46
47    fn parse_bootstrap(boot_strap: &str) -> Result<SocketAddr> {
48        // Resolve all socket addresses and deterministically choose one.
49        let addrs = boot_strap
50            .to_socket_addrs()
51            .map_err(|e| Error::IllegalArgument {
52                message: format!("Invalid bootstrap address '{boot_strap}': {e}"),
53            })?;
54
55        // Prefer IPv4 addresses; if none are available, fall back to the first IPv6.
56        let mut ipv6_candidate: Option<SocketAddr> = None;
57        for addr in addrs {
58            if addr.is_ipv4() {
59                return Ok(addr);
60            }
61            if ipv6_candidate.is_none() {
62                ipv6_candidate = Some(addr);
63            }
64        }
65
66        let addr = ipv6_candidate.ok_or_else(|| Error::IllegalArgument {
67            message: format!("Unable to resolve bootstrap address '{boot_strap}'"),
68        })?;
69        Ok(addr)
70    }
71
72    async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
73        let socket_address = Self::parse_bootstrap(boot_strap)?;
74        let server_node = ServerNode::new(
75            -1,
76            socket_address.ip().to_string(),
77            socket_address.port() as u32,
78            ServerType::CoordinatorServer,
79        );
80        let con = connections.get_connection(&server_node).await?;
81
82        let response = con
83            .request(UpdateMetadataRequest::new(
84                &HashSet::default(),
85                &HashSet::new(),
86                vec![],
87            ))
88            .await?;
89        Cluster::from_metadata_response(response, None)
90    }
91
92    pub(crate) async fn reinit_cluster(&self) -> Result<()> {
93        let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
94        *self.cluster.write() = cluster.into();
95        Ok(())
96    }
97
98    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) {
99        // Take a write lock for the entire operation to avoid races between
100        // reading the current cluster state and writing back the updated one.
101        let mut cluster_guard = self.cluster.write();
102        let updated_cluster = cluster_guard.invalidate_server(server_id, table_ids);
103        *cluster_guard = Arc::new(updated_cluster);
104    }
105
106    pub fn invalidate_physical_table_meta(
107        &self,
108        physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
109    ) {
110        let mut cluster_guard = self.cluster.write();
111        let updated_cluster =
112            cluster_guard.invalidate_physical_table_meta(physical_tables_to_invalid);
113        *cluster_guard = Arc::new(updated_cluster);
114    }
115
116    pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
117        let origin_cluster = self.cluster.read().clone();
118        let new_cluster =
119            Cluster::from_metadata_response(metadata_response, Some(&origin_cluster))?;
120        let mut cluster = self.cluster.write();
121        *cluster = Arc::new(new_cluster);
122        Ok(())
123    }
124
125    pub async fn update_tables_metadata(
126        &self,
127        table_paths: &HashSet<&TablePath>,
128        physical_table_paths: &HashSet<&Arc<PhysicalTablePath>>,
129        partition_ids: Vec<i64>,
130    ) -> Result<()> {
131        let maybe_server = {
132            let guard = self.cluster.read();
133            guard.get_one_available_server().cloned()
134        };
135
136        let server = match maybe_server {
137            Some(s) => s,
138            None => {
139                info!(
140                    "No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server."
141                );
142                self.reinit_cluster().await?;
143                return Ok(());
144            }
145        };
146
147        let conn = self.connections.get_connection(&server).await?;
148
149        let response = conn
150            .request(UpdateMetadataRequest::new(
151                table_paths,
152                physical_table_paths,
153                partition_ids,
154            ))
155            .await?;
156        self.update(response).await?;
157        Ok(())
158    }
159
160    pub async fn update_table_metadata(&self, table_path: &TablePath) -> Result<()> {
161        self.update_tables_metadata(&HashSet::from([table_path]), &HashSet::new(), vec![])
162            .await
163    }
164
165    pub async fn update_physical_table_metadata(
166        &self,
167        physical_table_paths: &[Arc<PhysicalTablePath>],
168    ) -> Result<()> {
169        let mut update_table_paths = HashSet::new();
170        let mut update_partition_paths = HashSet::new();
171        for physical_table_path in physical_table_paths {
172            match physical_table_path.get_partition_name() {
173                Some(_) => {
174                    update_partition_paths.insert(physical_table_path);
175                }
176                None => {
177                    update_table_paths.insert(physical_table_path.get_table_path());
178                }
179            }
180        }
181
182        self.update_tables_metadata(&update_table_paths, &update_partition_paths, vec![])
183            .await
184    }
185
186    pub async fn check_and_update_table_metadata(&self, table_paths: &[TablePath]) -> Result<()> {
187        let cluster_binding = self.cluster.read().clone();
188        let need_update_table_paths: HashSet<&TablePath> = table_paths
189            .iter()
190            .filter(|table_path| cluster_binding.opt_get_table(table_path).is_none())
191            .collect();
192
193        if !need_update_table_paths.is_empty() {
194            self.update_tables_metadata(&need_update_table_paths, &HashSet::new(), vec![])
195                .await?;
196        }
197        Ok(())
198    }
199
200    pub async fn get_connection(&self, server_node: &ServerNode) -> Result<ServerConnection> {
201        let result = self.connections.get_connection(server_node).await?;
202        Ok(result)
203    }
204
205    pub fn get_cluster(&self) -> Arc<Cluster> {
206        let guard = self.cluster.read();
207        guard.clone()
208    }
209
210    const MAX_RETRY_TIMES: u8 = 3;
211
212    pub async fn leader_for(
213        &self,
214        table_path: &TablePath,
215        table_bucket: &TableBucket,
216    ) -> Result<Option<ServerNode>> {
217        let leader = self.get_leader_for(table_bucket);
218
219        if leader.is_some() {
220            Ok(leader)
221        } else {
222            for _ in 0..Self::MAX_RETRY_TIMES {
223                if let Some(partition_id) = table_bucket.partition_id() {
224                    self.update_tables_metadata(
225                        &HashSet::from([table_path]),
226                        &HashSet::new(),
227                        vec![partition_id],
228                    )
229                    .await?;
230                } else {
231                    self.update_tables_metadata(
232                        &HashSet::from([table_path]),
233                        &HashSet::new(),
234                        vec![],
235                    )
236                    .await?;
237                }
238
239                let cluster = self.cluster.read();
240                let leader = cluster.leader_for(table_bucket);
241
242                if leader.is_some() {
243                    return Ok(leader.cloned());
244                }
245            }
246
247            Ok(None)
248        }
249    }
250
251    fn get_leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode> {
252        let cluster = self.cluster.read();
253        cluster.leader_for(table_bucket).cloned()
254    }
255}
256
#[cfg(test)]
impl Metadata {
    /// Test-only constructor that wraps an already-built cluster snapshot.
    ///
    /// Uses a fresh `RpcClient` and an empty bootstrap address, so any path that
    /// re-initializes from the bootstrap server will fail to parse it.
    pub(crate) fn new_for_test(cluster: Arc<Cluster>) -> Self {
        let connections = Arc::new(RpcClient::new());
        let bootstrap: Arc<str> = Arc::from("");
        Self {
            cluster: RwLock::new(cluster),
            connections,
            bootstrap,
        }
    }
}
267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::{TableBucket, TablePath};
    use crate::test_utils::build_cluster_arc;

    /// The `db.tbl` table path shared by the cluster-backed tests.
    fn db_tbl() -> TablePath {
        TablePath::new("db".to_string(), "tbl".to_string())
    }

    #[tokio::test]
    async fn leader_for_returns_server() {
        let path = db_tbl();
        let metadata = Metadata::new_for_test(build_cluster_arc(&path, 1, 1));
        let bucket = TableBucket::new(1, 0);

        let leader = metadata
            .leader_for(&path, &bucket)
            .await
            .unwrap()
            .expect("leader");

        assert_eq!(leader.id(), 1);
    }

    #[test]
    fn invalidate_server_removes_leader() {
        let path = db_tbl();
        let metadata = Metadata::new_for_test(build_cluster_arc(&path, 1, 1));

        metadata.invalidate_server(&1, vec![1]);

        let snapshot = metadata.get_cluster();
        assert!(snapshot.get_tablet_server(1).is_none());
    }

    #[test]
    fn parse_bootstrap_variants() {
        // Accepted: IPv4 literal, hostname, bracketed IPv6 literal.
        let accepted: [(&str, u16); 3] = [
            ("127.0.0.1:8080", 8080),
            ("localhost:9090", 9090),
            ("[::1]:8080", 8080),
        ];
        for (input, expected_port) in accepted {
            let addr = Metadata::parse_bootstrap(input).unwrap();
            assert_eq!(addr.port(), expected_port);
        }

        // Rejected: missing port, out-of-range port, empty string, nonsense.
        let rejected = ["localhost", "localhost:99999", "", "invalid_address"];
        for input in rejected {
            assert!(Metadata::parse_bootstrap(input).is_err());
        }
    }
}