1use crate::client::metadata::Metadata;
19use crate::cluster::ServerNode;
20use crate::metadata::{
21 DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec,
22 PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
23};
24use crate::rpc::message::{
25 CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest,
26 DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest,
27 GetLatestLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, ListPartitionInfosRequest,
28 ListTablesRequest, TableExistsRequest,
29};
30use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
31use crate::rpc::{RpcClient, ServerConnection};
32
33use crate::error::{Error, Result};
34use crate::proto::GetTableInfoResponse;
35use crate::{BucketId, PartitionId, TableId};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use tokio::task::JoinHandle;
39
40pub struct FlussAdmin {
41 metadata: Arc<Metadata>,
42 rpc_client: Arc<RpcClient>,
43}
44
45impl FlussAdmin {
46 pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
47 FlussAdmin {
48 metadata,
49 rpc_client: connections,
50 }
51 }
52
53 async fn admin_gateway(&self) -> Result<ServerConnection> {
54 let cluster = self.metadata.get_cluster();
55 let coordinator =
56 cluster
57 .get_coordinator_server()
58 .ok_or_else(|| Error::UnexpectedError {
59 message: "Coordinator server not found in cluster metadata".to_string(),
60 source: None,
61 })?;
62 self.rpc_client.get_connection(coordinator).await
63 }
64
65 pub async fn create_database(
66 &self,
67 database_name: &str,
68 database_descriptor: Option<&DatabaseDescriptor>,
69 ignore_if_exists: bool,
70 ) -> Result<()> {
71 let _response = self
72 .admin_gateway()
73 .await?
74 .request(CreateDatabaseRequest::new(
75 database_name,
76 database_descriptor,
77 ignore_if_exists,
78 )?)
79 .await?;
80 Ok(())
81 }
82
83 pub async fn create_table(
84 &self,
85 table_path: &TablePath,
86 table_descriptor: &TableDescriptor,
87 ignore_if_exists: bool,
88 ) -> Result<()> {
89 let _response = self
90 .admin_gateway()
91 .await?
92 .request(CreateTableRequest::new(
93 table_path,
94 table_descriptor,
95 ignore_if_exists,
96 )?)
97 .await?;
98 Ok(())
99 }
100
101 pub async fn drop_table(
102 &self,
103 table_path: &TablePath,
104 ignore_if_not_exists: bool,
105 ) -> Result<()> {
106 let _response = self
107 .admin_gateway()
108 .await?
109 .request(DropTableRequest::new(table_path, ignore_if_not_exists))
110 .await?;
111 Ok(())
112 }
113
114 pub async fn get_table_info(&self, table_path: &TablePath) -> Result<TableInfo> {
115 let response = self
116 .admin_gateway()
117 .await?
118 .request(GetTableRequest::new(table_path))
119 .await?;
120
121 self.metadata
123 .update_tables_metadata(&HashSet::from([table_path]), &HashSet::new(), vec![])
124 .await?;
125
126 let GetTableInfoResponse {
127 table_id,
128 schema_id,
129 table_json,
130 created_time,
131 modified_time,
132 } = response;
133 let v: &[u8] = &table_json[..];
134 let table_descriptor =
135 TableDescriptor::deserialize_json(&serde_json::from_slice(v).unwrap())?;
136 Ok(TableInfo::of(
137 table_path.clone(),
138 table_id,
139 schema_id,
140 table_descriptor,
141 created_time,
142 modified_time,
143 ))
144 }
145
146 pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
148 let response = self
149 .admin_gateway()
150 .await?
151 .request(ListTablesRequest::new(database_name))
152 .await?;
153 Ok(response.table_name)
154 }
155
156 pub async fn list_partition_infos(&self, table_path: &TablePath) -> Result<Vec<PartitionInfo>> {
158 self.list_partition_infos_with_spec(table_path, None).await
159 }
160
161 pub async fn list_partition_infos_with_spec(
163 &self,
164 table_path: &TablePath,
165 partial_partition_spec: Option<&PartitionSpec>,
166 ) -> Result<Vec<PartitionInfo>> {
167 let response = self
168 .admin_gateway()
169 .await?
170 .request(ListPartitionInfosRequest::new(
171 table_path,
172 partial_partition_spec,
173 ))
174 .await?;
175 Ok(response.get_partitions_info())
176 }
177
178 pub async fn create_partition(
180 &self,
181 table_path: &TablePath,
182 partition_spec: &PartitionSpec,
183 ignore_if_exists: bool,
184 ) -> Result<()> {
185 let _response = self
186 .admin_gateway()
187 .await?
188 .request(CreatePartitionRequest::new(
189 table_path,
190 partition_spec,
191 ignore_if_exists,
192 ))
193 .await?;
194 Ok(())
195 }
196
197 pub async fn drop_partition(
199 &self,
200 table_path: &TablePath,
201 partition_spec: &PartitionSpec,
202 ignore_if_not_exists: bool,
203 ) -> Result<()> {
204 let _response = self
205 .admin_gateway()
206 .await?
207 .request(DropPartitionRequest::new(
208 table_path,
209 partition_spec,
210 ignore_if_not_exists,
211 ))
212 .await?;
213 Ok(())
214 }
215
216 pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
218 let response = self
219 .admin_gateway()
220 .await?
221 .request(TableExistsRequest::new(table_path))
222 .await?;
223 Ok(response.exists)
224 }
225
226 pub async fn drop_database(
228 &self,
229 database_name: &str,
230 ignore_if_not_exists: bool,
231 cascade: bool,
232 ) -> Result<()> {
233 let _response = self
234 .admin_gateway()
235 .await?
236 .request(DropDatabaseRequest::new(
237 database_name,
238 ignore_if_not_exists,
239 cascade,
240 ))
241 .await?;
242 Ok(())
243 }
244
245 pub async fn list_databases(&self) -> Result<Vec<String>> {
247 let response = self
248 .admin_gateway()
249 .await?
250 .request(ListDatabasesRequest::new())
251 .await?;
252 Ok(response.database_name)
253 }
254
255 pub async fn database_exists(&self, database_name: &str) -> Result<bool> {
257 let response = self
258 .admin_gateway()
259 .await?
260 .request(DatabaseExistsRequest::new(database_name))
261 .await?;
262 Ok(response.exists)
263 }
264
265 pub async fn get_database_info(&self, database_name: &str) -> Result<DatabaseInfo> {
267 let request = GetDatabaseInfoRequest::new(database_name);
268 let response = self.admin_gateway().await?.request(request).await?;
269
270 let database_descriptor = DatabaseDescriptor::from_json_bytes(&response.database_json)?;
272
273 Ok(DatabaseInfo::new(
274 database_name.to_string(),
275 database_descriptor,
276 response.created_time,
277 response.modified_time,
278 ))
279 }
280
281 pub async fn get_server_nodes(&self) -> Result<Vec<ServerNode>> {
284 self.metadata.reinit_cluster().await?;
285 Ok(self.metadata.get_cluster().get_server_nodes())
286 }
287
288 pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot> {
290 let response = self
291 .admin_gateway()
292 .await?
293 .request(GetLatestLakeSnapshotRequest::new(table_path))
294 .await?;
295
296 let mut table_buckets_offset = HashMap::new();
298 for bucket_snapshot in response.bucket_snapshots {
299 let table_bucket = TableBucket::new_with_partition(
300 response.table_id,
301 bucket_snapshot.partition_id,
302 bucket_snapshot.bucket_id,
303 );
304 if let Some(log_offset) = bucket_snapshot.log_offset {
305 table_buckets_offset.insert(table_bucket, log_offset);
306 }
307 }
308
309 Ok(LakeSnapshot::new(
310 response.snapshot_id,
311 table_buckets_offset,
312 ))
313 }
314
315 pub async fn list_offsets(
318 &self,
319 table_path: &TablePath,
320 buckets_id: &[BucketId],
321 offset_spec: OffsetSpec,
322 ) -> Result<HashMap<i32, i64>> {
323 self.do_list_offsets(table_path, None, buckets_id, offset_spec)
324 .await
325 }
326
327 pub async fn list_partition_offsets(
330 &self,
331 table_path: &TablePath,
332 partition_name: &str,
333 buckets_id: &[BucketId],
334 offset_spec: OffsetSpec,
335 ) -> Result<HashMap<i32, i64>> {
336 self.do_list_offsets(table_path, Some(partition_name), buckets_id, offset_spec)
337 .await
338 }
339
340 async fn do_list_offsets(
341 &self,
342 table_path: &TablePath,
343 partition_name: Option<&str>,
344 buckets_id: &[BucketId],
345 offset_spec: OffsetSpec,
346 ) -> Result<HashMap<i32, i64>> {
347 if buckets_id.is_empty() {
348 return Err(Error::IllegalArgument {
349 message: "Buckets are empty.".to_string(),
350 });
351 }
352
353 self.metadata.update_table_metadata(table_path).await?;
355
356 let cluster = self.metadata.get_cluster();
357 let table_id = cluster.get_table(table_path)?.table_id;
358
359 let partition_id = if let Some(name) = partition_name {
361 let physical_table_path = Arc::new(PhysicalTablePath::of_partitioned(
362 Arc::new(table_path.clone()),
363 Some(name.to_string()),
364 ));
365
366 self.metadata
368 .update_physical_table_metadata(std::slice::from_ref(&physical_table_path))
369 .await?;
370
371 let cluster = self.metadata.get_cluster();
372 Some(
373 cluster
374 .get_partition_id(&physical_table_path)
375 .ok_or_else(|| {
376 Error::partition_not_exist(format!(
377 "Partition '{name}' not found for table '{table_path}'"
378 ))
379 })?,
380 )
381 } else {
382 None
383 };
384
385 let requests_by_server =
387 self.prepare_list_offsets_requests(table_id, partition_id, buckets_id, offset_spec)?;
388
389 let response_futures = self.send_list_offsets_request(requests_by_server).await?;
391
392 let mut results = HashMap::new();
393
394 for response_future in response_futures {
395 let offsets = response_future.await.map_err(|e| Error::UnexpectedError {
396 message: "Fail to get result for list offsets.".to_string(),
397 source: Some(Box::new(e)),
398 })?;
399 results.extend(offsets?);
400 }
401 Ok(results)
402 }
403
404 fn prepare_list_offsets_requests(
405 &self,
406 table_id: TableId,
407 partition_id: Option<PartitionId>,
408 buckets: &[BucketId],
409 offset_spec: OffsetSpec,
410 ) -> Result<HashMap<i32, ListOffsetsRequest>> {
411 let cluster = self.metadata.get_cluster();
412 let mut node_for_bucket_list: HashMap<i32, Vec<BucketId>> = HashMap::new();
413
414 for bucket_id in buckets {
415 let table_bucket = TableBucket::new_with_partition(table_id, partition_id, *bucket_id);
416 let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
417 Error::UnexpectedError {
419 message: format!("No leader found for table bucket: {table_bucket}."),
420 source: None,
421 }
422 })?;
423
424 node_for_bucket_list
425 .entry(leader.id())
426 .or_default()
427 .push(*bucket_id);
428 }
429
430 let mut list_offsets_requests = HashMap::new();
431 for (leader_id, bucket_ids) in node_for_bucket_list {
432 let request =
433 ListOffsetsRequest::new(table_id, partition_id, bucket_ids, offset_spec.clone());
434 list_offsets_requests.insert(leader_id, request);
435 }
436 Ok(list_offsets_requests)
437 }
438
439 async fn send_list_offsets_request(
440 &self,
441 request_map: HashMap<i32, ListOffsetsRequest>,
442 ) -> Result<Vec<JoinHandle<Result<HashMap<i32, i64>>>>> {
443 let mut tasks = Vec::new();
444
445 for (leader_id, request) in request_map {
446 let rpc_client = self.rpc_client.clone();
447 let metadata = self.metadata.clone();
448
449 let task = tokio::spawn(async move {
450 let cluster = metadata.get_cluster();
451 let tablet_server = cluster.get_tablet_server(leader_id).ok_or_else(|| {
452 Error::leader_not_available(format!(
453 "Tablet server {leader_id} is not found in metadata cache."
454 ))
455 })?;
456 let connection = rpc_client.get_connection(tablet_server).await?;
457 let list_offsets_response = connection.request(request).await?;
458 list_offsets_response.offsets()
459 });
460 tasks.push(task);
461 }
462 Ok(tasks)
463 }
464}