Skip to main content

fluss/client/
admin.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18use crate::client::metadata::Metadata;
19use crate::cluster::ServerNode;
20use crate::metadata::{
21    DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec,
22    PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
23};
24use crate::rpc::message::{
25    CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest,
26    DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest,
27    GetLatestLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, ListPartitionInfosRequest,
28    ListTablesRequest, TableExistsRequest,
29};
30use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
31use crate::rpc::{RpcClient, ServerConnection};
32
33use crate::error::{Error, Result};
34use crate::proto::GetTableInfoResponse;
35use crate::{BucketId, PartitionId, TableId};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use tokio::task::JoinHandle;
39
40pub struct FlussAdmin {
41    metadata: Arc<Metadata>,
42    rpc_client: Arc<RpcClient>,
43}
44
45impl FlussAdmin {
46    pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
47        FlussAdmin {
48            metadata,
49            rpc_client: connections,
50        }
51    }
52
53    async fn admin_gateway(&self) -> Result<ServerConnection> {
54        let cluster = self.metadata.get_cluster();
55        let coordinator =
56            cluster
57                .get_coordinator_server()
58                .ok_or_else(|| Error::UnexpectedError {
59                    message: "Coordinator server not found in cluster metadata".to_string(),
60                    source: None,
61                })?;
62        self.rpc_client.get_connection(coordinator).await
63    }
64
65    pub async fn create_database(
66        &self,
67        database_name: &str,
68        database_descriptor: Option<&DatabaseDescriptor>,
69        ignore_if_exists: bool,
70    ) -> Result<()> {
71        let _response = self
72            .admin_gateway()
73            .await?
74            .request(CreateDatabaseRequest::new(
75                database_name,
76                database_descriptor,
77                ignore_if_exists,
78            )?)
79            .await?;
80        Ok(())
81    }
82
83    pub async fn create_table(
84        &self,
85        table_path: &TablePath,
86        table_descriptor: &TableDescriptor,
87        ignore_if_exists: bool,
88    ) -> Result<()> {
89        let _response = self
90            .admin_gateway()
91            .await?
92            .request(CreateTableRequest::new(
93                table_path,
94                table_descriptor,
95                ignore_if_exists,
96            )?)
97            .await?;
98        Ok(())
99    }
100
101    pub async fn drop_table(
102        &self,
103        table_path: &TablePath,
104        ignore_if_not_exists: bool,
105    ) -> Result<()> {
106        let _response = self
107            .admin_gateway()
108            .await?
109            .request(DropTableRequest::new(table_path, ignore_if_not_exists))
110            .await?;
111        Ok(())
112    }
113
114    pub async fn get_table_info(&self, table_path: &TablePath) -> Result<TableInfo> {
115        let response = self
116            .admin_gateway()
117            .await?
118            .request(GetTableRequest::new(table_path))
119            .await?;
120
121        // force update to avoid stale data in cache
122        self.metadata
123            .update_tables_metadata(&HashSet::from([table_path]), &HashSet::new(), vec![])
124            .await?;
125
126        let GetTableInfoResponse {
127            table_id,
128            schema_id,
129            table_json,
130            created_time,
131            modified_time,
132        } = response;
133        let v: &[u8] = &table_json[..];
134        let table_descriptor =
135            TableDescriptor::deserialize_json(&serde_json::from_slice(v).unwrap())?;
136        Ok(TableInfo::of(
137            table_path.clone(),
138            table_id,
139            schema_id,
140            table_descriptor,
141            created_time,
142            modified_time,
143        ))
144    }
145
146    /// List all tables in the given database
147    pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
148        let response = self
149            .admin_gateway()
150            .await?
151            .request(ListTablesRequest::new(database_name))
152            .await?;
153        Ok(response.table_name)
154    }
155
156    /// List all partitions in the given table.
157    pub async fn list_partition_infos(&self, table_path: &TablePath) -> Result<Vec<PartitionInfo>> {
158        self.list_partition_infos_with_spec(table_path, None).await
159    }
160
161    /// List partitions in the given table that match the partial partition spec.
162    pub async fn list_partition_infos_with_spec(
163        &self,
164        table_path: &TablePath,
165        partial_partition_spec: Option<&PartitionSpec>,
166    ) -> Result<Vec<PartitionInfo>> {
167        let response = self
168            .admin_gateway()
169            .await?
170            .request(ListPartitionInfosRequest::new(
171                table_path,
172                partial_partition_spec,
173            ))
174            .await?;
175        Ok(response.get_partitions_info())
176    }
177
178    /// Create a new partition for a partitioned table.
179    pub async fn create_partition(
180        &self,
181        table_path: &TablePath,
182        partition_spec: &PartitionSpec,
183        ignore_if_exists: bool,
184    ) -> Result<()> {
185        let _response = self
186            .admin_gateway()
187            .await?
188            .request(CreatePartitionRequest::new(
189                table_path,
190                partition_spec,
191                ignore_if_exists,
192            ))
193            .await?;
194        Ok(())
195    }
196
197    /// Drop a partition from a partitioned table.
198    pub async fn drop_partition(
199        &self,
200        table_path: &TablePath,
201        partition_spec: &PartitionSpec,
202        ignore_if_not_exists: bool,
203    ) -> Result<()> {
204        let _response = self
205            .admin_gateway()
206            .await?
207            .request(DropPartitionRequest::new(
208                table_path,
209                partition_spec,
210                ignore_if_not_exists,
211            ))
212            .await?;
213        Ok(())
214    }
215
216    /// Check if a table exists
217    pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
218        let response = self
219            .admin_gateway()
220            .await?
221            .request(TableExistsRequest::new(table_path))
222            .await?;
223        Ok(response.exists)
224    }
225
226    /// Drop a database
227    pub async fn drop_database(
228        &self,
229        database_name: &str,
230        ignore_if_not_exists: bool,
231        cascade: bool,
232    ) -> Result<()> {
233        let _response = self
234            .admin_gateway()
235            .await?
236            .request(DropDatabaseRequest::new(
237                database_name,
238                ignore_if_not_exists,
239                cascade,
240            ))
241            .await?;
242        Ok(())
243    }
244
245    /// List all databases
246    pub async fn list_databases(&self) -> Result<Vec<String>> {
247        let response = self
248            .admin_gateway()
249            .await?
250            .request(ListDatabasesRequest::new())
251            .await?;
252        Ok(response.database_name)
253    }
254
255    /// Check if a database exists
256    pub async fn database_exists(&self, database_name: &str) -> Result<bool> {
257        let response = self
258            .admin_gateway()
259            .await?
260            .request(DatabaseExistsRequest::new(database_name))
261            .await?;
262        Ok(response.exists)
263    }
264
265    /// Get database information
266    pub async fn get_database_info(&self, database_name: &str) -> Result<DatabaseInfo> {
267        let request = GetDatabaseInfoRequest::new(database_name);
268        let response = self.admin_gateway().await?.request(request).await?;
269
270        // Convert proto response to DatabaseInfo
271        let database_descriptor = DatabaseDescriptor::from_json_bytes(&response.database_json)?;
272
273        Ok(DatabaseInfo::new(
274            database_name.to_string(),
275            database_descriptor,
276            response.created_time,
277            response.modified_time,
278        ))
279    }
280
281    /// Get all alive server nodes in the cluster, including the coordinator
282    /// and all tablet servers. Refreshes cluster metadata before returning.
283    pub async fn get_server_nodes(&self) -> Result<Vec<ServerNode>> {
284        self.metadata.reinit_cluster().await?;
285        Ok(self.metadata.get_cluster().get_server_nodes())
286    }
287
288    /// Get the latest lake snapshot for a table
289    pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot> {
290        let response = self
291            .admin_gateway()
292            .await?
293            .request(GetLatestLakeSnapshotRequest::new(table_path))
294            .await?;
295
296        // Convert proto response to LakeSnapshot
297        let mut table_buckets_offset = HashMap::new();
298        for bucket_snapshot in response.bucket_snapshots {
299            let table_bucket = TableBucket::new_with_partition(
300                response.table_id,
301                bucket_snapshot.partition_id,
302                bucket_snapshot.bucket_id,
303            );
304            if let Some(log_offset) = bucket_snapshot.log_offset {
305                table_buckets_offset.insert(table_bucket, log_offset);
306            }
307        }
308
309        Ok(LakeSnapshot::new(
310            response.snapshot_id,
311            table_buckets_offset,
312        ))
313    }
314
315    /// List offset for the specified buckets. This operation enables to find the beginning offset,
316    /// end offset as well as the offset matching a timestamp in buckets.
317    pub async fn list_offsets(
318        &self,
319        table_path: &TablePath,
320        buckets_id: &[BucketId],
321        offset_spec: OffsetSpec,
322    ) -> Result<HashMap<i32, i64>> {
323        self.do_list_offsets(table_path, None, buckets_id, offset_spec)
324            .await
325    }
326
327    /// List offset for the specified buckets in a partition. This operation enables to find
328    /// the beginning offset, end offset as well as the offset matching a timestamp in buckets.
329    pub async fn list_partition_offsets(
330        &self,
331        table_path: &TablePath,
332        partition_name: &str,
333        buckets_id: &[BucketId],
334        offset_spec: OffsetSpec,
335    ) -> Result<HashMap<i32, i64>> {
336        self.do_list_offsets(table_path, Some(partition_name), buckets_id, offset_spec)
337            .await
338    }
339
340    async fn do_list_offsets(
341        &self,
342        table_path: &TablePath,
343        partition_name: Option<&str>,
344        buckets_id: &[BucketId],
345        offset_spec: OffsetSpec,
346    ) -> Result<HashMap<i32, i64>> {
347        if buckets_id.is_empty() {
348            return Err(Error::IllegalArgument {
349                message: "Buckets are empty.".to_string(),
350            });
351        }
352
353        // force to update table metadata like java side
354        self.metadata.update_table_metadata(table_path).await?;
355
356        let cluster = self.metadata.get_cluster();
357        let table_id = cluster.get_table(table_path)?.table_id;
358
359        // Resolve partition_id from partition_name if provided
360        let partition_id = if let Some(name) = partition_name {
361            let physical_table_path = Arc::new(PhysicalTablePath::of_partitioned(
362                Arc::new(table_path.clone()),
363                Some(name.to_string()),
364            ));
365
366            // Update partition metadata like java side
367            self.metadata
368                .update_physical_table_metadata(std::slice::from_ref(&physical_table_path))
369                .await?;
370
371            let cluster = self.metadata.get_cluster();
372            Some(
373                cluster
374                    .get_partition_id(&physical_table_path)
375                    .ok_or_else(|| {
376                        Error::partition_not_exist(format!(
377                            "Partition '{name}' not found for table '{table_path}'"
378                        ))
379                    })?,
380            )
381        } else {
382            None
383        };
384
385        // Prepare requests
386        let requests_by_server =
387            self.prepare_list_offsets_requests(table_id, partition_id, buckets_id, offset_spec)?;
388
389        // Send Requests
390        let response_futures = self.send_list_offsets_request(requests_by_server).await?;
391
392        let mut results = HashMap::new();
393
394        for response_future in response_futures {
395            let offsets = response_future.await.map_err(|e| Error::UnexpectedError {
396                message: "Fail to get result for list offsets.".to_string(),
397                source: Some(Box::new(e)),
398            })?;
399            results.extend(offsets?);
400        }
401        Ok(results)
402    }
403
404    fn prepare_list_offsets_requests(
405        &self,
406        table_id: TableId,
407        partition_id: Option<PartitionId>,
408        buckets: &[BucketId],
409        offset_spec: OffsetSpec,
410    ) -> Result<HashMap<i32, ListOffsetsRequest>> {
411        let cluster = self.metadata.get_cluster();
412        let mut node_for_bucket_list: HashMap<i32, Vec<BucketId>> = HashMap::new();
413
414        for bucket_id in buckets {
415            let table_bucket = TableBucket::new_with_partition(table_id, partition_id, *bucket_id);
416            let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
417                // todo: consider retry?
418                Error::UnexpectedError {
419                    message: format!("No leader found for table bucket: {table_bucket}."),
420                    source: None,
421                }
422            })?;
423
424            node_for_bucket_list
425                .entry(leader.id())
426                .or_default()
427                .push(*bucket_id);
428        }
429
430        let mut list_offsets_requests = HashMap::new();
431        for (leader_id, bucket_ids) in node_for_bucket_list {
432            let request =
433                ListOffsetsRequest::new(table_id, partition_id, bucket_ids, offset_spec.clone());
434            list_offsets_requests.insert(leader_id, request);
435        }
436        Ok(list_offsets_requests)
437    }
438
439    async fn send_list_offsets_request(
440        &self,
441        request_map: HashMap<i32, ListOffsetsRequest>,
442    ) -> Result<Vec<JoinHandle<Result<HashMap<i32, i64>>>>> {
443        let mut tasks = Vec::new();
444
445        for (leader_id, request) in request_map {
446            let rpc_client = self.rpc_client.clone();
447            let metadata = self.metadata.clone();
448
449            let task = tokio::spawn(async move {
450                let cluster = metadata.get_cluster();
451                let tablet_server = cluster.get_tablet_server(leader_id).ok_or_else(|| {
452                    Error::leader_not_available(format!(
453                        "Tablet server {leader_id} is not found in metadata cache."
454                    ))
455                })?;
456                let connection = rpc_client.get_connection(tablet_server).await?;
457                let list_offsets_response = connection.request(request).await?;
458                list_offsets_response.offsets()
459            });
460            tasks.push(task);
461        }
462        Ok(tasks)
463    }
464}